I am new to Cloud Dataflow / Apache Beam, so the concepts and programming model are still hazy to me.
What I want is for Dataflow to listen to Pub/Sub and get messages in this JSON format:
{
    "productId": "...",
    "productName": "..."
}
And transform that to:
{
    "productId": "...",
    "productName": "...",
    "sku": "...",
    "inventory": {
        "revenue": <some Double>,
        "stocks": <some Integer>
    }
}
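For reference, here is how I picture the record as Jackson POJOs (the class names and the NON_NULL setting are my own guesses):

import com.fasterxml.jackson.annotation.JsonInclude;

// Hypothetical POJOs mirroring the JSON shapes above; Jackson maps fields by name.
@JsonInclude(JsonInclude.Include.NON_NULL) // omit fields that are still null when serializing
public class Product {
    public String productId;
    public String productName;
    public String sku;
    public Inventory inventory;

    public static class Inventory {
        public Double revenue;
        public Integer stocks;
    }
}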
So the steps needed are:

(IngestFromPubsub) Get records from Pub/Sub by listening to a topic (1 Pub/Sub message = 1 record).

(EnrichDataFromAPI)
a. Deserialize the payload's JSON string into a Java object.
b. By calling an external API using the sku, enrich the data of each record by adding the inventory attribute.
c. Serialize the records again.

(WriteToGCS) Then, every x records (x can be parameterized), write these to Cloud Storage. Please also consider the trivial case x=1. (Is x=1 a good idea? I am afraid there will be too many Cloud Storage writes. A batching sketch of what I have in mind follows this list.)
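For the WriteToGCS step, I wonder whether GroupIntoBatches is the right tool for the "every x records" part. A minimal sketch of what I have in mind (enriched and batchSize are placeholder names of mine, and GroupIntoBatches needs keyed input, so I key everything with one constant key):

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Sketch only: "enriched" stands for the PCollection<String> coming out of
// EnrichDataFromAPI, and batchSize (= x) would come from a pipeline option.
PCollection<KV<String, Iterable<String>>> batches = enriched
    .apply(WithKeys.of(""))                     // constant key so all records batch together
    .apply(GroupIntoBatches.ofSize(batchSize)); // emits once x elements arrive (or the window ends)

Each emitted Iterable would still need a small DoFn to turn it back into lines before TextIO; also, a batch fires when its window closes, so batches can come out smaller than x.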
Even though I am a Python guy, I am already having difficulty doing this in Python, more so now that I need to write it in Java. I get a headache reading Beam's Java examples; they are too verbose and difficult to follow. All I understand is that each step is an .apply on the PCollection.
So far, here is the result of my puny effort:
public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        // I don't really understand the next part; I just copied it from the official
        // documentation and filled in some values
        .apply(Window.<String>into(FixedWindows.of(Duration.millis(5000)))
            .withAllowedLateness(Duration.millis(5000))
            .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
            .discardingFiredPanes()
        )
        .apply("EnrichDataFromAPI", ParDo.of(
            new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String json = c.element();
                    // help on this part: I heard I need to use Jackson, but I don't know;
                    // for the API, HttpClient is sufficient
                    // ... deserialize, call API, serialize again (see my attempt below) ...
                    String enrichedJSONString = json; // placeholder so this compiles
                    c.output(enrichedJSONString);
                }
            }
        ))
        .apply("WriteToGCS",
            TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()));

    PipelineResult result = pipeline.run();
}
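Here is my best guess at the missing DoFn body, pulled out into its own class. This is only a sketch: the endpoint URL is made up, I am assuming the API responds with {"revenue": ..., "stocks": ...}, and I am not sure it is idiomatic. It uses the Product POJO from above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import com.fasterxml.jackson.databind.ObjectMapper;

static class EnrichDataFromAPI extends DoFn<String, String> {
    private transient ObjectMapper mapper;
    private transient HttpClient httpClient;

    @Setup
    public void setup() {
        // Build the heavyweight clients once per DoFn instance, not per element.
        mapper = new ObjectMapper();
        httpClient = HttpClient.newHttpClient();
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        // a. Deserialize the payload's JSON string into the Product POJO from above.
        Product product = mapper.readValue(c.element(), Product.class);

        // b. Call the external API with the sku (the URL here is a placeholder).
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://example.com/inventory?sku=" + product.sku))
            .GET()
            .build();
        HttpResponse<String> response =
            httpClient.send(request, HttpResponse.BodyHandlers.ofString());

        // Assuming the API responds with {"revenue": <Double>, "stocks": <Integer>}.
        product.inventory = mapper.readValue(response.body(), Product.Inventory.class);

        // c. Serialize again and emit.
        c.output(mapper.writeValueAsString(product));
    }
}

With that, the step would just be .apply("EnrichDataFromAPI", ParDo.of(new EnrichDataFromAPI())).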
Please fill in the missing parts, and also give me a tip on windowing (e.g., what's the appropriate configuration, etc.) and in which steps I should insert/apply it.
Comments:

Where does the sku value come from? Does it persist in the origin message, being delivered from Pub/Sub? – Nick_Kh

Yes, sku does persist in the origin message. – oikonomiyaki

Regarding option.setStreaming(true): makes sense, probably because it is an unbounded stream and Beam wants to lump together records based on time. My question is more on the side of possibility. – oikonomiyaki