
I am trying to write a spring-cloud-stream function (spring-boot-starter-parent 2.5.3, Java 11, spring-cloud-version 2020.0.3) that uses both a Kafka and a Postgres transaction. The function raises a simulated error whenever the consumed message starts with the string "fail", which I expect to roll back the database transaction and then roll back the Kafka transaction. (I am aware that the Kafka transaction is not XA, which is fine.) So far the Kafka transaction works, but I have not gotten the database transaction to work.

Currently I am using a @Transactional annotation, which does not appear to start a database transaction. (The Kafka binder documentation recommends synchronizing database and Kafka transactions with the ChainedTransactionManager, but the Spring Kafka documentation states that it is deprecated in favor of the @Transactional annotation, and the Spring Cloud Stream sample for this problem uses @Transactional with the default transaction manager created by the spring-boot-starter-data-jpa library, I think.) I can see in my debugger that, regardless of whether I use @EnableTransactionManagement and put @Transactional on my consumer, the consumer is executed inside a Kafka transaction via a transaction template higher in the stack, but I do not see a database transaction anywhere.
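For reference, this is roughly the ChainedTransactionManager-style wiring that the binder documentation describes and that I am trying to avoid because it is deprecated; the bean below is my own approximation (names and generics are guesses), not code from my project:

  @Bean
  public ChainedKafkaTransactionManager<byte[], byte[]> chainedTransactionManager(
      JpaTransactionManager jpaTransactionManager, BinderFactory binders) {
    // Reuse the binder's transactional producer factory for the Kafka side.
    ProducerFactory<byte[], byte[]> pf =
        ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
            .getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager =
        new KafkaTransactionManager<>(requireNonNull(pf));
    // Delegate transactions start in the order given and commit in reverse order,
    // so listing the Kafka manager first should commit the database before Kafka.
    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jpaTransactionManager);
  }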

There are a few things I want to understand:

  • Am I correct that the Kafka Listener Container runs my consumers in the context of a Kafka transaction regardless of whether or not I have a @Transactional annotation? And if so, is there a way to only run specific functions in a Kafka transaction?
  • Would the above change for producers, since the container doesn't have a way to intercept calls to the producers (as far as I know)?
  • What should I do to synchronize a Kafka transaction and a database transaction so that the database commit happens before the Kafka commit?

I have the following CrudRepository, collection of handlers, and application.yml:

@Repository
public interface AuditLogRepository extends CrudRepository<AuditLog, Long> {

  /**
   * Create a new audit log entry if and only if another with the same message does not already
   * exist. This is idempotent.
   */
  @Transactional
  @Modifying
  @Query(
      nativeQuery = true,
      value = "insert into audit_log (message) values (?1) on conflict (message) do nothing")
  void createIfNotExists(String message);
}
@Profile("ft")
@Configuration
@EnableTransactionManagement
public class FaultTolerantHandlers {

  private static final Logger LOGGER = LoggerFactory.getLogger(FaultTolerantHandlers.class);

  @Bean
  public NewTopic inputTopic() {
    return TopicBuilder.name("input").partitions(1).replicas(1).build();
  }

  @Bean
  public NewTopic inputDltTopic() {
    return TopicBuilder.name("input.DLT").partitions(1).build();
  }

  @Bean
  public NewTopic leftTopic() {
    return TopicBuilder.name("left").partitions(1).build();
  }

  @Bean
  public NewTopic rightTopic() {
    return TopicBuilder.name("right").partitions(1).build();
  }

  @Bean
  public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
      LOGGER.info("Producing messages to input...");
      template.send("input", "pass-1".getBytes());
      template.send("input", "fail-1".getBytes());
      template.send("input", "pass-2".getBytes());
      template.send("input", "fail-2".getBytes());
      LOGGER.info("Produced input.");
    };
  }

  @Bean
  ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
      BinderFactory binders) {
    return (container, dest, group) -> {
      ProducerFactory<byte[], byte[]> pf =
          ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
              .getTransactionalProducerFactory();
      KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(requireNonNull(pf));
      container.setAfterRollbackProcessor(
          new DefaultAfterRollbackProcessor<>(
              new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L)));
    };
  }

  // Receive messages from `input`.
  // For each input, write an audit log to the database.
  // For each input, produce a message to both `left` and `right` atomically.
  // After three failed attempts to achieve the above, shuffle the message
  // off to `input.DLT` and move on.
  @Bean
  @Transactional
  public Consumer<String> persistAndSplit(
      StreamBridge bridge,
      AuditLogRepository repository
  ) {
    return input -> {
      bridge.send("left", ("left-" + input).getBytes());
      repository.createIfNotExists(input);

      if (input.startsWith("fail")) {
        throw new RuntimeException("Simulated error");
      }

      bridge.send("right", ("right-" + input).getBytes());
    };
  }

  @Bean
  public Consumer<Message<String>> logger() {
    return message -> {
      var receivedTopic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
      LOGGER.info("Received on topic=" + receivedTopic + " payload=" + message.getPayload());
    };
  }
}
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: 'tx-'
          required-acks: all
      bindings:
        persistAndSplit-in-0:
          destination: input
          group: input
        logger-in-0:
          destination: left,right,input.DLT
          group: logger
          consumer:
            properties:
              isolation.level: read_committed
    function:
      definition: persistAndSplit;logger

Thank you!


1 Answer

  @Bean
  @Transactional
  public Consumer<String> persistAndSplit(
      StreamBridge bridge,
      AuditLogRepository repository
  ) {

In this case, the @Transactional is on the bean definition method, which is only executed once, during application initialization. To get a runtime transaction, the code invoked from the lambda needs to be annotated instead, for example:

  @Bean
  public Consumer<String> persistAndSplit(
      StreamBridge bridge,
      AuditLogRepository repository,
      TxCode code
  ) {
    return code::run;
  }
@Component
class TxCode {

    @Autowired
    AuditLogRepository repository;

    @Autowired
    StreamBridge bridge;

    // Must be public so the proxy-based @Transactional advice is applied;
    // the database transaction now starts and ends around each record.
    @Transactional
    public void run(String input) {
      bridge.send("left", ("left-" + input).getBytes());
      repository.createIfNotExists(input);

      if (input.startsWith("fail")) {
        throw new RuntimeException("Simulated error");
      }

      bridge.send("right", ("right-" + input).getBytes());
    }
}

Or you can pass the bridge and repository into run() as parameters instead of autowiring them:

return str -> code.run(str, repository, bridge);
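
The corresponding run method would then look something like this (same body as above, just taking the collaborators as parameters, with the @Autowired fields removed):

    // Parameterized variant of TxCode.run; the signature is inferred from the lambda above.
    @Transactional
    public void run(String input, AuditLogRepository repository, StreamBridge bridge) {
      bridge.send("left", ("left-" + input).getBytes());
      repository.createIfNotExists(input);

      if (input.startsWith("fail")) {
        throw new RuntimeException("Simulated error");
      }

      bridge.send("right", ("right-" + input).getBytes());
    }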