3
votes

I had a Turbine and Hystrix setup working, but decided to change it over to Turbine AMQP so I could aggregate multiple services into one stream/dashboard.

I have set up a Turbine AMQP server running on localhost:8989, but it doesn't appear to be getting Hystrix data from the client service. When I hit the Turbine server's IP in my browser, I see data: {"type":"Ping"} repeatedly, even while I am polling the URL of the Hystrix. If I attempt to show the Turbine AMQP stream in the Hystrix Dashboard, I get: Unable to connect to Command Metric Stream.

I have a default install of RabbitMQ running on port 5672.

My client service using Hystrix-AMQP has a application.yml file that looks like so:

spring:
  application:
    name: policy-service
eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
spring:
  rabbitmq:
    addresses: ${vcap.services.${PREFIX:}rabbitmq.credentials.uri:amqp://${RABBITMQ_HOST:localhost}:${RABBITMQ_PORT:5672}}

The tail end of the startup log looks like this:

2015-09-14 16:31:13.030  INFO 52844 --- [           main] com.netflix.discovery.DiscoveryClient    : Starting heartbeat executor: renew interval is: 30
2015-09-14 16:31:13.047  INFO 52844 --- [           main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application policy-service with eureka with status UP
2015-09-14 16:31:13.194  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'policy-service:8088.errorChannel' has 1 subscriber(s).
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {filter} as a subscriber to the 'cloudBusOutboundFlow.channel#0' channel
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-    service:8088.cloudBusOutboundFlow.channel#0' has 1 subscriber(s).
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {filter} as a subscriber to the 'cloudBusInboundChannel' channel
2015-09-14 16:31:13.195  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusInboundChannel' has 1 subscriber(s).
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler} as a subscriber to the 'cloudBusInboundFlow.channel#0' channel
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusInboundFlow.channel#0' has 1 subscriber(s).
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#2
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter} as a subscriber to the 'cloudBusWiretapChannel' channel
2015-09-14 16:31:13.196  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusWiretapChannel' has 1 subscriber(s).
2015-09-14 16:31:13.197  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
2015-09-14 16:31:13.197  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'cloudBusOutboundChannel' channel
2015-09-14 16:31:13.197  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusOutboundChannel' has 1 subscriber(s).
2015-09-14 16:31:13.197  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
2015-09-14 16:31:13.198  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {bridge} as a subscriber to the 'cloudBusAmqpInboundFlow.channel#0' channel
2015-09-14 16:31:13.198  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.cloudBusAmqpInboundFlow.channel#0' has 1 subscriber(s).
2015-09-14 16:31:13.198  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#5
2015-09-14 16:31:13.198  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {amqp:outbound-channel-adapter} as a subscriber to the 'hystrixStream' channel
2015-09-14 16:31:13.199  INFO 52844 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'policy-service:8088.hystrixStream' has 1 subscriber(s).
2015-09-14 16:31:13.199  INFO 52844 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#6
2015-09-14 16:31:13.219  INFO 52844 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 1073741823
2015-09-14 16:31:13.219  INFO 52844 --- [           main] ApplicationEventListeningMessageProducer : started org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer#0
2015-09-14 16:31:13.555  INFO 52844 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (4640c1c8-ff8f-45d7-8426-19d1b7a4cdb0) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2015-09-14 16:31:13.572  INFO 52844 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter#0
2015-09-14 16:31:13.573  INFO 52844 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2015-09-14 16:31:13.576  INFO 52844 --- [           main] c.n.h.c.m.e.HystrixMetricsPoller         : Starting HystrixMetricsPoller
2015-09-14 16:31:13.609  INFO 52844 --- [           main] ration$HystrixMetricsPollerConfiguration : Starting poller
2015-09-14 16:31:13.803  INFO 52844 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8088 (http)
2015-09-14 16:31:13.805  INFO 52844 --- [           main] com.ml.springboot.PolicyService     : Started PolicyService in 22.544 seconds (JVM running for 23.564)

So it looks like PolicyService successfully connects to the message broker.

The Turbine AMQP server's end of log:

2015-09-14 16:58:05.887  INFO 51944 --- [           main] i.reactivex.netty.server.AbstractServer  : Rx server started at port: 8989
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'bootstrap:-1.errorChannel' has 1 subscriber(s).
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {bridge} as a subscriber to the 'hystrixStreamAggregatorInboundFlow.channel#0' channel
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'bootstrap:-1.hystrixStreamAggregatorInboundFlow.channel#0' has 1 subscriber(s).
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2015-09-14 16:58:05.991  INFO 51944 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 1073741823
2015-09-14 16:58:06.238  INFO 51944 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (spring.cloud.hystrix.stream) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2015-09-14 16:58:06.289  INFO 51944 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter#0
2015-09-14 16:58:06.290  INFO 51944 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2015-09-14 16:58:06.434  INFO 51944 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): -1 (http)

Any ideas why the Turbine AMQP server is not receiving communication from the Hystrix AMQP client?

EDIT: Turbine-AMQP main looks like:

package com.turbine.amqp;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.turbine.amqp.EnableTurbineAmqp;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableAutoConfiguration
@EnableTurbineAmqp
@EnableDiscoveryClient
public class TurbineAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(TurbineAmqpApplication.class, args);
    }
}

Here's its application.yml:

server:
  port: 8989
