
I'm trying to do a PoC of the "exactly-once delivery" concept with Apache Kafka using Spring Cloud Stream + the Kafka binder.

I installed Apache Kafka "kafka_2.11-1.0.0" and set "transactionIdPrefix" on the producer, which I understand is the only thing needed to enable transactions in Spring Kafka. But when I do that and run simple Source & Sink bindings within the same application, some messages are received and printed by the consumer, while others fail with an error.

For example, message #6 was received:

[49] Received message [Payload String content=FromSource1 6][Headers={kafka_offset=1957, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6695c9a9, kafka_timestampType=CREATE_TIME, my-transaction-id=my-id-6, id=302cf3ef-a154-fd42-6b43-983778e275dc, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=test10, kafka_receivedTimestamp=1514384106395, timestamp=1514384106419}]

but message #7 failed with the error "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION":

2017-12-27 16:15:07.405 ERROR 7731 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@7d3bbc0b]; nested exception is org.apache.kafka.common.KafkaException: TransactionalId my-transaction-3: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION, failedMessage=GenericMessage [payload=byte[13], headers={my-transaction-id=my-id-7, id=d31656af-3286-99b0-c736-d53aa57a5e65, contentType=application/json, timestamp=1514384107399}]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:575)
  • What does this error mean?
  • Is something missing from my configuration?
  • Do I need to implement the Source or the Sink differently when transactions are enabled?

UPDATE: I opened an issue on the project's GitHub; please refer to the discussion there.


I couldn't find an example of how to use Spring Cloud Stream with the Kafka binder and transactions enabled.

To reproduce: create a simple Maven project with Spring Boot version "2.0.0.M5" and "spring-cloud-stream-dependencies" version "Elmhurst.M3", and create a simple application with this configuration:

server:
  port: 8082
spring:
  kafka:
    producer:
      retries: 5555
      acks: "all"
  cloud:
    stream:
      kafka:
        binder:
          autoAddPartitions: true
          transaction:
            transactionIdPrefix: my-transaction-
      bindings:
        output1:
          destination: test10
          group: test111
          binder: kafka
        input1:
          destination: test10
          group: test111
          binder: kafka
          consumer:
            partitioned: true

I also created simple Source and Sink classes:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding(SampleSink.MultiInputSink.class)
public class SampleSink {

    @StreamListener(MultiInputSink.INPUT1)
    public synchronized void receive1(Message<?> message) {
        System.out.println("[" + Thread.currentThread().getId() + "] Received message " + message);
    }

    public interface MultiInputSink {
        String INPUT1 = "input1";

        @Input(INPUT1)
        SubscribableChannel input1();
    }
}

and:

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;

@EnableBinding(SampleSource.MultiOutputSource.class)
public class SampleSource {

    AtomicInteger atomicInteger = new AtomicInteger(1);

    @Bean
    @InboundChannelAdapter(value = MultiOutputSource.OUTPUT1, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public synchronized MessageSource<String> messageSource1() {
        return new MessageSource<String>() {
            public Message<String> receive() {
                String message = "FromSource1 " + atomicInteger.getAndIncrement();
                // custom header carrying a per-message transaction id
                Map<String, Object> m = new HashMap<>();
                m.put("my-transaction-id", "my-id-" + UUID.randomUUID());
                return new GenericMessage<>(message, new MessageHeaders(m));
            }
        };
    }

    public interface MultiOutputSource {
        String OUTPUT1 = "output1";

        @Output(OUTPUT1)
        MessageChannel output1();
    }
}

1 Answer


I opened an issue about this on the project's GitHub. Please refer to the answers and discussion there:

https://github.com/spring-cloud/spring-cloud-stream/issues/1166

The first answer there was:

The binder doesn't currently support producer-initiated transactions.

Transactions are supported for processors (where the consumer starts the transaction and the producer participates in that transaction).

You should be able to use spring-kafka directly to initiate a transaction on the producer side when there is no consumer.
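
Following that last suggestion, here is a minimal, self-contained sketch (my own, not from the discussion) of a producer-initiated transaction using spring-kafka's KafkaTemplate directly; the bootstrap server, topic name, class name and transaction id prefix below are illustrative assumptions, not values from the configuration above:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class TransactionalProducerExample {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // The producer factory hands out transactional producers once a
        // transactionIdPrefix is set on it.
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
        pf.setTransactionIdPrefix("my-transaction-");

        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);

        // executeInTransaction() begins the transaction, runs the callback,
        // and commits it (or aborts it if the callback throws).
        template.executeInTransaction(t -> t.send("test10", "FromSource1 1"));
    }
}

In a full Spring Boot application the same effect can also be achieved declaratively by wiring a KafkaTransactionManager and using @Transactional; the point of the answer is that, when there is no consumer side, the transaction has to be started through spring-kafka itself rather than through the binder.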