1
votes

When I use a Kafka channel and a Kafka sink at the same time in Flume, I get this exception after a few minutes:

java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
    at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
    at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:255)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:748)

The exception is thrown after some events have already been delivered successfully.

If I use a memory channel instead of the Kafka channel, or a logger sink instead of the Kafka sink, everything works fine. Everything also works fine if I use one Kafka cluster for the Kafka channel and a different Kafka cluster for the Kafka sink.
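
For reference, a minimal sketch of the kind of setup described above, with the Kafka channel and the Kafka sink pointing at the same cluster. The agent name `a1`, the component names `c1` and `k1`, the broker address, and the sink topic `output-topic` are placeholders chosen for illustration, not the actual values; the source definition is omitted.

```properties
# Hypothetical agent "a1"; broker address and sink topic are placeholders.
a1.channels = c1
a1.sinks = k1

# Kafka channel: events are buffered in a Kafka topic.
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = flume-channel

# Kafka sink: events taken from the channel are published to another topic
# on the same cluster (the combination that triggers the exception).
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.topic = output-topic
```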

Flume version is: 1.7.0
Kafka version is: 0.10.2.0


1 Answer

0
votes

I was running Flume and Kafka on a single machine.
I changed the number of partitions of the flume-channel topic from 4 to 1, and the problem was fixed.