
I am new to Kafka Streams and am experimenting with it to process a stream of messages.

Scenario

Incoming payload structure is:

"building-<M>, sensor-<N>.<parameter>, value, timestamp". 

For example:

"building-1, sensor-1.temperature, 18, 2020-06-12T15:01:05Z"
"building-1, sensor-1.humidity, 75, 2020-06-12T15:01:05Z"
"building-1, sensor-2.temperature, 20, 2020-06-12T15:01:05Z"
"building-1, sensor-2.humidity, 70, 2020-06-12T15:01:05Z"

The message key in Kafka is the building ID.

The stream transforms this into a POJO for further downstream processing:

SensorData {
  buildingId = "building-1"
  sensorId = "sensor-1"
  parameterName = "temperature"
  parameterValue = 18
  timestamp = 1592048743000
  ..
  ..
}
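
A minimal sketch of how such a payload could be parsed into the POJO, assuming the raw value is the plain comma-separated text without the surrounding quotes and that the value is numeric. The `parse` method and the constructor are hypothetical and only illustrate the mapping; the fields elided with ".." above are left out.

```java
import java.time.Instant;

/** Hypothetical POJO mirroring the fields listed above. */
final class SensorData {
    final String buildingId;
    final String sensorId;
    final String parameterName;
    final double parameterValue;
    final long timestamp; // epoch milliseconds

    SensorData(String buildingId, String sensorId, String parameterName,
               double parameterValue, long timestamp) {
        this.buildingId = buildingId;
        this.sensorId = sensorId;
        this.parameterName = parameterName;
        this.parameterValue = parameterValue;
        this.timestamp = timestamp;
    }

    /** Parses "building-<M>, sensor-<N>.<parameter>, value, timestamp". */
    static SensorData parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields: " + line);
        }
        // The second field carries both the sensor ID and the parameter name.
        String sensorAndParameter = parts[1].trim();
        int dot = sensorAndParameter.indexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("Expected sensor.parameter: " + line);
        }
        return new SensorData(
                parts[0].trim(),
                sensorAndParameter.substring(0, dot),
                sensorAndParameter.substring(dot + 1),
                Double.parseDouble(parts[2].trim()),
                Instant.parse(parts[3].trim()).toEpochMilli());
    }
}
```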

Each sensor sends all of its parameters at the same time, each as a separate record. A new set of readings arrives from each sensor every 5 minutes.

The timestamp extractor is set to take the time from the payload. It also rejects a record if its timestamp is way off (say, a 1-hour deviation from the current stream time).
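
The rejection rule can be sketched in plain Java as follows. This is only the decision logic, under the assumption that it would be called from a custom `TimestampExtractor`, whose `extract` method receives the record and the highest timestamp seen so far on that partition, and that returning a negative timestamp makes Kafka Streams drop the record. The class name `PayloadTimestampPolicy` and its method are hypothetical.

```java
import java.time.Duration;

/** Hypothetical helper holding the "reject if way off" rule. */
final class PayloadTimestampPolicy {
    /** A negative result signals that the record should be dropped. */
    static final long REJECT = -1L;

    private final long maxDeviationMs;

    PayloadTimestampPolicy(Duration maxDeviation) {
        this.maxDeviationMs = maxDeviation.toMillis();
    }

    /**
     * @param payloadTimestampMs timestamp parsed from the record value
     * @param referenceTimeMs    time to compare against, negative if not yet known
     * @return the payload timestamp, or REJECT if it deviates too much
     */
    long resolve(long payloadTimestampMs, long referenceTimeMs) {
        if (referenceTimeMs < 0) {
            return payloadTimestampMs; // nothing to compare against yet
        }
        long deviationMs = Math.abs(payloadTimestampMs - referenceTimeMs);
        return deviationMs > maxDeviationMs ? REJECT : payloadTimestampMs;
    }
}
```

With a 1-hour limit, a record that is 5 minutes off passes through unchanged, while a record that is roughly 22 hours behind the reference time is rejected.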

In my topology, at one point, I want to perform an aggregate operation combining all the data from one sensor. For example, in the above sample, I want to perform an aggregation for each sensor using the temperature and humidity reported by that sensor.

Topology

I group by "buildingId" and "sensorId", then apply a session window with a 2-minute gap and a 1-minute grace period.

kStreamBuilder
    .stream("building-sensor-updates", ...)
    //Had to cleanup key and also needed some data from context
    .transform(() -> new String2SensorObjectConvertor())
     //triggers another re-partition
    .groupBy((key, value) -> value.buildingId + "-" + value.sensorId, ...)
    .windowedBy(SessionWindows.with(..))
    .aggregate(
            () -> new SensorDataAggregator(),
            ...,
            Materialized.<String, SensorDataAggregator, 
              SessionStore<Bytes, byte[]>>as("session_aggregate_store"))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    ...
    ...

As expected, this triggers a repartition, and the sub-stream consumes records from the repartition topic "sensor_data_processor-session_aggregate_store-repartition". I am seeing an issue there, as explained below.

Test input data

I am testing a scenario where past data is re-processed from storage or from a Kafka offset. For testing, I am feeding data from a CSV file using Kafka-spool-connect. The timestamps of the records in the input CSV file are in ascending order. For a given sensor, each subsequent set of records has a timestamp 5 minutes later.

"building-1, sensor-1.temperature, 18, 2020-06-12T15:01:02Z"
"building-1, sensor-1.humidity, 75, 2020-06-12T15:01:05Z"
"building-1, sensor-2.temperature, 20, 2020-06-12T15:01:03Z"
"building-1, sensor-2.humidity, 70, 2020-06-12T15:01:06Z"
"building-1, sensor-1.temperature, 19, 2020-06-12T15:06:04Z"
"building-1, sensor-1.humidity, 65, 2020-06-12T15:06:08Z"
"building-1, sensor-2.temperature, 21, 2020-06-12T15:06:05Z"
"building-1, sensor-2.humidity, 73, 2020-06-12T15:06:09Z"

I inject the test data in bulk (200000 records) without any delay.

Issue

When the sub-stream processes the records from this repartition topic, I see the following WARNING message from KStreamSessionWindowAggregate, and the records get skipped.

WARN org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate - Skipping record for expired window. key=[BUILDING-ID-1003-sensor-1] topic=[sensor_data_processor-session_aggregate_store-repartition] partition=[0] offset=[1870] timestamp=[1591872043000] window=[1591872043000,1591872043000] expiration=[1591951243000] streamTime=[1591951303000]

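If you look at the timestamps in the WARNING message:

  • The timestamp of the message is "June 11, 2020 10:40:43Z" (timestamp=1591872043000)
  • The window expiration is "June 12, 2020 08:40:43Z" (expiration=1591951243000)
  • The stream time has already advanced to "June 12, 2020 08:41:43Z" (streamTime=1591951303000), one minute past the expiration, which matches the 1-minute grace period

I also tried a time window of 7 minutes with a 2-minute advance and had a similar issue.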
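
To double-check the values in the log line, the epoch milliseconds can be converted with `java.time`. This is a small sketch; the class name `ExpiredWindowLogCheck` is hypothetical, and the three numbers passed in `main` are copied from the WARN message above.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper that decodes the three timestamps from the WARN line. */
final class ExpiredWindowLogCheck {

    static String describe(long timestampMs, long expirationMs, long streamTimeMs) {
        // Distance between stream time and the expiration boundary.
        Duration streamTimeMinusExpiration = Duration.ofMillis(streamTimeMs - expirationMs);
        // How far the skipped record lags behind the stream time.
        Duration streamTimeMinusRecord = Duration.ofMillis(streamTimeMs - timestampMs);
        return "record=" + Instant.ofEpochMilli(timestampMs)
                + " expiration=" + Instant.ofEpochMilli(expirationMs)
                + " streamTime=" + Instant.ofEpochMilli(streamTimeMs)
                + " streamTime-expiration=" + streamTimeMinusExpiration
                + " streamTime-record=" + streamTimeMinusRecord;
    }

    public static void main(String[] args) {
        System.out.println(describe(1591872043000L, 1591951243000L, 1591951303000L));
    }
}
```

The gap between the stream time and the expiration is exactly the 1-minute grace period, while the skipped record is about 22 hours behind the stream time.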

Observations

  1. As the key of the original messages is "building-id", all records from the same building (and hence the same sensor) should go into one partition, and the records from each sensor should be in order.

  2. I am also doing a transform() at the beginning of the topology. I had to clean up the key and also wanted some data from the context. Though this may trigger a repartition, it should not change the order of records within a sensor, as it only cleans up the key, so the partitioning outcome would keep the same elements in each partition. I will get rid of this transform() with some optimization.

  3. My window grouping is based on building-id + sensor-id, so the elements from the same sensor in each repartitioned group should also arrive in order.

Given all this, I was hoping that each partition/group's stream time would progress monotonically with the timestamps of the events in that partition, since their order is maintained. But I see a jump in the stream time. I looked at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate and some Kafka Streams documentation.

It appears to me that monotonic stream time is maintained per stream task and not per partition, and that the same stream task may be used for processing multiple topic partitions. Because the records are injected in quick succession, a task may process a bulk of records from one partition, and when it picks up another topic partition, the stream time may already be far ahead of the timestamps of the records in the new partition, which causes them to expire.

Questions

  1. When replaying records like this, how can this be handled other than by setting a large grace period for the window?

  2. Even in a real-time scenario, this issue might happen if there is back pressure. Using a large grace period is not an option, as results would be delayed because I am using Suppressed.untilWindowCloses(). What would be the best way to handle this?

  3. If stream time is maintained per stream task and the same task may be used for multiple topic partitions, is there any way to keep a 1:1 mapping and stickiness between stream tasks and topic partitions? If so, what would be the implications other than potential performance issues?

  4. Why wouldn't Kafka Streams maintain stream time per topic partition instead of per stream task?

  5. When I looked at the "sensor_data_processor-session_aggregate_store-repartition" topic mentioned in the warning message, I saw that mostly "temperature" records are published to that topic (yes, in each group, "temperature" comes first in the test data set). Why do only temperature records go into that topic? Is it just a timing coincidence?

Did you manage to solve the problem? I'm interested in knowing how you solved it. I'm experiencing a quite similar situation: issues.apache.org/jira/browse/KAFKA-10844 - mathieu
@mathieu Nope, it wasn't solved. I removed 'Suppressed.untilWindowCloses', used a large grace period, and update the result in the sink whenever the window emits. I had to make some adjustments to rekey the result so that the same record gets updated in the sink when there is an updated result for the same window instance. It's not efficient, but it works for now. - Rajesh Jose
Thanks for answering. We are on the same track (removing suppress, larger grace period, updating results). - mathieu

1 Answer

When replaying records like this, how can this be handled other than by setting a large grace period for the window?

I guess you cannot. If you process today's data, and later process data from yesterday, the data from yesterday will be discarded. What you could do is start a new application. In that case, the app has no stream time on startup, so it will initialize its stream time with "yesterday", and the data won't be discarded.
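
The difference between a running application and a freshly started one can be illustrated with a toy model. This is a simplified sketch, not the library's implementation: it assumes stream time is the maximum timestamp seen so far and that a record is skipped when its timestamp is older than stream time minus the grace period, which mirrors the expiration/streamTime relation in the warning from the question. The class `StreamTimeTracker` is hypothetical.

```java
/** Hypothetical toy model of stream time; not Kafka Streams code. */
final class StreamTimeTracker {
    private static final long UNKNOWN = -1L;

    private final long graceMs;
    private long streamTimeMs = UNKNOWN; // a new application starts without stream time

    StreamTimeTracker(long graceMs) {
        this.graceMs = graceMs;
    }

    /** Returns true if the record is accepted, false if it would be skipped as expired. */
    boolean offer(long recordTimestampMs) {
        // Stream time only ever moves forward.
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        return recordTimestampMs >= streamTimeMs - graceMs;
    }

    long streamTimeMs() {
        return streamTimeMs;
    }
}
```

A tracker that has already seen "today" rejects a record from "yesterday", while a fresh tracker accepts the same record because its stream time is initialized from it.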

Even in a real-time scenario, this issue might happen if there is back pressure. Using a large grace period is not an option, as results would be delayed because I am using Suppressed.untilWindowCloses(). What would be the best way to handle this?

Well, you have to pick your poison... Or you fall back to the Processor API and implement whatever logic you need manually.

If stream time is maintained per stream task and the same task may be used for multiple topic partitions, is there any way to keep a 1:1 mapping and stickiness between stream tasks and topic partitions? If so, what would be the implications other than potential performance issues?

Stream time is definitely maintained per task, and there is a 1:1 mapping between tasks and partitions. Maybe the data is shuffled unexpectedly. Regarding "My window grouping is based on building-id + sensor-id, so the elements from same sensor in each re-partitioned group also should be coming in order": agreed. However, the data would still be shuffled; thus, if one upstream task processed data faster than its "parallel" peers, it would lead to a fast advance of the stream time of all downstream tasks, too.
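
The effect of one upstream task running ahead of its peers can be sketched with a small simulation. It is a simplified model, not Kafka Streams code: it assumes a single downstream task whose stream time is the maximum timestamp seen so far, and it counts a record as expired when its timestamp is older than stream time minus the grace period. All names (`RepartitionSimulation`, `sensorFeed`, and so on) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of one repartition partition fed by two upstream tasks. */
final class RepartitionSimulation {

    /** Counts the records a downstream task would skip as expired. */
    static int countExpired(List<Long> arrivalOrder, long graceMs) {
        long streamTimeMs = Long.MIN_VALUE;
        int expired = 0;
        for (long timestampMs : arrivalOrder) {
            streamTimeMs = Math.max(streamTimeMs, timestampMs);
            if (timestampMs < streamTimeMs - graceMs) {
                expired++;
            }
        }
        return expired;
    }

    /** Timestamps of one sensor: `count` readings, 5 minutes apart. */
    static List<Long> sensorFeed(long startMs, int count) {
        List<Long> feed = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            feed.add(startMs + i * 5L * 60_000L);
        }
        return feed;
    }

    /** Upstream task A is drained completely before upstream task B writes anything. */
    static List<Long> sequential(List<Long> a, List<Long> b) {
        List<Long> order = new ArrayList<>(a);
        order.addAll(b);
        return order;
    }

    /** Both upstream tasks progress in lockstep. */
    static List<Long> roundRobin(List<Long> a, List<Long> b) {
        List<Long> order = new ArrayList<>();
        for (int i = 0; i < Math.max(a.size(), b.size()); i++) {
            if (i < a.size()) order.add(a.get(i));
            if (i < b.size()) order.add(b.get(i));
        }
        return order;
    }
}
```

In this model, each sensor's records stay in order in both cases. Even so, when one feed is written entirely before the other, every record of the slower feed that lies more than the grace period behind the faster feed's last timestamp counts as expired. With lockstep interleaving, none do.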

Why wouldn't Kafka Streams maintain stream time per topic partition instead of per stream task?

Not sure if I can follow. Each task tracks stream time individually, and there is a 1:1 mapping between tasks and partitions. Hence, it seems both approaches (tracking per partition or tracking per task, assuming there is only one input partition per task) are the same.