
Recently, I have started working with Apache Beam and Google Cloud Dataflow to develop a big data processing pipeline. I am planning to leverage Beam's internal stateful processing model in my pipeline.

Below is the gist of what I want to achieve:

  • Read data from a non-bounded (streaming) source; on GCP this would be Pub/Sub. Transform the data into KV<String, Object>.
  • Read data from a bounded source; on GCP, in my case, this would be GCS. Transform the data into KV<String, Object>.
  • As one of the sources is unbounded, I have to apply windowing and triggering. I am choosing to use the global window, as I want to join data coming from the unbounded source to data from the bounded source if it already exists in my internal state management.
  • Join the data from Pub/Sub and GCS on a common key.
  • Add the data to Beam's internal state, using Beam's BagState (StateId and StateSpec). Emit the Iterable of data added to the BagState. Flatten the Iterable, and write the emitted PCollection to GCS (a sketch of such a stateful DoFn is shown after the code snippet below).
  • Perform some ParDo(function) on the Iterable of data. Write the emitted PCollection to GCS.

Below is a sample code snippet:

public static PipelineResult run(BeamStateFullProcessingPoC.Options options) {

    // User - Think of it as Java POJO / Avro record / Protobuf message.

    // create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /**
    *   Step - 1
    *   Read data from non-bounded (streaming) source, for GCP this would be PubSub.  
    *   Transform the data into KV<String, Object>
    */
    final PCollection<PubsubMessage> pubsubEvents = ... 
    final PCollection<KV<String, User>> pubSubUserByUserId = ...

    /**
    *   Step - 2
    *   Read data from bounded source, for GCP my case this would be GCS. 
    *   Transform the data into KV<String, Object>
    */
    final PCollection<User> users = ...
    final PCollection<KV<String, User>> gcsUserByUserId = ...

    List<PCollection<KV<String, User>>> pCollectionList = new ArrayList<>();
    pCollectionList.add(pubSubUserByUserId);
    pCollectionList.add(gcsUserByUserId);

    PCollection<KV<String, User>> allKVData = PCollectionList
                  .of(pCollectionList)
                  .apply("flatten KV ID with User", Flatten.pCollections());


    /**
    *   Step - 3 
    *   Perform Window + Triggering and GroupByKey
    *   As one of the Source is streaming, we need to do Window and trigger, before grouping by key
    */
    final PCollection<KV<String, Iterable<User>>> iterableUserByIdKV = allKVData
            .apply("batch data by window + trigger",
                    Window.<KV<String, User>> into(new GlobalWindows())
                    .triggering(AfterProcessingTime.pastFirstElementInPane())
                    .discardingFiredPanes())
    .apply("GroupByKey per User", GroupByKey.create());

    /**
    *   Step - 4
    *   Add User to Beam's internal state, using Beam's BagState (StateId and StateSpec)
    *   Emit the Iterable<User> added to BagState
    *   Flatten Iterable, and write the emitted PCollection to GCS
    */    
    final PCollection<Iterable<User>> iterableUser = iterableUserByIdKV
            .apply("User added to State by Key", ParDo.of(new CreateInternalStateDoFn()));

    final PCollection<User> userAddedToState = iterableUser
            .apply("flatten userAddedToState", Flatten.iterables());
    userAddedToState.apply("write userAddedToState", AvroIO.write(User.class)
            .to(options.getOutputDirectoryForUserState())
            .withSuffix(".avro")
            .withWindowedWrites()
            .withNumShards(options.getNumShards()));


    /**
    *   Step - 5
    *   Perform some function via ParDo on Iterable<User> 
    *   Write emitted data to GCS
    */
    final PCollection<User> changeGenderUser = iterableUser
            .apply("DetectChangeGenderDoFn", ParDo.of(new DetectChangeGenderDoFn()));

    changeGenderUser.apply("write change gender", AvroIO.write(User.class)
            .to(options.getOutputDirectoryForChangeGender())
            .withSuffix(".avro")
            .withWindowedWrites()
            .withNumShards(options.getNumShards()));

    return pipeline.run();
}
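
For reference, here is a minimal sketch of what a stateful DoFn along the lines of CreateInternalStateDoFn could look like. The state id, the use of AvroCoder for User, and the "re-emit the whole bag" behaviour are assumptions based on the description above, not the actual implementation.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical sketch of a per-key stateful DoFn: it buffers every User seen for
// a key in a BagState and re-emits everything accumulated so far for that key.
public class CreateInternalStateDoFn
        extends DoFn<KV<String, Iterable<User>>, Iterable<User>> {

    @StateId("userBag")
    private final StateSpec<BagState<User>> userBagSpec =
            StateSpecs.bag(AvroCoder.of(User.class));

    @ProcessElement
    public void processElement(ProcessContext context,
            @StateId("userBag") BagState<User> userBag) {

        // Add the newly arrived users for this key to the bag.
        for (User user : context.element().getValue()) {
            userBag.add(user);
        }

        // Emit everything accumulated so far for this key.
        List<User> accumulated = new ArrayList<>();
        userBag.read().forEach(accumulated::add);
        context.output(accumulated);
    }
}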

Below is the JSON payload used to create the Dataflow template job:

{
  "jobName": "poc-beam-state-management",
  "parameters": {
    "personSubscription": "projects/<project-name>/subscriptions/<subscription-name>",
    "locationForUser": "gs://<bucket>/<user-folder>/*.avro",
    "outputDirectoryForChangeGender": "gs://<bucket>/<folder>/",
    "outputDirectoryForUserState": "gs://<bucket>/<folder>/",
    "avroTempDirectory": "gs://<bucket>/<folder>/",
    "numShards": "5",
    "autoscalingAlgorithm": "THROUGHPUT_BASED",
    "numWorkers": "3",
    "maxNumWorkers": "18"    
  },
  "environment": {
    "subnetwork": "<some-subnet>",
    "zone": "<some-zone>",
    "serviceAccountEmail": "<some-service-account>",
  },
  "gcsPath": "gs://<bucket>/<folder>/templates/<TemplateName>"
}

When my Dataflow job starts, it only performs the work on one worker node.
I am under the assumption that Google Cloud Platform / Dataflow will automatically scale the job out with more worker nodes as needed.

Q: How can my Dataflow job autoscale and leverage GCP's capability of performing the work in a distributed fashion?

[Image: Dataflow job DAG (sub-part 1)] [Image: Dataflow job DAG (sub-part 2)]


1 Answer


Your base assumption in the list you provided above has some flaws. You state: "As one of the sources is unbounded... I am choosing to use the global window, as I want to join data coming from the unbounded source to the bounded source..."

In Beam, you cannot apply a global window to an infinite stream, since you cannot fit the stream into memory. You would need to put the elements into fixed windows instead, which you can read about here. Because of the global window, the job would never complete.
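
For illustration, here is a rough sketch of what re-windowing the merged allKVData collection from your snippet might look like. The one-minute window size and the watermark trigger are illustrative assumptions, not recommendations for your specific workload.

// Requires org.apache.beam.sdk.transforms.windowing.Window, FixedWindows,
// AfterWatermark, and org.joda.time.Duration.
PCollection<KV<String, User>> windowedKVData = allKVData
        .apply("fixed window per key",
                Window.<KV<String, User>>into(FixedWindows.of(Duration.standardMinutes(1)))
                        .triggering(AfterWatermark.pastEndOfWindow())
                        .withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes());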

Secondly, if you are running a Dataflow streaming job, Google defaults the job to autoscalingAlgorithm=NONE, which prevents it from autoscaling. You would want to set it to autoscalingAlgorithm=THROUGHPUT_BASED. You can find details here that explain it better.
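
If you are building the template from Java, one way to set this explicitly is through the Dataflow runner's worker pool options. This is only a sketch; the max worker count of 18 is simply taken from your payload.

// Hedged sketch: request throughput-based autoscaling on the Dataflow runner.
// Equivalent to passing --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=18.
DataflowPipelineWorkerPoolOptions workerOptions =
        options.as(DataflowPipelineWorkerPoolOptions.class);
workerOptions.setAutoscalingAlgorithm(
        DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
workerOptions.setMaxNumWorkers(18);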

Hope this provides the answer you were looking for.