0
votes

I am new to Cloud Dataflow / Apache Beam, so the concept/programming is still hazy to me.

What I want to do is that Dataflow listens to Pubsub and gets messages of this format in JSON:

{
  "productId": "...",
  "productName": "..."
}

And transform that to:

{
  "productId": "...",
  "productName": "...",
  "sku": "...",
  "inventory": {
    "revenue": <some Double>,
    "stocks":  <some Integer>
  }
}

So the steps needed are:

  1. (IngestFromPubsub) Get records from Pubsub by listening to a topic (1 Pubsub message = 1 record)

  2. (EnrichDataFromAPI)

    a. Deserialize the payload's JSON string into Java object

    b. By calling an external API, using the sku, I can enrich the data of each record by adding the inventory attribute.

    c. Serialize the records again.

  3. (WriteToGCS) Then every x number (can be parameterized) records, I need to write these in Cloud Storage. Please consider also the trivial case that x=1. (Does x=1, a good idea? I am afraid there will be too many Cloud Storage writes)

Even though I am a Python guy, I am already having difficulty doing this in Python, more so that I need to do write in Java. I am getting headache reading Beam's example in Java, it's too verbose and difficult to follow. All I understand is that each step is an .apply to the PCollection.

So far, here is the result of my puny effort:

public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        // I don't really understand the next part, I just copied from official documentation and filled in some values
        .apply(Window.<String>into(FixedWindows.of(Duration.millis(5000)))
            .withAllowedLateness(Duration.millis(5000))
            .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
            .discardingFiredPanes()
        )
        .apply("EnrichDataFromAPI", ParDo.of(
            new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.element();
                    // help on this part, I heard I need to use Jackson but I don't know, for API HttpClient is sufficient
                    // ... deserialize, call API, serialize again ...
                    c.output(enrichedJSONString);
                }
            }
        ))
        .apply("WriteToGCS", 
            TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
    ;


    PipelineResult result = pipeline.run();
}

Please fill in the missing parts, and also give me a tip on Windowing (e.g. what's the appropriate configuration etc.) and in which steps should I insert/apply it.

1
How do you get sku value? Does it persist in the origin message, being delivered from Pub/Sub?Nick_Kh
As simple as a GET from myapihost.com/api/products/<sku>. Yes, the sku persist in the origin message.oikonomiyaki
What is the reason that you want to do custom batching when writing to GCS? Beam does batching for you by default.Mikhail Gryzykhin
@MikhailGryzykhin Initially, I only want one record-one Storage write. I didn't have Windowing initially, but when I try to execute there is a runtime error that says I need Windowing because option.setStreaming(true). Make sense, probably because it is an unbounded stream and Beam wants to lump together records based on time. My question is more on the side of possibility.oikonomiyaki

1 Answers

2
votes
  • I don't think you need any of the windowing in your IngestFromPubsub and EnrichDataFromAPI. The purpose of windowing is to group your records that are nearby in time together into windows so you can compute aggregate computations over them. But since you are not doing any aggregate computations, and are interested in dealing with each record independently, you don't need windows.

  • Since you are always converting one input record to one output record, your EnrichDataFromAPI should be a MapElements. This should make the code easier.

  • There are resources out there for processing JSON in Apache Bean Java: Apache Beam stream processing of json data

  • You don't necessarily need to use Jackson to map the JSON to a Java object. You might be able to manipulate the JSON directly. You can use Java's native JSON API to parse/manipulate/serialize.