1
votes

I am using apache activemq with spring boot and I want to migrate to apache artemis to improve usage for cluster and nodes.

At the moment I am using mainly the concept of VirtualTopics and with JMS like

@JMSListener(destination = "Consumer.A.VirtualTopic.simple")
public void receiveMessage() {
    ...
}

...

public void send(JMSTemplate template) {
    template.convertAndSend("VirtualTopic.simple", "Hello world!");
}

I have read, that artemis changed it's address model to addresses, queues and routing types instead of queues, topics and virtual topics like in activemq. I have read a lot more, but I think I don't get it right, how I can migrate now. I tried it the same way like above, so I imported Artemis JMSClient from Maven and wanted to use it like before, but with FQQN (Fully Qualified Queue Name) or the VirtualTopic-Wildcard you can read on some sources. But somehow it does not work properly.

My Questions are: - How can I migrate VirtualTopics? Did I get it right with FQQN and those VirtualTopics-Wildcards? - How can I specify the routingtypes anycast and multicast for the code examples above? (In the online examples addresses and queues are hardcoded in the server broker.xml, but I want to create it on the fly of the application.) - How can I use it with openwire protocol and how does the application know what it uses? Does it only depend on the port I am using of artemis? So 61616 for openwire?

Can anyone help in clarifying my thoughts?

UPDATE:

Some further questions.

1) I always read something like "a default 5.x consumer". Is it expected then to get mixed with artemis? Like you leave all of those naming conventions and just add the addresses to the VirtualTopic name to a FQQN, and just change dependecies to artemis?

2) I've already tried the "virtualTopicConsumerWildcards" with "import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;" and "import org.apache.activemq.ActiveMQConnectionFactory;", but only in the second case it made a difference.

3) I also tried to only use OpenWire as protocol in the acceptor, but in this case (and with "import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;") I get following error when starting my application: "2020-03-30 11:41:19,504 ERROR [org.apache.activemq.artemis.core.server] AMQ224096: Error setting up connection from /127.0.0.1:54201 to /127.0.0.1:61616; protocol CORE not found in map: [OPENWIRE]".

4) Do I put i.e. multicast:://VirtualTopic.simple this as destination name in template.convertAndSend(...)? I tried template.setPubSubDomain(true) for multicast routing type and left it for anycast, this works. But is it a good way?

5) Do you maybe know, how I can "tell" my spring-boot-application with template.convertAndSend(...); to use Openwire?

UPDATE2: Shared durable subscriptions

@JmsListener(destination = "VirtualTopic.test", id = "c1", subscription = "Consumer.A.VirtualTopic.test", containerFactory = "queueConnectionFactory")
public void receive1(String m) {

}

@JmsListener(destination = "VirtualTopic.test", id = "c2", subscription = "Consumer.B.VirtualTopic.test", containerFactory = "queueConnectionFactory")
public void receive2(String m) {

}

@Bean
public DefaultJmsListenerContainerFactory queueConnectionFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setClientId("brokerClientId");
    factory.setSubscriptionDurable(true);
    factory.setSubscriptionShared(true);
    return factory;
}

Errors:

2020-04-17 11:23:44.485  WARN 7900 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'VirtualTopic.test' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer; 
2020-04-17 11:23:44.514 ERROR 7900 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'VirtualTopic.test' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Broker: d1 - Client: brokerClientId already connected from /127.0.0.1:59979

What am I doing wrong here?

1
If you want to use OpenWire just use the OpenWire client libraries in your application instead of the Artemis core client libraries.Justin Bertram
This should be "import org.apache.activemq.ActiveMQConnectionFactory", which uses per default Openwire. But with "org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory" messages are sent per CORE somehow.Galadriel
If you use org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory messages will certainly be sent using the core protocol because that's the core JMS client's javax.jms.ConnectionFactory implementation.Justin Bertram
I see, thank you. So do you @JustinBertram have an idea, why the virtualTopicConsumerWildcard is not applied, when I am using it with @JmsListener and template.convertAndSend(...) like in my post above?Galadriel

1 Answers

3
votes

The idea behind virtual topics is that producers send to a topic in the usual JMS way and s consumer can consume from a physical queue for a logical topic subscription, allowing many consumers to be running on many machines & threads to load balance the load.

Artemis uses a queue per topic subscriber model internally and it is possibly to directly address the subscription queue using its Fully Qualified Queue name (FQQN).

