I'm trying to build an event-processing stream using Apache Beam.
These are the steps in my stream:
- Read from Kafka topics in Avro format and deserialize the Avro using a schema registry
- Create a fixed-size window (1 hour) with triggering every 10 minutes (processing time)
- Write Avro files to GCS, with directories partitioned by topic name (filename = schema + start-end window pane)
Now let's dive into the code.
- This code shows how I read from Kafka. I use a custom deserializer and coder to deserialize properly via the schema registry (in my case it's Hortonworks).
KafkaIO.<String, AvroGenericRecord>read()
    .withBootstrapServers(bootstrapServers)
    .withConsumerConfigUpdates(configUpdates)
    .withTopics(inputTopics)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
    .commitOffsetsInFinalize()
    .withoutMetadata();
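For reference, I apply that read in the pipeline roughly like this (kafkaRead stands for the KafkaIO.read() chain above; the names here are illustrative):
// Sketch: applying the KafkaIO read and keeping only the Avro values.
PCollection<AvroGenericRecord> records =
    pipeline
        .apply("ReadFromKafka", kafkaRead)         // produces KV<String, AvroGenericRecord>
        .apply("ExtractValues", Values.create());  // drop the Kafka keys, keep the Avro records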
- In the pipeline, after the records are read by KafkaIO, windowing is applied.
records.apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardHours(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)))
        .withLateFirings(AfterPane.elementCountAtLeast(1)))
    .withAllowedLateness(Duration.standardMinutes(5))
    .discardingFiredPanes());
What I want to achieve with this window is to group data by event time into 1-hour windows and trigger every 10 minutes.
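A side note on the timestamps: FixedWindows groups elements by their event-time timestamps, so the grouping above is only by event time if the elements actually carry event-time timestamps (as far as I understand, KafkaIO stamps records with processing time by default). As a sketch, if the producer already set the Kafka record timestamp to the event time, I believe the read could use it directly via withCreateTime(); only that one line differs from the read shown above:
// Sketch (variant of the read above): use the Kafka record's CREATE_TIME timestamp as the event time.
KafkaIO.<String, AvroGenericRecord>read()
    .withBootstrapServers(bootstrapServers)
    .withConsumerConfigUpdates(configUpdates)
    .withTopics(inputTopics)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
    // the Duration bounds how out-of-order the record timestamps may be
    .withCreateTime(Duration.standardMinutes(5))
    .commitOffsetsInFinalize()
    .withoutMetadata();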
- After grouping into windows, the pipeline writes to Google Cloud Storage (GCS).
public class WriteAvroFilesTr extends PTransform<PCollection<AvroGenericRecord>, WriteFilesResult<AvroDestination>> {
    private final String baseDir;
    private final int numberOfShards;

    public WriteAvroFilesTr(String baseDir, int numberOfShards) {
        this.baseDir = baseDir;
        this.numberOfShards = numberOfShards;
    }

    @Override
    public WriteFilesResult<AvroDestination> expand(PCollection<AvroGenericRecord> input) {
        ResourceId tempDir = getTempDir(baseDir);
        // Windowed writes to dynamic destinations: one destination per (name, date, version, schema).
        return input.apply(AvroIO.<AvroGenericRecord>writeCustomTypeToGenericRecords()
            .withTempDirectory(tempDir)
            .withWindowedWrites()
            .withNumShards(numberOfShards)
            .to(new DynamicAvroGenericRecordDestinations(baseDir, Constants.FILE_EXTENSION)));
    }

    private ResourceId getTempDir(String baseDir) {
        return FileSystems.matchNewResource(baseDir + "/temp", true);
    }
}
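The transform is applied to the windowed records roughly like this (the GCS path and shard count are illustrative):
// Sketch: applying the write transform to the windowed PCollection.
WriteFilesResult<AvroDestination> written =
    windowedRecords.apply("WriteAvroToGCS", new WriteAvroFilesTr("gs://my-bucket/output", 4));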
And the dynamic destinations class:
public class DynamicAvroGenericRecordDestinations extends DynamicAvroDestinations<AvroGenericRecord, AvroDestination, GenericRecord> {
    private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

    private final String baseDir;
    private final String fileExtension;

    public DynamicAvroGenericRecordDestinations(String baseDir, String fileExtension) {
        this.baseDir = baseDir;
        this.fileExtension = fileExtension;
    }

    @Override
    public Schema getSchema(AvroDestination destination) {
        return new Schema.Parser().parse(destination.jsonSchema);
    }

    @Override
    public GenericRecord formatRecord(AvroGenericRecord record) {
        return record.getRecord();
    }

    @Override
    public AvroDestination getDestination(AvroGenericRecord record) {
        Schema schema = record.getRecord().getSchema();
        return AvroDestination.of(record.getName(), record.getDate(), record.getVersionId(), schema.toString());
    }

    @Override
    public AvroDestination getDefaultDestination() {
        return new AvroDestination();
    }

    @Override
    public FileBasedSink.FilenamePolicy getFilenamePolicy(AvroDestination destination) {
        // Directory layout: baseDir/<name>/<date>/<name>...
        String pathStr = baseDir + "/" + destination.name + "/" + destination.date + "/" + destination.name;
        return new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(pathStr), destination.version, fileExtension);
    }

    private static class WindowedFilenamePolicy extends FileBasedSink.FilenamePolicy {
        final ResourceId outputFilePrefix;
        final String fileExtension;
        final Integer version;

        WindowedFilenamePolicy(ResourceId outputFilePrefix, Integer version, String fileExtension) {
            this.outputFilePrefix = outputFilePrefix;
            this.version = version;
            this.fileExtension = fileExtension;
        }

        @Override
        public ResourceId windowedFilename(
                int shardNumber,
                int numShards,
                BoundedWindow window,
                PaneInfo paneInfo,
                FileBasedSink.OutputFileHints outputFileHints) {
            IntervalWindow intervalWindow = (IntervalWindow) window;
            String filenamePrefix =
                outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");
            // Filename pattern: <prefix>-<version>(<windowStart>-<windowEnd>)-(<shardNumber>-of-<numShards-1>)<ext>
            String filename =
                String.format("%s-%s(%s-%s)-(%s-of-%s)%s", filenamePrefix,
                    version,
                    formatter.print(intervalWindow.start()),
                    formatter.print(intervalWindow.end()),
                    shardNumber,
                    numShards - 1,
                    fileExtension);
            ResourceId result = outputFilePrefix.getCurrentDirectory();
            return result.resolve(filename, RESOLVE_FILE);
        }

        @Override
        public ResourceId unwindowedFilename(
                int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(
                DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
                    .withLabel("File Name Prefix"));
        }
    }
}
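For completeness, I haven't shown my AvroDestination class; it's a simple serializable value class with value equality (AvroIO groups output by this destination key), roughly like this sketch (the real one may differ):
import java.io.Serializable;
import java.util.Objects;

// Rough sketch of the AvroDestination value class used above.
public class AvroDestination implements Serializable {
    public String name;        // logical record name, used in the directory path
    public String date;        // date component of the directory path
    public Integer version;    // schema version, used in the file name
    public String jsonSchema;  // Avro schema as JSON, parsed back in getSchema()

    public AvroDestination() {}

    public static AvroDestination of(String name, String date, Integer version, String jsonSchema) {
        AvroDestination d = new AvroDestination();
        d.name = name;
        d.date = date;
        d.version = version;
        d.jsonSchema = jsonSchema;
        return d;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof AvroDestination)) return false;
        AvroDestination that = (AvroDestination) o;
        return Objects.equals(name, that.name)
            && Objects.equals(date, that.date)
            && Objects.equals(version, that.version)
            && Objects.equals(jsonSchema, that.jsonSchema);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, date, version, jsonSchema);
    }
}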
That is my whole pipeline. It mostly works well, but I'm not sure whether I'm actually handling events by event time.
Could someone review my code (especially steps 1 and 2, where I read and group into windows) and tell me whether it windows by event time or not?
P.S. Every record in Kafka has a timestamp field inside it.
UPD
Thanks, jjayadeep.
I included a custom TimestampPolicy in KafkaIO:
static class CustomTimestampPolicy extends TimestampPolicy<String, AvroGenericRecord> {
    protected Instant currentWatermark;

    CustomTimestampPolicy(Optional<Instant> previousWatermark) {
        this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, AvroGenericRecord> record) {
        // Use the timestamp embedded in the Avro record as the element's event time.
        currentWatermark = Instant.ofEpochMilli(record.getKV().getValue().getTimestamp());
        return currentWatermark;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        // Watermark is the event time of the last record seen on this partition.
        return currentWatermark;
    }
}
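and wired it into the read with withTimestampPolicyFactory, roughly like this (only the factory line is new compared to the read shown at the top):
KafkaIO.<String, AvroGenericRecord>read()
    .withBootstrapServers(bootstrapServers)
    .withConsumerConfigUpdates(configUpdates)
    .withTopics(inputTopics)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
    // the factory is invoked once per Kafka partition, with the previously
    // checkpointed watermark (if any) when resuming
    .withTimestampPolicyFactory((tp, previousWatermark) -> new CustomTimestampPolicy(previousWatermark))
    .commitOffsetsInFinalize()
    .withoutMetadata();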