<int:service-activator input-channel="toKafka" ref="conditionalProducerService" method="producerCircuitBreaker">
    <int:request-handler-advice-chain>
        <ref bean="circuitBreakerAdvice1" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:channel id="failedChannel2" />

<int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapter"
        kafka-producer-context-ref="producerContext"
        auto-startup="false"
        channel="toKafka"
        message-key="kafka_messageKey">
    <int:poller fixed-delay="1000" error-channel="failedChannel2" />
</int-kafka:outbound-channel-adapter>

<int:chain input-channel="failedChannel2">
    <int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
    <int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>

<bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
    <property name="threshold" value="2" />
    <property name="halfOpenAfter" value="12000" />
</bean>

public Message<?> producerCircuitBreaker(Message<?> payload) {
    throw new RuntimeException("foo Pro");
}

With the above configuration, we are trying the following:
1. We expect the failed message to propagate to error-channel="failedChannel2", but this is not happening: the transformed output never appears in the console.
2. The circuit breaker works for the service activator (for an application exception such as the one above), but how can we configure a circuit breaker for failures in the outbound adapter? For example: a connection timeout, the server suddenly going down, a network problem, or some other environment issue that occurs before the message is sent from the SI channel to the external (Kafka) server. Can a circuit breaker be configured on the outbound adapter for such situations?
The SI documentation on the Circuit Breaker Advice says:
"Typically, this Advice might be used for external services, where it might take some time to fail (such as a timeout attempting to make a network connection)".
Please suggest how to achieve this. Many thanks.
Updated config:
<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
        default-reply-timeout="0" />

<int:service-activator input-channel="toKafka">
    <bean class="com.XXX.ProducerMessageHandler">
        <constructor-arg ref="producerContext"/>
    </bean>
    <int:request-handler-advice-chain>
        <ref bean="circuitBreakerAdvice" />
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />

<int:transformer input-channel="errorChannel"
        order="1" ref="transformerService1" method="transformFailed">
</int:transformer>

public void transformFailed(Message<?> message) {
    APPLOGGER.log("transformer message test" + message);
}

public class ProducerMessageHandler extends KafkaProducerMessageHandler {

    public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
        super(kafkaProducerContext);
        // TODO Auto-generated constructor stub
    }

    @Override
    public void handleMessageInternal(final Message<?> message) throws Exception {
        // The real send is commented out so that every message fails, to exercise the circuit breaker.
        //super.handleMessageInternal(message);
        throw new RuntimeException("test foo");
    }
}

Log:
01-05@23:44:18,598 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e904}]
DEBUG: - com.XXX.ProducerMessageHandler#0 received message: GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e904}]
got exception : org.springframework.messaging.MessageHandlingException: error occurred in message handler [com.XXX.ProducerMessageHandler#0]; nested exception is java.lang.RuntimeException: test foo
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}]
DEBUG: - com.XXX.ProducerMessageHandler#0 received message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}]
got exception : org.springframework.messaging.MessageHandlingException: error occurred in message handler [com.XXX.ProducerMessageHandler#0]; nested exception is java.lang.RuntimeException: test foo
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}]
got exception : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6]; nested exception is org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException: Circuit Breaker is Open for org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}]
got exception : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6]; nested exception is org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException: Circuit Breaker is Open for org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6
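
To make the pattern in the log easier to follow (two "test foo" failures, then "Circuit Breaker is Open" without the handler being called), here is a rough plain-Java model of how I understand the threshold and halfOpenAfter settings to interact. It is only a sketch, not Spring Integration's code: the names CircuitBreakerSketch, BreakerOpenException and attempt, and the injected clock, are made up for illustration, and it assumes the circuitBreakerAdvice bean in the updated config uses the same threshold="2" and halfOpenAfter="12000" as circuitBreakerAdvice1.

```java
import java.util.function.LongSupplier;

/** Simplified, hypothetical model of a threshold / half-open circuit breaker. */
public class CircuitBreakerSketch {

    /** Thrown instead of invoking the target while the breaker is open. */
    public static class BreakerOpenException extends RuntimeException {
        public BreakerOpenException() {
            super("Circuit Breaker is Open");
        }
    }

    private final int threshold;             // failures needed to open the breaker
    private final long halfOpenAfterMillis;  // wait before one trial call is allowed
    private final LongSupplier clock;        // injected so the example needs no sleeping
    private int failures;
    private long lastFailure;

    public CircuitBreakerSketch(int threshold, long halfOpenAfterMillis, LongSupplier clock) {
        this.threshold = threshold;
        this.halfOpenAfterMillis = halfOpenAfterMillis;
        this.clock = clock;
    }

    public void call(Runnable target) {
        // Open: enough failures, and the half-open delay has not elapsed yet.
        if (failures >= threshold && clock.getAsLong() - lastFailure < halfOpenAfterMillis) {
            throw new BreakerOpenException();
        }
        try {
            target.run();
            failures = 0;                    // a success closes the breaker again
        } catch (RuntimeException e) {
            failures++;
            lastFailure = clock.getAsLong();
            throw e;
        }
    }

    /** Runs one call and returns the exception message, or "ok" on success. */
    public static String attempt(CircuitBreakerSketch breaker, Runnable target) {
        try {
            breaker.call(target);
            return "ok";
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        long[] now = {0};
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(2, 12000, () -> now[0]);
        Runnable failing = () -> { throw new RuntimeException("test foo"); };

        for (int i = 1; i <= 4; i++) {
            System.out.println("call " + i + ": " + attempt(breaker, failing));
        }
        now[0] = 12000;                      // pretend halfOpenAfter has elapsed
        System.out.println("after wait: " + attempt(breaker, failing));
    }
}
```

In this model the first two calls reach the failing target, the third and fourth are rejected by the breaker, and once the 12000 ms have passed a single trial call reaches the target again. That matches what the log shows for the advised service activator; it says nothing about failures inside the int-kafka outbound adapter itself, which is the part I am still asking about.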