0 votes

I implemented a pipeline that reads a JSON message containing an AWS S3 key from Pub/Sub, downloads the S3 object, which consists of newline-delimited JSON (NDJSON), and then writes the NDJSON to files in GCS using windowing and GroupByKey.

It is a streaming pipeline with a 10-second window and AfterPane.elementCountAtLeast(1000). However, the data is never divided into windows and no files are written to GCS. In addition, the following exception occurred: Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = 4295/4946/4946 MB, GC last/max = 98.54/98.54 %, #pushbacks=1, gc thrashing=true. Heap dump not written.

The figure below shows the job graph of my Dataflow pipeline. The GroupIntoShards step does not output any data, and I cannot understand why. Furthermore, this only happens with an unbounded input: the result is fine if there are only a few Pub/Sub messages (perhaps few enough that GC thrashing does not occur), presumably because the window can close once the Pub/Sub subscription has no unacked messages left. But when messages are published continuously, GroupIntoShards does not work and no files appear in GCS.

[Dataflow job graph]

The following is part of my source:

In the main class,

public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> inputed = pipeline
        .apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
        .apply("NDJson Divider", ParDo.of(new NDJsonDivider(options.getSecretId())));

    inputed
        .apply(options.getWindowDuration() + " Window",
            Window.<String>into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))
                .triggering(
                    Repeatedly.forever(
                        AfterFirst.of(
                            AfterWatermark.pastEndOfWindow(),
                            AfterPane.elementCountAtLeast(1000),
                            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DurationUtils.parseDuration(options.getWindowDuration())))))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardDays(2))
        )
        .apply(FileIO.<String, String>writeDynamic()
            .by(new TableRowPartitionContextFn())
            .via(TextIO.sink())
            .to(options.getOutputDirectory())
            .withNaming(PartitionedFileNaming::new)
            .withNumShards(options.getNumShards())
            .withDestinationCoder(StringUtf8Coder.of())
            .withCompression(Compression.GZIP)
        );

    return pipeline.run();
  }

In the NDJsonDivider class,


  @ProcessElement
  public void processElement(ProcessContext c) {
    JsonObject msg;
    try {
      msg = new JsonParser().parse(new String(c.element().getPayload())).getAsJsonObject();
    } catch (JsonSyntaxException | IllegalStateException e) {
      LOG.error("Failed to parse Pub/Sub message payload: ", e);
      return;
    }

    if (msg.has(RECORDS) && msg.get(RECORDS).isJsonArray()) {
      JsonArray records = msg.get(RECORDS).getAsJsonArray();
      try (BufferedReader reader = processPutEventMessage(records)) {
        if (reader == null) {
          return;
        }
        String line;
        while (Objects.nonNull(line = reader.readLine())) {
          c.outputWithTimestamp(line, Instant.now());
        }
      } catch (IOException e) {
        LOG.error("Failed to process Put Event Message: ", e);
      }
    }
  }

  private BufferedReader processPutEventMessage(JsonArray records) {
    try {
      putEventMessage s3Obj = extractS3ObjectInfo(records);
      GCPBridge.getInstance().setSecretId(this.secretId);
      return S3.getReader(s3Obj.region, s3Obj.bucket, s3Obj.key, "UTF-8");
    } catch (IllegalArgumentException | AWSS3Exception | NoSuchKeyException e) {
      LOG.error("Failed to access S3:", e);
    }
    return null;
  }

In the TableRowPartitionContextFn class,

class TableRowPartitionContextFn implements SerializableFunction<String, String> {
  @Override
  public String apply(String e) {
    JsonObject data = new JsonParser().parse(e).getAsJsonObject();
    String Id = "", Type = "";
    if (data.has("id")) {
      Id = data.get("id").getAsString();
    }
    if (data.has("type")) {
      Type = data.get("type").getAsString();
    }
    return Id + "/" + Type;
  }
}
The pipeline is launched with:

mvn -Pdataflow-runner compile exec:java -Dexec.mainClass=com.dev.playground -Dexec.args="--project=PROJECTID --inputTopic=projects/PROJECTID/topics/dev --outputDirectory=gs://dev/playground/output/ --secretId=dev --tempLocation=gs://dev/playground/tmp/ --runner=DataflowRunner"

The window duration is set by its default: @Default.String("10s") String getWindowDuration();
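That option is declared in the Options interface roughly as follows (a sketch of the declaration only; the base interface, the @Description text, and the setter are assumed, and the other options are omitted):

public interface Options extends DataflowPipelineOptions {
  @Description("Size of the fixed window, e.g. 10s, 1m")
  @Default.String("10s")
  String getWindowDuration();
  void setWindowDuration(String value);

  // Other options used in run(): getInputTopic(), getOutputDirectory(),
  // getSecretId(), getNumShards(), ...
}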

So what's the question? – Mardoz
Would you mind providing the command line you used to run the pipeline? I am wondering what the option is for the duration. – Alex Amato
@Mardoz I wonder why the data set is not divided by the windowing and GroupByKey. – Jae
@AlexAmato The command is: mvn -Pdataflow-runner compile exec:java -Dexec.mainClass=com.dev.playground -Dexec.args="--project=PROJECTID --inputTopic=projects/PROJECTID/topics/dev --outputDirectory=gs://dev/playground/output/ --secretId=dev --tempLocation=gs://dev/playground/tmp/ --runner=DataflowRunner". The duration is set by default: @Default.String("10s") String getWindowDuration(); – Jae

1 Answer

1 vote

I suspect the combination of Repeatedly.forever() with .accumulatingFiredPanes() buffers the data indefinitely, which eventually leads to an out-of-memory error.

Please try using discardingFiredPanes(), which will not keep the elements around from previous firings of the pane.

This means that only the newly arrived elements are emitted in the pane when it triggers. It doesn't seem like you are computing any aggregation or combined result over all of the values in the pane; it looks like you are just writing them to output directly, so I don't think you need accumulatingFiredPanes().
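As a minimal sketch (reusing the windowing code from your pipeline and only changing the pane accumulation mode), the Window transform would look something like this:

inputed
    .apply(options.getWindowDuration() + " Window",
        Window.<String>into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterWatermark.pastEndOfWindow(),
                        AfterPane.elementCountAtLeast(1000),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(DurationUtils.parseDuration(options.getWindowDuration())))))
            // Emit each pane's elements once and then discard them instead of
            // buffering them for later firings of the same window.
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardDays(2)));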

If you wanted to calculate some sort of aggregated statistic or value based on all the elements in the window (e.g. a mean, sum, etc.), then I would advise using a Combine transform after the Window to accumulate the result.
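For example, a per-window element count using Beam's Combine and Count transforms could look roughly like this (a hypothetical sketch; windowed stands for the windowed PCollection<String> in your pipeline, which is not bound to a variable in the code above):

// Hypothetical: count the elements in each fixed window.
PCollection<Long> countsPerWindow = windowed
    .apply("Count per window",
        Combine.globally(Count.<String>combineFn())
            // Required for non-global windows: do not emit a default value
            // for windows that contain no elements.
            .withoutDefaults());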