spring:
  rabbitmq:
addresses: ${vcap.services.${PREFIX:}rabbitmq.credentials.uri:amqp://${RABBITMQ_HOST:localhost}:${RABBITMQ_PORT:5672}}

Hitting http://localhost:8989/turbine.stream produces a repeating stream of data: {"type":"Ping"}

and shows this in console:

2015-09-15 08:54:37.960  INFO 83480 --- [o-eventloop-3-1] o.s.c.n.t.amqp.TurbineAmqpConfiguration  : SSE Request Received
2015-09-15 08:54:38.025  INFO 83480 --- [o-eventloop-3-1] o.s.c.n.t.amqp.TurbineAmqpConfiguration  : Starting aggregation

EDIT: The below exception is thrown when I stop listening to the turbine stream, not when I try to listen with the dashboard.

2015-09-15 08:56:47.934  INFO 83480 --- [o-eventloop-3-3] o.s.c.n.t.amqp.TurbineAmqpConfiguration  : SSE Request Received
2015-09-15 08:56:47.946  WARN 83480 --- [o-eventloop-3-3] io.netty.channel.DefaultChannelPipeline  : An exception was thrown by a user handler's exceptionCaught() method while handling the following exception:

java.lang.NoSuchMethodError: rx.Observable.collect(Lrx/functions/Func0;Lrx/functions/Action2;)Lrx/Observable;
    at com.netflix.turbine.aggregator.StreamAggregator.lambda$null$36(StreamAggregator.java:89)
    at rx.internal.operators.OnSubscribeMulticastSelector.call(OnSubscribeMulticastSelector.java:60)
    at rx.internal.operators.OnSubscribeMulticastSelector.call(OnSubscribeMulticastSelector.java:40)
    at rx.Observable.unsafeSubscribe(Observable.java:8591)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:190)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:160)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:96)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:173)
    at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
    at rx.subjects.PublishSubject.onNext(PublishSubject.java:101)
    at org.springframework.cloud.netflix.turbine.amqp.Aggregator.handle(Aggregator.java:53)
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:112)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:102)
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:49)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:342)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:101)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:45)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:93)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
    at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: GroupedObservable.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
    ... 58 common frames omitted

My dependencies for turbine-amqp are as follows:

dependencies {
    compile('org.springframework.cloud:spring-cloud-starter-turbine-amqp:1.0.3.RELEASE')
    compile 'org.springframework.boot:spring-boot-starter-web:1.2.5.RELEASE'
    compile 'org.springframework.boot:spring-boot-starter-actuator:1.2.5.RELEASE'

    testCompile("org.springframework.boot:spring-boot-starter-test") 
}

dependencyManagement {
    imports { 
         mavenBom 'org.springframework.cloud:spring-cloud-starter-parent:1.0.2.RELEASE'
    }
}
2
No. Did you exercise the circuit breaker? What does the config & main of turbine-amqp app look like? Can you hit <turbine-amqp app>/turbine.stream? What url did you put in the hystrix dashboard? I don't have enough details.spencergibb
Thanks for taking the time to look at this, I've added the details you asked for, but I'm not sure what you mean by exercising the circuit breaker.lp1776
Somewhere in the client app, you've created a hystrix command (a circuit breaker). If that code path hasn't been reached, all you get are pings.spencergibb
Oh! Yes, I am hitting the HystrixCommand multiple times and still just seeing pings come across on http://localhost:8989/turbine.streamlp1776
I am able to see that messages are being passed in the Rabbit MQ management console.lp1776

2 Answers

1
votes

It is so hard to find a solution.

Using Spring cloud 2.1.4.RELEASE I faced with similar problem.

The main cause is the incompatibility [exchanges] name in rabbitMQ between: spring-cloud-netflix-hystrix-stream and spring-cloud-starter-netflix-turbine-stream.

So solve it: See the name created exchange name when you start the service componente {the same that declare hystrix-stream}

on the componente that declare {turbine-stream} update the property

turbine.stream.destination=

in my case turbine.stream.destination=hystrixStreamOutput

0
votes

I faced with similar problem and I find a solution.

My Spring Cloud version is 2.1.0.RELEASE

The solution:

  1. add property
spring.cloud.stream.bindings.turbineStreamInput.destination: hystrixStreamOutput
turbine.stream.enabled: false
  1. add auto configuration
@EnableBinding(TurbineStreamClient.class)
public class TurbineStreamAutoConfiguration {

    @Autowired
    private BindingServiceProperties bindings;

    @Autowired
    private TurbineStreamProperties properties;

    @PostConstruct
    public void init() {
        BindingProperties inputBinding = this.bindings.getBindings()
                .get(TurbineStreamClient.INPUT);
        if (inputBinding == null) {
            this.bindings.getBindings().put(TurbineStreamClient.INPUT,
                    new BindingProperties());
        }
        BindingProperties input = this.bindings.getBindings()
                .get(TurbineStreamClient.INPUT);
        if (input.getDestination() == null) {
            input.setDestination(this.properties.getDestination());
        }
        if (input.getContentType() == null) {
            input.setContentType(this.properties.getContentType());
        }
    }

    @Bean
    public HystrixStreamAggregator hystrixStreamAggregator(ObjectMapper mapper,
            PublishSubject<Map<String, Object>> publisher) {
        return new HystrixStreamAggregator(mapper, publisher);
    }

}