
My Flink application does the following:

  1. source: reads records from Kafka
  2. split: splits the stream based on certain criteria
  3. window: a 10-second time window aggregates the records into one bulk record
  4. sink: writes these bulk records to Elasticsearch

I am facing an issue where the Flink consumer is not able to hold 10 seconds' worth of data, and it throws the following exception:

Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=18340663 , maxSize=5242880

I cannot use a countWindow, because if records arrive slowly, the write to the Elasticsearch sink might be deferred for a long time.

My question:

Is it possible to combine a TimeWindow and a CountWindow with OR semantics, like this:

> if (recordCount is 500 OR 10 seconds have elapsed)
>     then dump the data to Elasticsearch
I guess a GlobalWindow and a custom Trigger (ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/…) should do the job. - TobiSH

2 Answers


Not directly, but you can use a GlobalWindow with custom triggering logic. For reference, look at the source of Flink's built-in CountTrigger.

Your triggering logic will look something like this:

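```java
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires a GlobalWindow after 500 elements OR 10 s of processing time, whichever comes first.
public class CountOrTimeoutTrigger extends Trigger<String, GlobalWindow> {
    private final ReducingStateDescriptor<Long> stateDesc =
        new ReducingStateDescriptor<>("count", (a, b) -> a + b, LongSerializer.INSTANCE);
    // Keep the timer timestamp in keyed state, not a field: one Trigger instance serves all keys.
    private final ValueStateDescriptor<Long> timerDesc =
        new ValueStateDescriptor<>("timer", LongSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(String element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L); // increment the window counter by one for each element
        // Start the timer when the first element is received
        if (count.get() == 1L) {
            long triggerTimestamp = ctx.getCurrentProcessingTime() + 10000; // 10 s after the first element
            ctx.getPartitionedState(timerDesc).update(triggerTimestamp);
            ctx.registerProcessingTimeTimer(triggerTimestamp); // handled in onProcessingTime below
        }
        // Or fire the window when the number of elements reaches 500
        if (count.get() >= 500) {
            clear(window, ctx); // delete the pending timer and reset the count
            return TriggerResult.FIRE_AND_PURGE; // emit the batch and drop it from window state
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        clear(window, ctx); // 10 s elapsed before 500 elements arrived: reset count and timer
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // event-time timers are not used
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        if (timer.value() != null) {
            ctx.deleteProcessingTimeTimer(timer.value());
            timer.clear();
        }
        ctx.getPartitionedState(stateDesc).clear();
    }
}
```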
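To check the count-or-timeout rule without running a Flink job, the same logic can be modelled in plain Java. This is a hypothetical, framework-free sketch: `CountOrTimeoutBuffer`, `add` and `onTime` are made-up names, not Flink API. It buffers records, flushes when `maxCount` records have arrived, and otherwise flushes once `timeoutMs` has passed since the first buffered record. Time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of a count-OR-timeout trigger (not Flink API).
public class CountOrTimeoutBuffer<T> {
    private final int maxCount;
    private final long timeoutMs;
    private final List<T> buffer = new ArrayList<>();
    private long deadline = -1; // -1 means no timer is pending (times are assumed non-negative)

    public CountOrTimeoutBuffer(int maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /** Like onElement: returns the full batch, or an empty list to keep waiting. */
    public List<T> add(T record, long nowMs) {
        buffer.add(record);
        if (buffer.size() == 1) {
            deadline = nowMs + timeoutMs; // the first record starts the timer
        }
        return buffer.size() >= maxCount ? flush() : new ArrayList<>();
    }

    /** Like onProcessingTime: flushes only if the deadline has been reached. */
    public List<T> onTime(long nowMs) {
        return (deadline >= 0 && nowMs >= deadline) ? flush() : new ArrayList<>();
    }

    // Like FIRE_AND_PURGE: hand out the batch, then reset the buffer and the timer.
    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        deadline = -1;
        return batch;
    }

    public static void main(String[] args) {
        CountOrTimeoutBuffer<String> b = new CountOrTimeoutBuffer<>(500, 10_000L);
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            batch = b.add("r" + i, 1_000L); // 500 records arrive at t = 1 s
        }
        System.out.println(batch.size()); // 500: the count limit fired first
        b.add("slow", 20_000L); // a single record at t = 20 s
        System.out.println(b.onTime(25_000L).size()); // 0: only 5 s have elapsed
        System.out.println(b.onTime(30_000L).size()); // 1: 10 s have elapsed
    }
}
```

In the Flink job itself, a trigger like the one above is attached after `keyBy` with `.window(GlobalWindows.create()).trigger(...)`, or with `windowAll(GlobalWindows.create())` on a non-keyed stream.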

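You could also use the RocksDB state backend, which is not bound by the 5 MB (maxSize=5242880) limit of the memory-backed state in the exception, but a custom Trigger that fires and purges the window keeps the buffered state small in the first place and will perform better.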