I'm building a PoC application in Java to figure out how to do transaction management in Spring Cloud Stream when using Kafka for message publishing. The use case I'm trying to simulate is a processor that receives a message, does some processing, and generates two new messages destined for two separate topics. I want to publish both messages as a single transaction: if publishing the second message fails, I want to roll back (not commit) the first message. Does Spring Cloud Stream support such a use case?
I've set the @Transactional annotation and I can see a global transaction starting before the message is delivered to the consumer. However, when I publish a message via the MessageChannel.send() method, I can see that a new local transaction is started and completed in the handleRequestMessage() method of the KafkaProducerMessageHandler class. This means that sending the message does not participate in the global transaction. So if an exception is thrown after the first message is published, that message is not rolled back. The global transaction gets rolled back, but that has no effect because the first message was already committed.
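To make the behavior I'm after concrete, here is a minimal in-memory sketch of all-or-nothing publishing. It is only a toy model, not the Kafka or Spring Cloud Stream API: ToyBroker, publishBoth and the failAfterFirst flag are hypothetical names I made up for illustration, and the topic names are taken from my configuration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the desired semantics; NOT the Kafka client or Spring API. */
class AtomicPublishSketch {

    /** Hypothetical broker stand-in: readers only ever see committed records. */
    static final class ToyBroker {
        private final Map<String, List<String>> committed = new HashMap<>();
        private final List<String[]> pending = new ArrayList<>();

        /** Buffers a record inside the current (single) transaction. */
        void send(String topic, String payload) {
            pending.add(new String[] {topic, payload});
        }

        /** Makes every buffered record visible at once. */
        void commit() {
            for (String[] record : pending) {
                committed.computeIfAbsent(record[0], t -> new ArrayList<>()).add(record[1]);
            }
            pending.clear();
        }

        /** Discards every buffered record. */
        void abort() {
            pending.clear();
        }

        /** Returns a copy of the committed records for a topic. */
        List<String> read(String topic) {
            return new ArrayList<>(committed.getOrDefault(topic, Collections.<String>emptyList()));
        }
    }

    /** Sends both messages in one transaction; aborts both if anything throws. */
    static boolean publishBoth(ToyBroker broker, String name, String email, boolean failAfterFirst) {
        try {
            broker.send("com.fis.customer.name", name);
            if (failAfterFirst) {
                // Simulates the failure that happens after the first send.
                throw new IllegalArgumentException("Customer name failure!");
            }
            broker.send("com.fis.customer.email", email);
            broker.commit();
            return true;
        } catch (RuntimeException ex) {
            broker.abort();
            return false;
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        boolean committed = publishBoth(broker, "name-change", "email-change", true);
        System.out.println("committed=" + committed
            + ", name topic=" + broker.read("com.fis.customer.name"));
    }
}
```

In this model the first message never becomes visible when the failure occurs before the commit, which is what I expect a read_committed consumer to observe if both sends share one transaction.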
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: txn.
            producer: # these apply to all producers that participate in the transaction
              partition-key-extractor-name: partitionKeyExtractorStrategy
              partition-selector-name: partitionSelectorStrategy
              partition-count: 3
              configuration:
                acks: all
                enable:
                  idempotence: true
                retries: 10
        bindings:
          input-customer-data-change-topic:
            consumer:
              configuration:
                isolation:
                  level: read_committed
              enable-dlq: true
      bindings:
        input-customer-data-change-topic:
          content-type: application/json
          destination: com.fis.customer
          group: com.fis.ec
          consumer:
            partitioned: true
            max-attempts: 1
        output-name-change-topic:
          content-type: application/json
          destination: com.fis.customer.name
        output-email-change-topic:
          content-type: application/json
          destination: com.fis.customer.email

@SpringBootApplication
@EnableBinding(CustomerDataChangeStreams.class)
public class KafkaCloudStreamCustomerDemoApplication
{
    public static void main(final String[] args)
    {
        SpringApplication.run(KafkaCloudStreamCustomerDemoApplication.class, args);
    }
}

public interface CustomerDataChangeStreams
{
    @Input("input-customer-data-change-topic")
    SubscribableChannel inputCustomerDataChange();

    @Output("output-email-change-topic")
    MessageChannel outputEmailDataChange();

    @Output("output-name-change-topic")
    MessageChannel outputNameDataChange();
}

@Component
public class CustomerDataChangeListener
{
    @Autowired
    private CustomerDataChangeProcessor mService;

    // Consumes customer data changes and delegates to the processor,
    // which publishes the two outgoing messages itself.
    @StreamListener("input-customer-data-change-topic")
    public void handleCustomerDataChangeMessages(
        @Payload final ImmutableCustomerDetails customerDetails)
    {
        mService.processMessage(customerDetails);
    }
}

@Component
public class CustomerDataChangeProcessor
{
    private final CustomerDataChangeStreams mStreams;

    @Value("${spring.cloud.stream.bindings.output-email-change-topic.destination}")
    private String mEmailChangeTopic;

    @Value("${spring.cloud.stream.bindings.output-name-change-topic.destination}")
    private String mNameChangeTopic;

    public CustomerDataChangeProcessor(final CustomerDataChangeStreams streams)
    {
        mStreams = streams;
    }

    // Publishes the name message first, then the e-mail message.
    // Only JSONException is caught here; the IllegalArgumentException thrown
    // by the send methods propagates to the caller.
    public void processMessage(final CustomerDetails customerDetails)
    {
        try
        {
            sendNameMessage(customerDetails);
            sendEmailMessage(customerDetails);
        }
        catch (final JSONException ex)
        {
            LOGGER.error("Failed to send messages.", ex);
        }
    }

    public void sendNameMessage(final CustomerDetails customerDetails)
        throws JSONException
    {
        final JSONObject nameChangeDetails = new JSONObject();
        nameChangeDetails.put(KafkaConst.BANK_ID_KEY, customerDetails.bankId());
        nameChangeDetails.put(KafkaConst.CUSTOMER_ID_KEY, customerDetails.customerId());
        nameChangeDetails.put(KafkaConst.FIRST_NAME_KEY, customerDetails.firstName());
        nameChangeDetails.put(KafkaConst.LAST_NAME_KEY, customerDetails.lastName());
        final String action = customerDetails.action();
        nameChangeDetails.put(KafkaConst.ACTION_KEY, action);

        // Send on the name channel declared in this method.
        final MessageChannel nameChangeMessageChannel = mStreams.outputNameDataChange();
        nameChangeMessageChannel.send(MessageBuilder.withPayload(nameChangeDetails.toString())
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .setHeader(KafkaHeaders.TOPIC, mNameChangeTopic).build());

        // Simulated failure after the name message has been sent.
        if ("fail_name_illegal".equalsIgnoreCase(action))
        {
            throw new IllegalArgumentException("Customer name failure!");
        }
    }

    public void sendEmailMessage(final CustomerDetails customerDetails) throws JSONException
    {
        final JSONObject emailChangeDetails = new JSONObject();
        emailChangeDetails.put(KafkaConst.BANK_ID_KEY, customerDetails.bankId());
        emailChangeDetails.put(KafkaConst.CUSTOMER_ID_KEY, customerDetails.customerId());
        emailChangeDetails.put(KafkaConst.EMAIL_ADDRESS_KEY, customerDetails.email());
        final String action = customerDetails.action();
        emailChangeDetails.put(KafkaConst.ACTION_KEY, action);

        final MessageChannel emailChangeMessageChannel = mStreams.outputEmailDataChange();
        emailChangeMessageChannel.send(MessageBuilder.withPayload(emailChangeDetails.toString())
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .setHeader(KafkaHeaders.TOPIC, mEmailChangeTopic).build());

        if ("fail_email_illegal".equalsIgnoreCase(action))
        {
            throw new IllegalArgumentException("E-mail address failure!");
        }
    }
}

EDIT
We are getting closer. The local transaction is no longer created. However, the global transaction still gets committed even when an exception is thrown. From what I can tell, the exception does not propagate to the TransactionTemplate.execute() method, so the transaction gets committed. It seems that the sendMessage() method of the MessageProducerSupport class "swallows" the exception in its catch clause: if an error channel is defined, a message is published to it and the exception is not rethrown. I tried turning the error channel off (spring.cloud.stream.kafka.binder.transaction.producer.error-channel-enabled = false), but that doesn't turn it off. So, just as a test, I set the error channel to null in the debugger to force the exception to be rethrown. That seems to do it. However, the original message keeps getting redelivered to the initial consumer even though I have max-attempts set to 1 for that consumer.
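To illustrate what I think is happening, here is a simplified model of the swallowing behavior I observed in the debugger. It is not the actual Spring Integration source: executeInTransaction, the List-based error channel and FAILING_HANDLER are hypothetical stand-ins. Unlike the real TransactionTemplate, which rethrows after rolling back, this model just returns a status string so the outcome is easy to see.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of the observed behavior; NOT the Spring source code. */
class SwallowedExceptionSketch {

    /** Hypothetical transaction wrapper: commits unless the callback throws. */
    static String executeInTransaction(Runnable callback) {
        try {
            callback.run();
            return "COMMITTED";
        } catch (RuntimeException ex) {
            return "ROLLED_BACK";
        }
    }

    /**
     * Delivers the payload to the handler. A failure is routed to the error
     * channel when one is set (swallowing the exception); otherwise rethrown.
     */
    static void sendMessage(String payload, Consumer<String> handler, List<String> errorChannel) {
        try {
            handler.accept(payload);
        } catch (RuntimeException ex) {
            if (errorChannel == null) {
                throw ex;
            }
            errorChannel.add(ex.getMessage());
        }
    }

    /** Handler that always fails, like my "fail_name_illegal" test action. */
    static final Consumer<String> FAILING_HANDLER = payload -> {
        throw new IllegalArgumentException("Customer name failure!");
    };

    public static void main(String[] args) {
        List<String> errorChannel = new ArrayList<>();
        // Error channel present: the wrapper never sees the exception.
        System.out.println(executeInTransaction(
            () -> sendMessage("payload", FAILING_HANDLER, errorChannel)));
        // Error channel forced to null: the exception reaches the wrapper.
        System.out.println(executeInTransaction(
            () -> sendMessage("payload", FAILING_HANDLER, null)));
    }
}
```

With the error channel in place the wrapper sees a normal return and commits. With it set to null the exception reaches the wrapper and the transaction is rolled back, which matches what I saw when I nulled the channel in the debugger.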