0
votes

I'm new to Spring Integration and RabbitMQ and am stuck. I'm trying to write a program using spring integration that sends a message to one queue, then checks to see if the name property of the message exists in a hashmap: if it does exist, a "1" is added to the message, and if it doesn't "-1" is added. The message with the added number is then supposed to be sent to another queue.

Currently, the message is getting from the first queue to the second queue, but nothing is being added on because I'm getting a failed to transform message error.

I'm using a class called MessageCreator to make the original message and MessageImprover to add to the message.

Code:

package SpringMessaging;

public class MessageCreator {

private String messageJson;
private String name;
private int age;
public int count = 0;

 public String getName(){
 return name;
 }

 public int getAge(){
 return age;
  }

public void add(String name, int age){
this.name = name;
this.age = age;
   messageJson = "{" +
            "\"name\": \"" + name + "\"" +
            ", \"age\": " + age + ", \"count\": " + count +
            '}';
}

public String getJson() {
return messageJson;
    }
}


package SpringMessaging;

import java.util.HashMap;

public class MessageImprover {
    private String jsonMessage;


public MessageCreator improve (MessageCreator creator) {
    MessageCreator changed = new MessageCreator();
    changed.add(creator.getName(), creator.getAge());
     HashMap<String, Integer> table = new HashMap<String, Integer>();
            table.put("Icelantic", 8);
            table.put("Nordica", 9);
            table.put("Atomic", 10);
            table.put("Volkl", 11);
            table.put("Marker", 12);
        if(table.containsKey(creator.getName())==true){
            changed.count = 1;
      } else {
            changed.count = -1;
    }

     return changed;
    }
}

Here is the Main method:

public class Main {

public static void create(ApplicationContext context) {
    Sender sender = (Sender) context.getBean("Sender");     
    MessageCreator creator = new MessageCreator();
         creator.add("Icelantic", 25);
         sender.sendMessage(creator);

}            
public static void main(String[] args) {   
  ApplicationContext context = new ClassPathXmlApplicationContext("/SpringMessaging/ReceivingConfig.xml");
  create(context);
    }
}

Here are my xml config files:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
   xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
   xsi:schemaLocation="http://www.springframework.org/schema/beans    
                        http://www.springframework.org/schema/beans/spring-beans.xsd    
                        http://www.springframework.org/schema/integration    
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/integration/amqp
                        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
                        http://www.springframework.org/schema/rabbit
                        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> 

   <rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
   <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
   <rabbit:admin connection-factory="connectionFactory" />
   <rabbit:queue name="first" auto-delete="false" durable="true" />
   <rabbit:queue name="second" auto-delete="false" durable="true" />

   <rabbit:fanout-exchange name="first-exchange" auto-delete="true" durable="true">
  <rabbit:bindings>
     <rabbit:binding queue="first" />
  </rabbit:bindings>
   </rabbit:fanout-exchange>

  <int:channel id="senderChannel">    
    </int:channel>

  <int-amqp:outbound-channel-adapter channel="senderChannel" exchange-name="first-exchange"  amqp-template="amqpTemplate" />

   <int-amqp:outbound-gateway id="outbound" request-channel="senderChannel"  exchange-name="first-exchange" amqp-template="amqpTemplate" />

   <int:gateway id="Sender" service-interface="SpringMessaging.Sender" default-request-channel="senderChannel">
       <int:method name="sendMessage" />
    </int:gateway>

  <int:poller default="true" fixed-rate="100"/>

</beans>

That config is just for sending to the first queue, and here is the other config:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
   xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
   xsi:schemaLocation="http://www.springframework.org/schema/beans    
                        http://www.springframework.org/schema/beans/spring-beans.xsd    
                        http://www.springframework.org/schema/integration    
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/integration/amqp
                        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
                        http://www.springframework.org/schema/rabbit
                        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

<import resource="classpath:/SpringMessaging/SendingConfig.xml" />

   <rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true">
      <rabbit:bindings>
         <rabbit:binding queue="second" />
      </rabbit:bindings>
   </rabbit:fanout-exchange>

    <int:channel id="realreceiver">
        <int:queue />
     </int:channel>

   <int:publish-subscribe-channel id="receiverChannel"  />
        <int:bridge input-channel="receiverChannel" output-channel="realreceiver" />

    <int-amqp:outbound-channel-adapter channel="realreceiver" exchange-name="second-exchange"  amqp-template="amqpTemplate" />

   <int-amqp:inbound-gateway id="inbound" request-channel="receiverChannel"  amqp-template="amqpTemplate" queue-names="first, second" connection-factory="connectionFactory" acknowledge-mode="AUTO"  />

   <int:json-to-object-transformer input-channel="senderChannel" output-channel="receiverChannel" />

   <int:chain input-channel="receiverChannel">
      <int:json-to-object-transformer type="SpringMessaging.MessageCreator" />
        <int:service-activator method="improve">
            <bean class="SpringMessaging.MessageImprover" />
         </int:service-activator>
      <int:object-to-json-transformer content-type="text/json" />
   </int:chain>

   <int:service-activator input-channel="senderChannel" output-channel="receiverChannel" ref="MessageImprover" method = "improve" />

   <bean id="MessageImprover" class="SpringMessaging.MessageImprover" />
    <bean id="MessageCreator" class="SpringMessaging.MessageCreator" />

</beans>

The "improve" method is defined in the MessageImprover but it's not being recognized.

StackTrace:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:915) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:825) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421) at java.lang.Thread.run(Thread.java:748) Caused by: org.springframework.integration.transformer.MessageTransformationException: failed to transform message at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44) at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:89) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114) at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:154) at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:44) at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:74) at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:459) at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:426) at org.springframework.integration.amqp.inbound.AmqpInboundGateway.access$1100(AmqpInboundGateway.java:64) at org.springframework.integration.amqp.inbound.AmqpInboundGateway$Listener.doOnMessage(AmqpInboundGateway.java:307) at org.springframework.integration.amqp.inbound.AmqpInboundGateway$Listener.onMessage(AmqpInboundGateway.java:259) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822) ... 10 more Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input at [Source: (byte[])""; line: 1, column: 0] at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:58) at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4089) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3996) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3109) at org.springframework.integration.support.json.Jackson2JsonObjectMapper.fromJson(Jackson2JsonObjectMapper.java:88) at org.springframework.integration.support.json.Jackson2JsonObjectMapper.fromJson(Jackson2JsonObjectMapper.java:52) at org.springframework.integration.support.json.AbstractJacksonJsonObjectMapper.fromJson(AbstractJacksonJsonObjectMapper.java:56) at org.springframework.integration.json.JsonToObjectTransformer.doTransform(JsonToObjectTransformer.java:84) at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33) ... 30 more

1
Could you share the whole stack trace? - Lucas Ross
thanks. have you made sure the Spring dependencies you're using are compatible versions? the framework seems to be trying to access an exception constructor that does not exist, and that is probably obscuring the root cause. - Lucas Ross
Not related, but you shouldn't use a queue channel as the output channel of the rabbit adapter. - Gary Russell
Thanks, I'll change that right now. - arsenal11
@LucasRoss how can I check if they're compatible? I'm very new to all this. - arsenal11

1 Answers

0
votes

??

public String improve (MessageCreator creator)

improve() takes an object of type MessageCreator which means the message payload must be a MessageCreator whereas it looks like your messages are just the JSON part of the creator sender.sendMessage(creator.getJson());