1
votes
<int:service-activator input-channel="toKafka" ref="conditionalProducerService" method="producerCircuitBreaker">
    <int:request-handler-advice-chain>
        <ref bean="circuitBreakerAdvice1" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:channel id="failedChannel2" />

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
        kafka-producer-context-ref="producerContext" auto-startup="false"
        channel="toKafka" message-key="kafka_messageKey">
    <int:poller fixed-delay="1000" error-channel="failedChannel2" />
</int-kafka:outbound-channel-adapter>

<int:chain input-channel="failedChannel2">
    <int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
    <int-stream:stderr-channel-adapter append-newline="true" />
</int:chain>

<bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
    <property name="threshold" value="2" />
    <property name="halfOpenAfter" value="12000" />
</bean>

public Message<?> producerCircuitBreaker(Message<?> payload) {
    throw new RuntimeException("foo Pro");
}

With the above configuration, we are trying two things:

1. We expect the failed message to propagate to error-channel="failedChannel2", but that is not happening: the transformed output never appears in the console.

2. The circuit breaker (CB) works for the service activator (for application exceptions such as the one above), but how can we configure a CB for failures in the outbound adapter? For example, a connection timeout, the server going down suddenly, a network problem, or some environment issue that occurs before the message is sent from the SI channel to the external (Kafka) server. Can a CB be configured on the outbound adapter for such situations?

The SI documentation on the Circuit Breaker Advice says:

"Typically, this Advice might be used for external services, where it might take some time to fail (such as a timeout attempting to make a network connection)".

Please suggest how to achieve this. Many thanks.

updated config:

<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
    default-reply-timeout="0" />

<int:service-activator input-channel="toKafka">
    <bean class="com.XXX.ProducerMessageHandler">
        <constructor-arg ref="producerContext" />
    </bean>
    <int:request-handler-advice-chain>
        <ref bean="circuitBreakerAdvice" />
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />

<int:transformer input-channel="errorChannel" order="1"
    ref="transformerService1" method="transformFailed" />

public void transformFailed(Message<?> message) {
    APPLOGGER.log("transformer message test" + message);
}


public class ProducerMessageHandler extends KafkaProducerMessageHandler {

    public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
        super(kafkaProducerContext);
    }

    @Override
    public void handleMessageInternal(final Message<?> message) throws Exception {
        // super.handleMessageInternal(message);
        // Always fail, to exercise the circuit breaker.
        throw new RuntimeException("test foo");
    }
}

log :

01-05@23:44:18,598 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e904}]
DEBUG: - com.XXX.ProducerMessageHandler#0 received message: GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e904}]
got exception : org.springframework.messaging.MessageHandlingException: error occurred in message handler [com.XXX.ProducerMessageHandler#0]; nested exception is java.lang.RuntimeException: test foo
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}]
DEBUG: - com.XXX.ProducerMessageHandler#0 received message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}]
got exception : org.springframework.messaging.MessageHandlingException: error occurred in message handler [com.XXX.ProducerMessageHandler#0]; nested exception is java.lang.RuntimeException: test foo
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}]
got exception : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6]; nested exception is org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException: Circuit Breaker is Open for org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}]
got exception : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6]; nested exception is org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException: Circuit Breaker is Open for org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6


2 Answers

1
votes

The advice only applies to the endpoint it is assigned to, not the downstream flow; unfortunately, the kafka schema doesn't permit it to be applied to the outbound channel adapter. I have created a JIRA issue for that.

A work-around would be to configure the KafkaProducerMessageHandler as a <bean/> and ref it from a <service-activator/>. Then you can apply your circuit breaker.

Another work-around would be to use an in-flow gateway...

<int:service-activator ... ref="gw">
    <int:request-handler-advice-chain ...

</int:service-activator>

<int:gateway id="gw" default-request-channel="toKafka" 
         default-reply-timeout="0"
         error-channel="..." ... />

I am not sure why you're not seeing a message in the error channel; usually, turning on DEBUG logging will help debug this kind of thing.

EDIT

I just tested with this and it works just fine...

<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
    default-reply-timeout="0" />

<int:service-activator input-channel="toKafka">
    <bean class="com.example.Foo" />
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2"/>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

EDIT2

If you are not using a gateway, you can handle it with a queue channel and a poller. This works fine for me too...

<int:channel id="toKafka">
    <int:queue />
</int:channel>

<int:service-activator input-channel="toKafka">
    <bean class="com.example.Foo" />
    <int:poller error-channel="errorChannel" fixed-delay="1000" />
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2"/>
            <property name="halfOpenAfter" value="12000"/>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

Or, you can add a mid-flow gateway.
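
To make the threshold and halfOpenAfter settings used above concrete, here is a minimal plain-Java sketch of the general circuit-breaker idea. It is a simplified model written for illustration, not the source of RequestHandlerCircuitBreakerAdvice; the class CircuitBreakerSketch, its call() method, and the injectable clock are all hypothetical names.

```java
import java.util.function.LongSupplier;

/** Simplified, hypothetical model of a threshold / half-open circuit breaker. */
public class CircuitBreakerSketch {

    /** Thrown instead of invoking the target while the breaker is open. */
    static class CircuitOpenException extends RuntimeException {
        CircuitOpenException() {
            super("Circuit Breaker is Open");
        }
    }

    private final int threshold;             // consecutive failures before opening
    private final long halfOpenAfterMillis;  // wait before allowing a trial call
    private final LongSupplier clock;        // injectable so time can be controlled
    private int failures;
    private long lastFailureMillis;

    CircuitBreakerSketch(int threshold, long halfOpenAfterMillis, LongSupplier clock) {
        this.threshold = threshold;
        this.halfOpenAfterMillis = halfOpenAfterMillis;
        this.clock = clock;
    }

    /** Runs the target unless the breaker is open; tracks consecutive failures. */
    synchronized void call(Runnable target) {
        boolean open = failures >= threshold
                && clock.getAsLong() - lastFailureMillis < halfOpenAfterMillis;
        if (open) {
            throw new CircuitOpenException(); // fail fast, target is not invoked
        }
        try {
            target.run();
            failures = 0; // a success closes the breaker again
        } catch (RuntimeException e) {
            failures++;
            lastFailureMillis = clock.getAsLong();
            throw e;
        }
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker =
                new CircuitBreakerSketch(2, 12000, System::currentTimeMillis);
        // Four sends against a target that always fails.
        for (int i = 0; i < 4; i++) {
            try {
                breaker.call(() -> { throw new RuntimeException("test foo"); });
            } catch (RuntimeException e) {
                System.out.println("got exception : " + e.getMessage());
            }
        }
    }
}
```

With threshold=2, the first two calls reach the target and fail with "test foo"; the next two are rejected with "Circuit Breaker is Open" without invoking the target, which is the same pattern as the log in the question. Once halfOpenAfter milliseconds have passed since the last failure, one trial call is let through again.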

0
votes

copied from chat for future reference :

Sam : Hi Gary

    for (int i = 0; i < 4; i++) {
        try {
            toKafka.send(MessageBuilder.withPayload("hello").build());
        } catch (Exception e) {
            System.out.println("got exception : " + e);
        }
    }

This is how I am sending the message.

Gary : So you are sending directly to the channel - you should use a MessagingGateway instead.

Sam : Hi Gary, thanks. It is working with the gateway.

Configuring the CB with KafkaProducerMessageHandler is fine, but it only covers failures thrown from within the method below:

public void handleMessageInternal(final Message message) throws Exception

But I also want to cover network errors, such as an invalid broker list or the server being down, which it does not cover. I get an exception in the console like this:

log

   12-24@16:46:46,250 DEBUG springframework.integration.kafka.outbound.KafkaProducerMessageHandler - org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0 received message: GenericMessage [payload=TestVo[data=sample message]], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@44286963, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@44286963, kafka_topic=tried_in, kafka_partitionId=2, id=7b596368-0aee-ddaa-2168-dc403e22c38f, timestamp=1450955805294}]
   12-24@16:55:12,630 WARN apache.kafka.common.network.Selector - Error in I/O with /1.2.0.3 
   java.net.ConnectException: Connection refused: no further information 
   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
   at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) 
   at org.apache.kafka.common.network.Selector.poll(Selector.java:238) 
   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) 
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) 
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) 
   at java.lang.Thread.run(Unknown Source)

I want the CB to be invoked in this case as well.

Gary : The connection exception should be thrown within handleMessageInternal(). If the exception is not thrown, it's a bug. I'll take a look.

The Future is discarded in handleMessageInternal - I'll open a JIRA issue.

https://jira.spring.io/browse/INTEXT-218

Sam : OK. Is it going to cover the case when the Kafka server is down for some reason?

Gary : yes; but you might want to reduce the timeout from the default (60s)
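
The point about the discarded Future can be illustrated with a small plain-Java sketch. It uses only java.util.concurrent and does not call the Kafka client or KafkaProducerMessageHandler; sendAsync, sendAndDiscard and sendAndWait are hypothetical names standing in for an asynchronous send and two ways of handling its result.

```java
import java.net.ConnectException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration: a failure inside a Future is invisible unless someone waits on it. */
public class DiscardedFutureSketch {

    /** Stand-in for an asynchronous send that fails on the sender thread. */
    static Future<Void> sendAsync(ExecutorService sender) {
        Callable<Void> task = () -> {
            throw new ConnectException("Connection refused");
        };
        return sender.submit(task);
    }

    /** Fire-and-forget: the Future is dropped, so the caller never sees the failure. */
    static boolean sendAndDiscard(ExecutorService sender) {
        sendAsync(sender);
        return true; // always looks like success to the caller (and to any advice around it)
    }

    /** Waits for the result, so a failed send is rethrown on the calling thread. */
    static void sendAndWait(ExecutorService sender, long timeoutMillis) throws Exception {
        try {
            sendAsync(sender).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new RuntimeException("send failed", e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService sender = Executors.newSingleThreadExecutor();
        try {
            System.out.println("discarded send returned: " + sendAndDiscard(sender));
            try {
                sendAndWait(sender, 1000);
            } catch (RuntimeException e) {
                System.out.println("got exception : " + e.getCause());
            }
        } finally {
            sender.shutdown();
        }
    }
}
```

Only the second variant lets the exception reach the calling thread, which is where an advice such as the circuit breaker would be able to count it. The timeout passed to get() bounds how long the caller blocks, which is the same kind of trade-off as the timeout Gary suggests reducing.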