2
votes

I have two applications. The first app starts an ActiveMQ broker ( https://spring.io/guides/gs/messaging-jms/ ).

In the second app I want to subscribe to a topic from the first app.

How can I do this without starting an ActiveMQ server in the second app?


Possible solution:

Server Application Project

import java.time.LocalDateTime;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.ui.ModelMap;

/**
 * Server-side Spring Boot application: hosts an ActiveMQ broker reachable over TCP and
 * periodically publishes a small JSON message to the {@code "messages"} destination.
 */
@SpringBootApplication
@EnableJms
@EnableScheduling
public class JsmServerApplication {

    /** URI the broker's transport connector binds to. */
    private static final String BROKER_BIND_URI = "tcp://0.0.0.0:4444";

    /** URI this application uses to connect to its own broker. */
    private static final String BROKER_CLIENT_URI = "tcp://localhost:4444";

    /** Name of the destination the scheduled sender publishes to. */
    private static final String DESTINATION = "messages";

    /** JMS template injected by the Spring container; used by {@link #run()} to publish. */
    @Autowired
    JmsTemplate jmsTemplate;

    /**
     * Defines the ActiveMQ broker with a TCP connector, the name {@code "primary-broker"} and
     * JMX enabled.
     *
     * @return the configured broker
     * @throws Exception if the transport connector cannot be added
     */
    @Bean
    public BrokerService broker() throws Exception {
        BrokerService brokerService = new BrokerService();
        // Binding to 0.0.0.0 allows remote connections.
        brokerService.addConnector(BROKER_BIND_URI);
        brokerService.setBrokerName("primary-broker");
        brokerService.setUseJmx(true);
        return brokerService;
    }

    /**
     * Connection factory that talks to the broker over its TCP connector.
     *
     * @return an ActiveMQ connection factory for {@code tcp://localhost:4444}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(BROKER_CLIENT_URI);
    }

    /**
     * Listener container factory wired with the JSON message converter and the given connection
     * factory.
     *
     * @param connectionFactory the connection factory listener containers should use
     * @return the configured listener container factory
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        containerFactory.setMessageConverter(jacksonJmsMessageConverter());
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

    /**
     * Jackson-based converter that targets text messages and uses the {@code _type} message
     * property as the type-id property.
     *
     * @return the configured message converter
     */
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.setTypeIdPropertyName("_type");
        jsonConverter.setTargetType(MessageType.TEXT);
        return jsonConverter;
    }

    /**
     * Application entry point.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(JsmServerApplication.class, args);
    }

    /**
     * Scheduled sender (cron: every five seconds) that publishes a map holding the current local
     * date-time under the key {@code "now"}.
     */
    @Scheduled(cron = "*/5 * * * * ?")
    public void run() {
        String timestamp = LocalDateTime.now().toString();
        ModelMap payload = new ModelMap("now", timestamp);
        System.out.println("Sending: " + payload);
        // NOTE(review): no pub-sub domain is configured in this class, so "messages" presumably
        // resolves to a queue rather than a topic - confirm against the intended semantics.
        jmsTemplate.convertAndSend(DESTINATION, payload);
    }

}

Client Application Project

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.ui.ModelMap;

/**
 * Client-side Spring Boot application: connects to an ActiveMQ broker over TCP and prints every
 * message received on the {@code "messages"} destination. It does not define a broker itself.
 */
@SpringBootApplication
@EnableJms
public class JsmClientApplication {

    /** URI of the broker this client connects to. */
    private static final String BROKER_URI = "tcp://localhost:4444";

    /**
     * Application entry point.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(JsmClientApplication.class, args);
    }

    /**
     * Connection factory pointing at the broker's TCP connector.
     *
     * @return an ActiveMQ connection factory for {@code tcp://localhost:4444}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(BROKER_URI);
    }

    /**
     * Listener container factory referenced by name from the {@code @JmsListener} below; wired
     * with the JSON message converter and the given connection factory.
     *
     * @param connectionFactory the connection factory listener containers should use
     * @return the configured listener container factory
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        containerFactory.setMessageConverter(jacksonJmsMessageConverter());
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

    /**
     * Jackson-based converter that targets text messages and uses the {@code _type} message
     * property as the type-id property.
     *
     * @return the configured message converter
     */
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.setTypeIdPropertyName("_type");
        jsonConverter.setTargetType(MessageType.TEXT);
        return jsonConverter;
    }

    /**
     * Prints each payload received on the {@code "messages"} destination.
     *
     * @param msg the converted message payload
     */
    // NOTE(review): no pub-sub domain is configured on the container factory in this class, so
    // "messages" presumably resolves to a queue rather than a topic - confirm against the sender.
    @JmsListener(destination = "messages", containerFactory = "jmsListenerContainerFactory")
    public void msg(ModelMap msg) {
        System.out.println(msg);
    }

}

Is this a correct approach?


Solved with this:

http://javasampleapproach.com/java-integration/activemq-work-spring-jms-activemq-topic-publisher-subcribers-pattern-using-springboot

2

2 Answers

0
votes

See this answer for how to listen on a tcp port instead of the vm:// transport.

0
votes

You can use a MessageConsumer to consume the data, as in the code below:

/**
 * Subscribes to the topic named {@code "topic"} and handles every message delivered to it until
 * the JVM shuts down or the main thread is interrupted.
 *
 * <p>{@code url} is not declared in this snippet; it is expected to be supplied by the enclosing
 * class (presumably the broker URL, e.g. {@code tcp://localhost:4444}).
 *
 * @param args unused
 * @throws JMSException if the connection, session or consumer cannot be created, or if closing
 *     the connection fails
 */
public static void main(String[] args) throws JMSException {
    // Getting JMS connection from the server
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
    final Connection connection = connectionFactory.createConnection();

    // Close the connection gracefully when the JVM is asked to exit (e.g. Ctrl+C); the
    // finally block below is not guaranteed to run in that case.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                connection.close();
            } catch (JMSException e) {
                System.err.println("Failed to close JMS connection: " + e);
            }
        }
    }));

    try {
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic");
        MessageConsumer consumer = session.createConsumer(topic);

        MessageListener listener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    // do operations
                    System.out.println("Received message " + message.getJMSMessageID());
                } catch (JMSException e) {
                    // Report instead of silently swallowing the failure.
                    System.err.println("Failed to process message: " + e);
                }
            }
        };
        consumer.setMessageListener(listener);

        // Start delivery only after the listener is registered.
        connection.start();

        // Keep the connection open: closing it right away (as the original did) stops the
        // consumer before any message can arrive. Block until this thread is interrupted.
        Thread.currentThread().join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        // Also covers failures while creating the session or consumer.
        connection.close();
    }
}

Since you are using the ActiveMQConnectionFactory, you can set the broker as below

// Embedded broker that additionally exposes a TCP connector for other applications.
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:4444");
broker.setPersistent(false);
// The broker has to be started explicitly; otherwise the TCP connector never accepts
// connections (and the vm:// URL below would presumably create its own, separate broker).
broker.start();

// In-JVM client connection to the broker named "localhost".
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

If you are not restricted to ActiveMQ, you can use Kafka to do the same thing. Kafka provides a highly scalable, distributed message bus with a simple API.

https://kafka.apache.org/quickstart

I am not sure about the constraints but I just wanted to give you a feel of Kafka. However, the above code should help you in understanding the concept of subscribing and consuming messages from a topic.