I implemented a pipeline that reads JSON messages containing an AWS S3 key from Pub/Sub, downloads each S3 object (newline-delimited JSON, NDJSON), and writes the NDJSON lines to files in GCS using windowing and GroupByKey.
The pipeline runs in streaming mode with 10-second fixed windows and AfterPane.elementCountAtLeast(1000).
However, the windows never fire and no files appear in GCS. In addition, the job failed with this exception:

Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = 4295/4946/4946 MB, GC last/max = 98.54/98.54 %, #pushbacks=1, gc thrashing=true. Heap dump not written.
The figure below shows the job graph of my Dataflow job. The GroupIntoShards step does not output any data, and I cannot understand why. This only happens with unbounded input: the result is fine when only a few Pub/Sub messages arrive (perhaps few enough that GC thrashing does not occur). My guess was that the window closes once the Pub/Sub subscription has no unacked messages; but when messages are published continuously, GroupIntoShards does not emit anything and there are no files in GCS.
(screenshot: Dataflow job graph)
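To check whether the trigger ever fires at all, a small diagnostic DoFn can log the window and pane of each element between the Window transform and the write. LogPaneFn is a hypothetical helper, not part of the pipeline above:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

static class LogPaneFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(LogPaneFn.class);

  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    // If this never logs anything for unbounded input, the trigger is not firing.
    LOG.info("window={} pane={} timestamp={}", window, c.pane(), c.timestamp());
    c.output(c.element());
  }
}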
The following is the relevant part of my source. In the main class:
public static PipelineResult run(Options options) {
  Pipeline pipeline = Pipeline.create(options);

  PCollection<String> inputed = pipeline
      .apply("Read PubSub Events",
          PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
      .apply("NDJson Divider", ParDo.of(new NDJsonDivider(options.getSecretId())));

  inputed
      .apply(options.getWindowDuration() + " Window",
          Window.<String>into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))
              .triggering(
                  Repeatedly.forever(
                      AfterFirst.of(
                          AfterWatermark.pastEndOfWindow(),
                          AfterPane.elementCountAtLeast(1000),
                          AfterProcessingTime.pastFirstElementInPane()
                              .plusDelayOf(DurationUtils.parseDuration(options.getWindowDuration())))))
              .accumulatingFiredPanes()
              .withAllowedLateness(Duration.standardDays(2)))
      .apply(FileIO.<String, String>writeDynamic()
          .by(new TableRowPartitionContextFn())
          .via(TextIO.sink())
          .to(options.getOutputDirectory())
          .withNaming(PartitionedFileNaming::new)
          .withNumShards(options.getNumShards())
          .withDestinationCoder(StringUtf8Coder.of())
          .withCompression(Compression.GZIP));

  return pipeline.run();
}
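One thing I am not sure about: accumulatingFiredPanes() together with Repeatedly.forever and two days of allowed lateness means every firing re-emits the whole pane, and the buffered window state keeps growing, which might explain the GC thrashing. A variant I could try instead (a sketch only; discardingFiredPanes() and the shorter lateness are untested assumptions):

.apply(options.getWindowDuration() + " Window",
    Window.<String>into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))
        .triggering(
            Repeatedly.forever(
                AfterFirst.of(
                    AfterWatermark.pastEndOfWindow(),
                    AfterPane.elementCountAtLeast(1000))))
        // Drop elements once a pane has fired instead of buffering them
        // for the full two-day allowed-lateness period.
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(10)))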
In the NDJsonDivider class:
@ProcessElement
public void processElement(ProcessContext c) {
  JsonObject msg;
  try {
    msg = new JsonParser()
        .parse(new String(c.element().getPayload(), StandardCharsets.UTF_8))
        .getAsJsonObject();
  } catch (JsonSyntaxException | IllegalStateException e) {
    LOG.error("Failed to parse Pub/Sub message payload: ", e);
    return;
  }
  if (msg.has(RECORDS) && msg.get(RECORDS).isJsonArray()) {
    JsonArray records = msg.get(RECORDS).getAsJsonArray();
    // Stream the S3 object line by line and emit each NDJSON line as an element.
    try (BufferedReader reader = processPutEventMessage(records)) {
      if (reader == null) {
        return;
      }
      String line;
      while (Objects.nonNull(line = reader.readLine())) {
        c.outputWithTimestamp(line, Instant.now());
      }
    } catch (IOException e) {
      LOG.error("Failed to process Put Event Message: ", e);
    }
  }
}
private BufferedReader processPutEventMessage(JsonArray records) {
  try {
    putEventMessage s3Obj = extractS3ObjectInfo(records);
    GCPBridge.getInstance().setSecretId(this.secretId);
    return S3.getReader(s3Obj.region, s3Obj.bucket, s3Obj.key, "UTF-8");
  } catch (IllegalArgumentException | AWSS3Exception | NoSuchKeyException e) {
    LOG.error("Failed to access S3: ", e);
  }
  return null;
}
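For context, the Pub/Sub payload is an S3 event notification, so extractS3ObjectInfo reads the region, bucket, and key from the Records array. A trimmed example of the expected shape, assuming the standard S3 event notification format (names and values made up):

{
  "Records": [
    {
      "awsRegion": "us-east-1",
      "s3": {
        "bucket": { "name": "example-bucket" },
        "object": { "key": "path/to/data.ndjson" }
      }
    }
  ]
}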
In the TableRowPartitionContextFn class:
class TableRowPartitionContextFn implements SerializableFunction<String, String> {
  @Override
  public String apply(String e) {
    JsonObject data = new JsonParser().parse(e).getAsJsonObject();
    String id = "", type = "";
    if (data.has("id")) {
      id = data.get("id").getAsString();
    }
    if (data.has("type")) {
      type = data.get("type").getAsString();
    }
    return id + "/" + type;
  }
}
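The partition function itself behaves as expected on a sample line; for example (the JSON here is made up):

// Quick sanity check of TableRowPartitionContextFn.
TableRowPartitionContextFn fn = new TableRowPartitionContextFn();
String destination = fn.apply("{\"id\":\"123\",\"type\":\"click\"}");
// destination == "123/click", so output files are partitioned as <id>/<type>.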
I launch the job with:

mvn -Pdataflow-runner compile exec:java -Dexec.mainClass=com.dev.playground -Dexec.args="--project=PROJECTID --inputTopic=projects/PROJECTID/topics/dev --outputDirectory=gs://dev/playground/output/ --secretId=dev --tempLocation=gs://dev/playground/tmp/ --runner=DataflowRunner"
The window duration is left at its default: @Default.String("10s") String getWindowDuration();
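For completeness, the relevant part of my Options interface looks like this (a sketch reconstructed from the flags above; only the getWindowDuration default is verbatim, the other annotations are assumptions):

public interface Options extends DataflowPipelineOptions {
  String getInputTopic();
  void setInputTopic(String value);

  String getOutputDirectory();
  void setOutputDirectory(String value);

  String getSecretId();
  void setSecretId(String value);

  // Default value here is an assumption; only the flag name is used above.
  @Default.Integer(1)
  Integer getNumShards();
  void setNumShards(Integer value);

  @Default.String("10s")
  String getWindowDuration();
  void setWindowDuration(String value);
}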
– Jae