I am trying to use Apache Qpid from a Spring Boot application through the Qpid JMS client. The configuration works, but when I try to receive a message from the queue, the logger prints:

Dispatcher(918480905)Received a message(878303980)[1] from queue 1 )without a handler - rejecting(requeue)...

Here is my code:

JmsConfiguration.java

@Configuration
public class JmsConfiguration {

    @Primary
    @Bean
    public Context createContext()
    {
        Properties properties=new Properties();
        System.setProperty("IMMEDIATE_PREFETCH", "true");

        Context context=null;
        try {
            properties.load(this.getClass().getResourceAsStream("application.properties"));
            context = new InitialContext(properties);
        } catch (NamingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return context;
    }

    @Primary
    @Bean
    public ConnectionFactory createConnectionFactory(Context context)
    {
        ConnectionFactory connectionFactory=null;
        try {
            connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionFactory");
        } catch (NamingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return connectionFactory;
    }

    @Primary
    @Bean
    public Connection jmsConnection(ConnectionFactory connectionFactory) throws Exception
    {
        Connection connection = connectionFactory.createConnection();
        connection.start();
        return connection;
    }

    @Primary
    @Bean
    public Queue jmsQueue(Context context) throws Exception
    {
        Queue queue = (Queue) context.lookup("myqueue");
        return queue;
    }
}

application.properties

java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.qpidConnectionFactory = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672?maxprefetch='0''
queue.myqueue = queue1

ScheduledTasks.java. It just sends and receives messages at fixed intervals.

@Component
public class ScheduledTasks
{
    Connection connection;
    Queue queue;

    @Autowired
    public ScheduledTasks(Connection connection, Queue queue) {
        this.connection=connection;
        this.queue=queue;
    }

    MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            System.out.println("Received id is------>");
            System.out.println(message);
        }
    };

    @Scheduled(fixedDelay = 2000)
    public void sendMessage() throws Exception
    {
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Message message=session.createTextMessage();
        MessageProducer messageProducer=session.createProducer(queue);
        message.setStringProperty("value", "BOOM");
        messageProducer.send(message);
        session.commit();
        messageProducer.close();
        //connection.close();
        System.out.println("---------------Message Sent");
    }

    //@JmsListener(destination="queue1")
    @Scheduled(initialDelay=5000, fixedDelay = 5000)
    public void receiveMessage() throws Exception
    {
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer messageConsumer = session.createConsumer(queue);
//        if(messageConsumer.getMessageListener()==null)
//          messageConsumer.setMessageListener(messageListener);
        Message message = messageConsumer.receive(3000);
        if(message!=null)
            System.out.println("----------------->"+message.getStringProperty("value"));
        session.commit();
        messageConsumer.close();
        //connection.close();
        System.out.println("--------------->Got Message");
    }
}

1 Answer

You create an instance implementing MessageListener, but you never register it with anything: the setMessageListener call in receiveMessage() is commented out.

In Spring you should use DefaultMessageListenerContainer or SimpleMessageListenerContainer from spring-jms and create it as a Spring bean in the JmsConfiguration class. After setting the connection details (ConnectionFactory, Queue, sessionTransacted, etc.), you also need to set the MessageListener implementation on the container.
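
A minimal sketch of such a container bean, assuming the ConnectionFactory and Queue beans from the question's JmsConfiguration are available for injection and that the javax.jms API is in use (newer Spring versions use jakarta.jms instead). The class name JmsListenerConfiguration and the bean names queueListener and listenerContainer are made up for illustration; the same beans could live in JmsConfiguration.

```java
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

// Hypothetical configuration class; not part of the original question.
@Configuration
public class JmsListenerConfiguration {

    // The listener that the container will invoke for every message on the queue.
    @Bean
    public MessageListener queueListener() {
        return new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println("Received: " + message.getStringProperty("value"));
                } catch (JMSException e) {
                    // Rethrowing makes the container roll back the transacted session,
                    // so the broker can redeliver the message.
                    throw new IllegalStateException("Could not read message", e);
                }
            }
        };
    }

    // The container creates its own sessions and consumers and keeps the
    // listener attached, so no manual createConsumer()/receive() is needed.
    @Bean
    public DefaultMessageListenerContainer listenerContainer(
            ConnectionFactory connectionFactory, Queue queue, MessageListener queueListener) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestination(queue);
        container.setSessionTransacted(true);
        container.setMessageListener(queueListener);
        return container;
    }
}
```

With a container like this in place, the scheduled receiveMessage() method in ScheduledTasks would probably have to go, since otherwise it competes with the container for messages on the same queue.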