
I'm using Fluentd (v.12, the last stable version) to send messages to Kafka. But Fluentd uses an old KafkaProducer, so the record's timestamp is always set to -1. Thus I have to use the WallclockTimestampExtractor, which sets the record's timestamp to the current wall-clock time, i.e., the point in time when Kafka Streams processes the message.

The timestamp I'm really interested in is sent by Fluentd within the message:

{"timestamp":"1507885936","host":"V.X.Y.Z."}

Record representation in Kafka:

offset = 0, timestamp = -1, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}

I would like to have a record like this in Kafka:

offset = 0, timestamp = 1507885936, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
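One detail worth checking before building any solution: Kafka record timestamps are epoch milliseconds, while the payload's `timestamp` field (1507885936) looks like epoch seconds, so it presumably needs to be multiplied by 1000 before being used as the record timestamp. A small stdlib-only helper to convert and sanity-check the value (the class and method names are hypothetical):

```java
import java.time.Instant;

// Hypothetical helper: Kafka record timestamps are epoch milliseconds,
// but the Fluentd payload field holds epoch seconds as a string.
final class PayloadTimestamp {
    private PayloadTimestamp() {}

    // Converts the "timestamp" field value (epoch seconds) to epoch milliseconds;
    // multiplyExact throws instead of silently overflowing.
    static long secondsStringToMillis(String epochSeconds) {
        return Math.multiplyExact(Long.parseLong(epochSeconds.trim()), 1000L);
    }

    // Renders epoch milliseconds as an ISO-8601 UTC string for a quick visual check.
    static String toIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }
}
```

For the sample message, `secondsStringToMillis("1507885936")` yields 1507885936000, which `toIso` renders as `2017-10-13T09:12:16Z`.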

My workaround would look like this:

  • write a consumer to extract the timestamp (https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)

  • write a producer to produce a new record with the timestamp set (ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value))
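The consume-and-re-produce workaround boils down to copying each record into a new ProducerRecord with an explicit timestamp. The sketch below models only that re-stamping step, using a hypothetical `SimpleRecord` class (not a Kafka type) that holds the same fields as the ProducerRecord constructor quoted above; the real KafkaConsumer poll loop and KafkaProducer send calls are left out, and the regex assumes the value is a JSON string shaped like the sample message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical holder with the same fields as
// ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value).
final class SimpleRecord {
    final String topic;
    final Integer partition;
    final Long timestamp;
    final String key;
    final String value;

    SimpleRecord(String topic, Integer partition, Long timestamp, String key, String value) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }
}

final class Restamper {
    // Matches "timestamp":"<digits>" as Fluentd sends it (value quoted as a string).
    private static final Pattern TIMESTAMP_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"(\\d+)\"");

    private Restamper() {}

    // Builds one output record per consumed record, using the payload's
    // epoch seconds converted to epoch milliseconds as the record timestamp.
    static List<SimpleRecord> restamp(List<SimpleRecord> consumed, String outputTopic) {
        List<SimpleRecord> out = new ArrayList<>();
        for (SimpleRecord r : consumed) {
            Long ts = null; // null: a real producer would stamp the record itself
            if (r.value != null) {
                Matcher m = TIMESTAMP_FIELD.matcher(r.value);
                if (m.find()) {
                    ts = Long.parseLong(m.group(1)) * 1000L;
                }
            }
            // null partition: leave the choice to the producer's partitioner.
            out.add(new SimpleRecord(outputTopic, null, ts, r.key, r.value));
        }
        return out;
    }
}
```

With the real client, passing a null timestamp to ProducerRecord makes the producer stamp the record with its current time, which is why the sketch falls back to null when the field is missing.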

I would prefer a Kafka Streams solution, if there is one.

Cannot follow your question. What do you try to achieve? - Matthias J. Sax
Thanks, @MatthiasJ.Sax! I edited the question and hope my request is clearer now. - sunjazz

1 Answer


You can write a very simple Kafka Streams application like:

// Kafka Streams 1.0-era API; newer releases use StreamsBuilder instead of KStreamBuilder
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");

and configure the application with a custom TimestampExtractor that extracts the timestamp from the record's value and returns it.
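As a sketch of such an extractor: in Kafka Streams 1.0 the `TimestampExtractor` interface declares `long extract(ConsumerRecord<Object, Object> record, long previousTimestamp)`, and the extractor sees the already deserialized value. To keep the example compilable without kafka-streams on the classpath, it only shows the logic such a class might delegate to, with `value` standing in for `record.value()`, assuming the default value serde is the String serde. The regex is a dependency-free stand-in for a real JSON parser (e.g. Jackson), falling back to `previousTimestamp` is just one possible policy (it can itself be -1 before any valid timestamp is seen), and the class name `com.example.PayloadTimestampExtractor` and the application id are hypothetical. The property keys are the string values of the `StreamsConfig` constants `APPLICATION_ID_CONFIG`, `BOOTSTRAP_SERVERS_CONFIG`, `DEFAULT_KEY_SERDE_CLASS_CONFIG`, `DEFAULT_VALUE_SERDE_CLASS_CONFIG` and `DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG`.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Dependency-free core of a hypothetical payload-based TimestampExtractor.
final class PayloadTimestampExtractorLogic {
    // Matches "timestamp":"<digits>" as Fluentd sends it (value quoted as a string).
    private static final Pattern TIMESTAMP_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*\"(\\d+)\"");

    private PayloadTimestampExtractorLogic() {}

    // Mirrors extract(record, previousTimestamp) with value = record.value():
    // returns the payload's epoch seconds as epoch milliseconds, or
    // previousTimestamp if the value is not a String or lacks the field.
    static long extract(Object value, long previousTimestamp) {
        if (value instanceof String) {
            Matcher m = TIMESTAMP_FIELD.matcher((String) value);
            if (m.find()) {
                return Long.parseLong(m.group(1)) * 1000L;
            }
        }
        return previousTimestamp;
    }

    // Streams settings that register the (hypothetical) extractor class.
    static Properties streamsConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", "fluentd-timestamp-rewriter");
        props.put("bootstrap.servers", bootstrapServers);
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.timestamp.extractor", "com.example.PayloadTimestampExtractor");
        return props;
    }
}
```

These properties would then be passed to the KafkaStreams constructor together with the topology built from the builder.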

Kafka Streams will use the returned timestamps when writing the records back to Kafka.

Note: if you have out-of-order data (i.e., timestamps are not strictly ordered), the result will contain out-of-order timestamps, too. Kafka Streams uses the returned timestamps when writing back to Kafka (i.e., whatever the extractor returns is used as the record's metadata timestamp). Also note that on write, the timestamp of the currently processed input record is used for all generated output records; this holds for version 1.0 but might change in future releases.
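The write-back behavior described in the note can be modeled without Kafka. The hypothetical `OutputTimestampModel` below (not Kafka API) simulates a flatMap-like step: every output generated while processing an input record inherits that input's extracted timestamp, as described for version 1.0, and records are never reordered, so out-of-order input timestamps stay out of order in the output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the 1.0 write-back rule; not Kafka API.
final class OutputTimestampModel {
    static final class Stamped {
        final long timestamp;
        final String value;

        Stamped(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private OutputTimestampModel() {}

    // Each input yields `fanOut` outputs, all carrying the input's timestamp;
    // input order is preserved, nothing is sorted.
    static List<Stamped> process(List<Stamped> inputs, int fanOut) {
        List<Stamped> out = new ArrayList<>();
        for (Stamped in : inputs) {
            for (int i = 0; i < fanOut; i++) {
                out.add(new Stamped(in.timestamp, in.value + "#" + i));
            }
        }
        return out;
    }

    // True if timestamps never go backwards, i.e. the data is in order.
    static boolean isNonDecreasing(List<Stamped> records) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i).timestamp < records.get(i - 1).timestamp) {
                return false;
            }
        }
        return true;
    }
}
```

Feeding inputs with timestamps 3000, 1000, 2000 and a fan-out of 2 gives outputs stamped 3000, 3000, 1000, 1000, 2000, 2000: the per-input timestamps are copied, and the disorder of the input is carried through.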