
I'm trying to build an event processing stream using Apache Beam.

Steps which happen in my stream:

  1. Read from Kafka topics in Avro format & deserialize the Avro using a schema registry
  2. Create a fixed-size window (1 hour) with triggering every 10 min (processing time)
  3. Write Avro files to GCS, dividing directories by topic name (filename = schema + start-end-window-pane)

Now let's dive into the code.

  1. This code shows how I read from Kafka. I use a custom deserializer and coder to deserialize properly using the schema registry (in my case it's Hortonworks).
KafkaIO.<String, AvroGenericRecord>read()
               .withBootstrapServers(bootstrapServers)
               .withConsumerConfigUpdates(configUpdates)
               .withTopics(inputTopics)
               .withKeyDeserializer(StringDeserializer.class)
               .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
               .commitOffsetsInFinalize()
               .withoutMetadata();
  2. In the pipeline, after the records are read by KafkaIO, the windowing is applied.
records.apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardHours(1)))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)))
                        .withLateFirings(AfterPane.elementCountAtLeast(1))
                )
                .withAllowedLateness(Duration.standardMinutes(5))
                .discardingFiredPanes()
        )

What I want to achieve with this window is to group data by event time into 1-hour windows and trigger every 10 min.

  3. After grouping by window, it starts writing into Google Cloud Storage (GCS).
public class WriteAvroFilesTr extends PTransform<PCollection<AvroGenericRecord>, WriteFilesResult<AvroDestination>> {
    private String baseDir;
    private int numberOfShards;

    public WriteAvroFilesTr(String baseDir, int numberOfShards) {
        this.baseDir = baseDir;
        this.numberOfShards = numberOfShards;
    }

    @Override
    public WriteFilesResult<AvroDestination> expand(PCollection<AvroGenericRecord> input) {
        ResourceId tempDir = getTempDir(baseDir);

        return input.apply(AvroIO.<AvroGenericRecord>writeCustomTypeToGenericRecords()
                .withTempDirectory(tempDir)
                .withWindowedWrites()
                .withNumShards(numberOfShards)
                .to(new DynamicAvroGenericRecordDestinations(baseDir, Constants.FILE_EXTENSION))
        );
    }

    private ResourceId getTempDir(String baseDir) {
        return FileSystems.matchNewResource(baseDir + "/temp", true);
    }
}
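
Applying this transform in the pipeline looks roughly like this (the bucket path and shard count are just placeholders, not my real values):

// 'records' is the windowed PCollection<AvroGenericRecord> from step 2
WriteFilesResult<AvroDestination> result =
        records.apply("WriteAvroFiles", new WriteAvroFilesTr("gs://my-bucket/events", 3));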

And the dynamic destinations class:

public class DynamicAvroGenericRecordDestinations extends DynamicAvroDestinations<AvroGenericRecord, AvroDestination, GenericRecord> {
    private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
    private final String baseDir;
    private final String fileExtension;

    public DynamicAvroGenericRecordDestinations(String baseDir, String fileExtension) {
        this.baseDir = baseDir;
        this.fileExtension = fileExtension;
    }

    @Override
    public Schema getSchema(AvroDestination destination) {
        return new Schema.Parser().parse(destination.jsonSchema);
    }

    @Override
    public GenericRecord formatRecord(AvroGenericRecord record) {
        return record.getRecord();
    }

    @Override
    public AvroDestination getDestination(AvroGenericRecord record) {
        Schema schema = record.getRecord().getSchema();
        return AvroDestination.of(record.getName(), record.getDate(), record.getVersionId(), schema.toString());
    }

    @Override
    public AvroDestination getDefaultDestination() {
        return new AvroDestination();
    }

    @Override
    public FileBasedSink.FilenamePolicy getFilenamePolicy(AvroDestination destination) {
        String pathStr = baseDir + "/" + destination.name + "/" + destination.date + "/" + destination.name;
        return new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(pathStr), destination.version, fileExtension);
    }

    private static class WindowedFilenamePolicy extends FileBasedSink.FilenamePolicy {
        final ResourceId outputFilePrefix;
        final String fileExtension;
        final Integer version;

        WindowedFilenamePolicy(ResourceId outputFilePrefix, Integer version, String fileExtension) {
            this.outputFilePrefix = outputFilePrefix;
            this.version = version;
            this.fileExtension = fileExtension;
        }

        @Override
        public ResourceId windowedFilename(
                int shardNumber,
                int numShards,
                BoundedWindow window,
                PaneInfo paneInfo,
                FileBasedSink.OutputFileHints outputFileHints) {

            IntervalWindow intervalWindow = (IntervalWindow) window;

            String filenamePrefix =
                    outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");

            String filename =
                    String.format("%s-%s(%s-%s)-(%s-of-%s)%s", filenamePrefix,
                            version,
                            formatter.print(intervalWindow.start()),
                            formatter.print(intervalWindow.end()),
                            shardNumber,
                            numShards - 1,
                            fileExtension);
            ResourceId result = outputFilePrefix.getCurrentDirectory();
            return result.resolve(filename, RESOLVE_FILE);
        }


        @Override
        public ResourceId unwindowedFilename(
                int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(
                    DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
                            .withLabel("File Name Prefix"));
        }
    }

}
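
For illustration, with this filename policy a pane of the 13:00-14:00 window written with 3 shards ends up as something like baseDir/<name>/<date>/<name>-<version>(2019-03-10 13:00:00-2019-03-10 14:00:00)-(0-of-2).avro.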

That is my whole pipeline. It kind of works, but I'm not sure whether I'm actually handling events by event time.

Could someone review my code (especially steps 1 & 2, where I read and group by windows) and tell me whether it windows by event time or not?

P.S. Every record in Kafka has a timestamp field inside.

UPD

Thanks jjayadeep

I included a custom TimestampPolicy in KafkaIO:

static class CustomTimestampPolicy extends TimestampPolicy<String, AvroGenericRecord> {

    protected Instant currentWatermark;

    CustomTimestampPolicy(Optional<Instant> previousWatermark) {
        this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, AvroGenericRecord> record) {
        // Use the timestamp field embedded in the record as its event time
        // and advance the watermark to it.
        currentWatermark = Instant.ofEpochMilli(record.getKV().getValue().getTimestamp());
        return currentWatermark;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        return currentWatermark;
    }
}
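
I wire it into the reader through withTimestampPolicyFactory, roughly like this (the lambda receives the topic partition and the previous watermark for each partition):

KafkaIO.<String, AvroGenericRecord>read()
        .withBootstrapServers(bootstrapServers)
        .withConsumerConfigUpdates(configUpdates)
        .withTopics(inputTopics)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
        // attach the custom event-time policy per partition
        .withTimestampPolicyFactory((tp, previousWatermark) -> new CustomTimestampPolicy(previousWatermark))
        .commitOffsetsInFinalize()
        .withoutMetadata();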

1 Answer


From the documentation here [1], processing time is used as the record timestamp (event time) by default in KafkaIO:

By default, record timestamp (event time) is set to processing time in KafkaIO reader and source watermark is current wall time. If a topic has Kafka server-side ingestion timestamp enabled ('LogAppendTime'), it can be enabled with KafkaIO.Read.withLogAppendTime(). A custom timestamp policy can be provided by implementing TimestampPolicyFactory. See KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory) for more information.

Also, processing time is the default timestamp method used, as documented below:

// set event times and watermark based on LogAppendTime. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
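
For example, on the KafkaIO reader you can switch from the default policy to one of the built-in alternatives, or plug in your own factory (an illustrative fragment; bootstrapServers and inputTopics are placeholders):

KafkaIO.<String, AvroGenericRecord>read()
        .withBootstrapServers(bootstrapServers)
        .withTopics(inputTopics)
        // event time = broker LogAppendTime (the topic must have LogAppendTime enabled)
        .withLogAppendTime();
        // or: .withCreateTime(Duration.standardMinutes(1))  // producer CreateTime, allowing 1 min of out-of-order delay
        // or: .withTimestampPolicyFactory((tp, previousWatermark) -> new CustomTimestampPolicy(previousWatermark))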

1 - https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.html