
I am setting up a Flink stream processor using Kafka and Elasticsearch. I want to replay my data, but when I set the parallelism to more than 1, the program does not finish. I believe this is because only one of the parallel Kafka consumer instances sees the message that is identified as the end of the stream.


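    // Fragment of CustomSchema (needs import java.util.Date);
    // the remaining deserialization schema methods are omitted.
    private final Date endTime; // null means "never signal end of stream"

    public CustomSchema(Date endTime) {
        this.endTime = endTime;
    }

    @Override
    public boolean isEndOfStream(CustomTopicWrapper nextElement) {
        // End the stream once a record at or after endTime is seen.
        return endTime != null
                && nextElement.messageTime.getTime() >= endTime.getTime();
    }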
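To make the suspected cause concrete, here is a plain-Java simulation. It has no Flink dependency, so it is only a model, not Flink's actual fetcher code. It assumes, as the question suggests, that each parallel consumer instance reads its own subset of Kafka partitions and applies the same `isEndOfStream` check only to the records it receives. The class and method names (`PerSubtaskEndOfStream`, `subtaskFinishes`) and the sample timestamps are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model (not Flink code): each hypothetical "subtask" reads only its
// own partition and evaluates the end-of-stream predicate independently.
public class PerSubtaskEndOfStream {

    // Same predicate as CustomSchema.isEndOfStream, on epoch millis.
    public static boolean isEndOfStream(long messageTime, long endTime) {
        return messageTime >= endTime;
    }

    // True if this subtask meets an end-of-stream record in its partition;
    // false if it runs out of records (in the real job it would keep polling).
    public static boolean subtaskFinishes(List<Long> partition, long endTime) {
        for (long messageTime : partition) {
            if (isEndOfStream(messageTime, endTime)) {
                return true; // only this subtask stops; the others are not told
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long endTime = 100L;
        // Hypothetical data: only partition 0 has a record at or past endTime.
        List<List<Long>> partitions = Arrays.asList(
                Arrays.asList(10L, 50L, 120L),
                Arrays.asList(20L, 60L, 90L));
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("subtask " + i + " finished: "
                    + subtaskFinishes(partitions.get(i), endTime));
        }
    }
}
```

Running `main` prints `subtask 0 finished: true` and `subtask 1 finished: false`: only the instance whose partition contains a record at or past `endTime` stops, while the other would keep waiting for more data.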

Is there a way to tell all threads of the Flink consumer group to end once one thread has completed?


1 Answer


If you implemented your own SourceFunction, use its `cancel()` method, as the example in the Flink `SourceFunction` documentation shows. The class `FlinkKafkaConsumerBase` also has a `cancel()` method.
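A rough sketch of the run/cancel pattern, again in plain Java so it runs without Flink. `SimpleSource` is a hypothetical stand-in for `SourceFunction`, and `CountingSource` follows the Javadoc's approach of a `volatile` running flag that `run()` polls and `cancel()` clears. The demo assumes all instances live in one JVM, so a coordinator can call `cancel()` on every instance as soon as one of them returns. In a real Flink job, `cancel()` is invoked by the runtime when the job is cancelled, and parallel subtasks may run in different TaskManager processes, so this in-memory coordination does not carry over directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

public class CancelAllDemo {

    // Hypothetical stand-in for Flink's SourceFunction; not the Flink API itself.
    interface SimpleSource<T> {
        void run(Consumer<T> collector);
        void cancel();
    }

    // Volatile flag polled by run(), cleared by cancel().
    public static class CountingSource implements SimpleSource<Long> {
        private final long endValue; // hypothetical "end of stream" marker
        private volatile boolean isRunning = true;

        public CountingSource(long endValue) {
            this.endValue = endValue;
        }

        @Override
        public void run(Consumer<Long> collector) {
            long count = 0;
            // Stop when cancelled or when this instance reaches its own end marker.
            while (isRunning && count < endValue) {
                collector.accept(count++);
                try {
                    Thread.sleep(1); // simulate waiting for the next record
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false; // run() exits on its next loop check
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Instance 0 reaches its end marker quickly; the others effectively never would.
        List<CountingSource> sources = Arrays.asList(
                new CountingSource(5), new CountingSource(Long.MAX_VALUE),
                new CountingSource(Long.MAX_VALUE));
        CountDownLatch firstFinished = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (CountingSource source : sources) {
            Thread t = new Thread(() -> {
                try {
                    source.run(value -> { });
                } finally {
                    firstFinished.countDown(); // signal that some instance returned
                }
            });
            threads.add(t);
            t.start();
        }
        firstFinished.await();                   // wait until any one instance returns
        sources.forEach(CountingSource::cancel); // then cancel every instance
        for (Thread t : threads) {
            t.join(1000);
        }
        boolean allStopped = threads.stream().noneMatch(Thread::isAlive);
        System.out.println("all stopped: " + allStopped);
    }
}
```

The flag is `volatile` so that the write made in `cancel()` from another thread is visible to the loop in `run()`.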