For example, a default 5.x consumer destination for topic VirtualTopic.simple subscription A Consumer.A.VirtualTopic.simple would be replaced with an Artemis FQQN comprised of the address and queue VirtualTopic.simple::Consumer.A.VirtualTopic.simple.

However Artemis supports a virtual topic wildcard filter mechanism that will automatically convert the consumer destination into the corresponding FQQN. To enable filter mechanism the configuration string property virtualTopicConsumerWildcards could be used. It has has two parts separated by a ;, ie the default 5.x virtual topic with consumer prefix of Consumer.*., would require a virtualTopicConsumerWildcards filter of Consumer.*.>;2.

Artemis is configured by default to auto-create destinations requested by clients. They can specify a special prefix when connecting to an address to indicate which kind of routing type to use. They can be enabled by adding the configuration string property anycastPrefix and multicastPrefix to an acceptor, you can find more details at Using Prefixes to Determine Routing Type. For example adding to the acceptor anycastPrefix=anycast://;multicastPrefix=multicast://, if the client needs to send a message to only one of the ANYCAST queues should use the destination anycast:://VirtualTopic.simple, if the client needs to send a message to the MULTICAST should use the destination multicast:://VirtualTopic.simple.

Artemis acceptors support using a single port for all protocols, they will automatically detect which protocol is being used CORE, AMQP, STOMP or OPENWIRE, but it is possible to limit which protocols are supported by using the protocols parameter.

The following acceptor enables the anycast prefix anycast://, the multicast prefix multicast:// and the virtual topic consumer wildcards, disabling all protocols except OPENWIRE on the endpoint localhost:61616.

<acceptor name="artemis">tcp://localhost:61616?anycastPrefix=anycast://;multicastPrefix=multicast://;virtualTopicConsumerWildcards=Consumer.*.%3E%3B2;protocols=OPENWIRE</acceptor>

UPDATE: The following example application connects to an Artemis instance with the previous acceptor using the OpenWire protocol.

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication
@EnableJms
public class Application {

   private final String BROKER_URL = "tcp://localhost:61616";
   private final String BROKER_USERNAME = "admin";
   private final String BROKER_PASSWORD = "admin";

   public static void main(String[] args) throws Exception {
      final ConfigurableApplicationContext context = SpringApplication.run(Application.class);
      System.out.println("********************* Sending message...");

      JmsTemplate jmsTemplate = context.getBean("jmsTemplate", JmsTemplate.class);
      JmsTemplate jmsTemplateAnycast = context.getBean("jmsTemplateAnycast", JmsTemplate.class);
      JmsTemplate jmsTemplateMulticast = context.getBean("jmsTemplateMulticast", JmsTemplate.class);

      jmsTemplateAnycast.convertAndSend("VirtualTopic.simple", "Hello world anycast!");
      jmsTemplate.convertAndSend("anycast://VirtualTopic.simple", "Hello world anycast using prefix!");
      jmsTemplateMulticast.convertAndSend("VirtualTopic.simple", "Hello world multicast!");
      jmsTemplate.convertAndSend("multicast://VirtualTopic.simple", "Hello world multicast using prefix!");

      System.out.print("Press any key to close the context");
      System.in.read();

      context.close();
   }

   @Bean
   public ActiveMQConnectionFactory connectionFactory(){
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
      connectionFactory.setBrokerURL(BROKER_URL);
      connectionFactory.setUserName(BROKER_USERNAME);
      connectionFactory.setPassword(BROKER_PASSWORD);
      return connectionFactory;
   }

   @Bean
   public JmsTemplate jmsTemplate(){
      JmsTemplate template = new JmsTemplate();
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public JmsTemplate jmsTemplateAnycast(){
      JmsTemplate template = new JmsTemplate();
      template.setPubSubDomain(false);
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public JmsTemplate jmsTemplateMulticast(){
      JmsTemplate template = new JmsTemplate();
      template.setPubSubDomain(true);
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
      DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory());
      factory.setConcurrency("1-1");
      return factory;
   }

   @JmsListener(destination = "Consumer.A.VirtualTopic.simple")
   public void receiveMessageFromA(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM A: " + message);
   }

   @JmsListener(destination = "Consumer.B.VirtualTopic.simple")
   public void receiveMessageFromB(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM B: " + message);
   }

   @JmsListener(destination = "VirtualTopic.simple")
   public void receiveMessageFromTopic(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM TOPIC: " + message);
   }
}