In my pipeline I want to output each message to one of several Pub/Sub topics, based on the result of a previous transformation. At the moment I'm sending all output to the same topic:
SearchItemGeneratorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(SearchItemGeneratorOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(...)
//other transformations
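.apply("ParseFile", new ParseFile()) // outputs PCollection<Message>; each Message has a messageType property naming its target topic
.apply("ExtractPayload", MapElements.into(TypeDescriptors.strings()).via((Message m) -> m.getPayload())) // PubsubIO.writeStrings() expects a PCollection<String>
.apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));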
And this is my Message class:
class Message {
private MessageType messageType;
private String payload;
//constructor, getters
}
My ParseFile transform outputs a PCollection<Message>, and each Message object has a messageType property. Based on messageType, I want to write the Message's payload property to a different Pub/Sub topic. I read the paragraph "Multiple transforms process the same PCollection" in this article, but I still don't see how to apply it, or any other solution, to my case.
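The branching pattern that paragraph describes amounts to applying several independent transforms to the same PCollection, each keeping only the elements it cares about. A minimal sketch, assuming a hypothetical two-value MessageType enum (FULL, DELTA), a Message with the getters the question mentions, and SerializableCoder as its coder (all assumptions, not the question's actual code); fullTopic and deltaTopic are placeholder topic paths:

```java
import java.io.Serializable;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BranchByType {
  // Hypothetical stand-in for the question's MessageType.
  public enum MessageType { FULL, DELTA }

  // Minimal Message; SerializableCoder is one assumed way to give Beam a coder for it.
  @DefaultCoder(SerializableCoder.class)
  public static class Message implements Serializable {
    private final MessageType messageType;
    private final String payload;

    public Message(MessageType messageType, String payload) {
      this.messageType = messageType;
      this.payload = payload;
    }

    public MessageType getMessageType() { return messageType; }
    public String getPayload() { return payload; }
  }

  // One branch: keep only messages of the given type and extract their payloads.
  public static PCollection<String> payloadsOf(PCollection<Message> messages, MessageType type) {
    return messages
        .apply("Keep" + type, Filter.by((Message m) -> m.getMessageType() == type))
        .apply("Payload" + type,
            MapElements.into(TypeDescriptors.strings()).via((Message m) -> m.getPayload()));
  }

  // Both branches read the same input PCollection; each writes to its own topic.
  public static void writeByType(PCollection<Message> messages, String fullTopic, String deltaTopic) {
    payloadsOf(messages, MessageType.FULL)
        .apply("WriteFull", PubsubIO.writeStrings().to(fullTopic));
    payloadsOf(messages, MessageType.DELTA)
        .apply("WriteDelta", PubsubIO.writeStrings().to(deltaTopic));
  }
}
```

This evaluates one predicate per message type for every element, whereas a single multi-output ParDo routes each element once.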
Update: thanks, @Andrew, for your solution. I solved my issue using TupleTag instead, but the approach is similar. I created two TupleTag objects in the main pipeline:
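// The trailing {} creates anonymous subclasses, so the String type argument is kept and Beam can infer a coder for each output
public static final TupleTag<String> full = new TupleTag<String>("full") {};
public static final TupleTag<String> delta = new TupleTag<String>("delta") {};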
Then, depending on my condition, I output each message from the DoFn with the matching TupleTag:
TupleTag<String> tupleTag = ...; // assign full or delta depending on the condition
processContext.output(tupleTag, jsonObject.toString());
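A self-contained sketch of such a DoFn and the multi-output ParDo that produces the PCollectionTuple. It assumes the same hypothetical MessageType enum and Message class as a stand-in for the real types, and it emits getPayload() where the original DoFn emits jsonObject.toString(); the transform name "RouteByType" and the helper route() are illustrative only:

```java
import java.io.Serializable;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class RouteByTag {
  // Anonymous subclasses ({}) keep the String type argument so Beam can infer output coders.
  public static final TupleTag<String> full = new TupleTag<String>("full") {};
  public static final TupleTag<String> delta = new TupleTag<String>("delta") {};

  // Hypothetical stand-ins for the question's MessageType and Message.
  public enum MessageType { FULL, DELTA }

  @DefaultCoder(SerializableCoder.class)
  public static class Message implements Serializable {
    private final MessageType messageType;
    private final String payload;

    public Message(MessageType messageType, String payload) {
      this.messageType = messageType;
      this.payload = payload;
    }

    public MessageType getMessageType() { return messageType; }
    public String getPayload() { return payload; }
  }

  // Emits each payload under the tag that matches its messageType.
  public static class RouteFn extends DoFn<Message, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Message m = c.element();
      TupleTag<String> tag = m.getMessageType() == MessageType.FULL ? full : delta;
      c.output(tag, m.getPayload());
    }
  }

  // "full" is declared as the main output and "delta" as an additional output.
  public static PCollectionTuple route(PCollection<Message> input) {
    return input.apply("RouteByType",
        ParDo.of(new RouteFn()).withOutputTags(full, TupleTagList.of(delta)));
  }
}
```

The resulting PCollectionTuple plays the role of `messages` in the main pipeline; each tagged PCollection<String> can then be handed to its own PubsubIO.writeStrings() sink.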
Finally, in the main pipeline I select each output from the PCollectionTuple by its TupleTag and send it to the corresponding Pub/Sub topic:
messages.get(full)
.apply("SendToIndexTopic", PubsubIO.writeStrings().to(options.getOutputIndexTopic()));
messages.get(delta)
.apply("SendToDeltaTopic", PubsubIO.writeStrings().to(options.getOutputDeltaTopic()));
One more thing to mention: my TupleTag objects are static fields.