I am trying to write a spring-cloud-stream function (spring-starter-parent 2.5.3, java 11, spring-cloud-version 2020.0.3) which has both a Kafka and Postgres transaction. The function will raise a simulated error whenever the consumed message starts with the string "fail," which I expect to cause the database transaction to roll back, then cause the kafka transaction to roll back. (I am aware that the Kafka transaction is not XA, which is fine.) So far I have not gotten the database transaction to work, but the kafka transaction does.
Currently I am using a @Transactional
annotation, which does not appear to start a database transaction. (The Kafka binder documentation recommends synchronizing database + kafka transactions using the ChainedTransactionManager, but the Spring Kafka documentation states it is deprecated in favor of using the @Transactional
annotation, and the S.C.S. example for this problem uses the @Transactional
annotation and the default transaction manager created by the start-jpa library (I think)). I can see in my debugger that regardless of whether or not I @EnableTransactionManagement
and use a @Transactional
on my consumer, the consumer is executed in a kafka transaction using a transaction template higher in the stack, but I do not see a database transaction anywhere.
I have a few questions I want to understand:
- Am I correct that the Kafka Listener Container runs my consumers in the context of a Kafka transaction regardless of whether or not I have a
@Transactional
annotation? And if so, is there a way to only run specific functions in a Kafka transaction? - Would the above change for producers, since the container doesn't have a way to intercept calls to the producers (as far as I know)?
- What should I do to synchronize a Kafka and a database transactions so that the DB commit happens before the Kafka commit?
I have the following Crud Repository, collection of handlers, and application.yml:
@Repository
public interface AuditLogRepository extends CrudRepository<AuditLog, Long> {
/**
* Create a new audit log entry if and only if another with the same message does not already
* exist. This is idempotent.
*/
@Transactional
@Modifying
@Query(
nativeQuery = true,
value = "insert into audit_log (message) values (?1) on conflict (message) do nothing")
void createIfNotExists(String message);
}
@Profile("ft")
@Configuration
@EnableTransactionManagement
public class FaultTolerantHandlers {
private static final Logger LOGGER = LoggerFactory.getLogger(FaultTolerantHandlers.class);
@Bean
public NewTopic inputTopic() {
return TopicBuilder.name("input").partitions(1).replicas(1).build();
}
@Bean
public NewTopic inputDltTopic() {
return TopicBuilder.name("input.DLT").partitions(1).build();
}
@Bean
public NewTopic leftTopic() {
return TopicBuilder.name("left").partitions(1).build();
}
@Bean
public NewTopic rightTopic() {
return TopicBuilder.name("right").partitions(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
LOGGER.info("Producing messages to input...");
template.send("input", "pass-1".getBytes());
template.send("input", "fail-1".getBytes());
template.send("input", "pass-2".getBytes());
template.send("input", "fail-2".getBytes());
LOGGER.info("Produced input.");
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf =
((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
.getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(requireNonNull(pf));
container.setAfterRollbackProcessor(
new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L)));
};
}
// Receive messages from `input`.
// For each input, write an audit log to the database.
// For each input, produce a message to both `left` and `right` atomically.
// After three failed attempts to achieve the above, shuffle the message
// off to `input.DLT` and move on.
@Bean
@Transactional
public Consumer<String> persistAndSplit(
StreamBridge bridge,
AuditLogRepository repository
) {
return input -> {
bridge.send("left", ("left-" + input).getBytes());
repository.createIfNotExists(input);
if (input.startsWith("fail")) {
throw new RuntimeException("Simulated error");
}
bridge.send("right", ("right-" + input).getBytes());
};
}
@Bean
public Consumer<Message<String>> logger() {
return message -> {
var receivedTopic = message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
LOGGER.info("Received on topic=" + receivedTopic + " payload=" + message.getPayload());
};
}
}
spring:
cloud:
stream:
kafka:
binder:
transaction:
transaction-id-prefix: 'tx-'
required-acks: all
bindings:
persistAndSplit-in-0:
destination: input
group: input
logger-in-0:
destination: left,right,input.DLT
group: logger
consumer:
properties:
isolation.level: read_committed
function:
definition: persistAndSplit;logger
Thank you!