1
votes

I am trying to publish MQTT messages concurrently from 5 Java clients, with each client publishing 1000 messages on a particular topic to an MQTT broker (HiveMQ).

I start several threads; each thread creates an MQTT client, connects to the broker over SSL, and tries to publish 1000 messages. Messages are being sent, but not all of the connections to the broker succeed, and I keep getting this exception:

Client is not connected (32104)
    at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:199)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
    at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:583)
    at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:575)
    at com.test.MqttPublishSample.publishMessages(MqttPublishSample.java:122)
    at com.test.MqttPublishSample.lambda$start$0(MqttPublishSample.java:74)
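    at java.base/java.lang.Thread.run(Thread.java:834)

import java.util.concurrent.ThreadLocalRandom;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttPublishSample {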

    public static void main(String... args) throws InterruptedException {

        new MqttPublishSample().start();

    }

    public void start() throws InterruptedException {

        for(int i=0;i<5;i++){

            new Thread(()->{
                MqttClient client = null;
                try {
                    client = obtainConnection();//code to obtain connection using MqttClient
                    publishMessages(client);//code to publish message using simple for loop 

                } catch (MqttException e) {
                    e.printStackTrace();
                }

            }).start();
        }
    }

    public MqttClient obtainConnection() throws MqttException {
        String clientId = "sslTestClient"+ThreadLocalRandom.current().nextInt(0,5);
        MqttClient client = null;
        try {
            client = new MqttClient("ssl://localhost:8883", clientId, new MemoryPersistence());
        } catch (MqttException e) {
            e.printStackTrace();
        }

        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName("user1");
        mqttConnectOptions.setPassword("pass1".toCharArray());
        try {
            mqttConnectOptions.setSocketFactory(getTruststoreFactory());
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("connecting...");
        client.connect(mqttConnectOptions);
        return client;
    }

    // publishMessages(...) and getTruststoreFactory() are omitted here.
}

I expect all clients to connect to the broker successfully and publish their messages without this exception.

1
Can you post the obtainConnection method? - Sami Tahri
Updated the method. - triples13
You might be hitting the maximum number of allowed in-flight messages, which is 10 by default; you can raise it with MqttConnectOptions.setMaxInflight. If that doesn't help, turning on logging should get more useful debug information out of the client: wiki.eclipse.org/Paho/Log_and_Debug_in_the_Java_client - idan

1 Answer

2
votes

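It might be that you are using the same client ID in more than one thread, in which case the broker disconnects the existing connection that holds the duplicate ID. Because the ID suffix comes from ThreadLocalRandom.current().nextInt(0, 5), there are only 5 possible values, so a collision is very likely: with 5 threads, at least two of them end up with the same ID about 96% of the time. You could use a unique identifier from MqttClient.generateClientId(), or share a method between the threads that keeps track of the IDs already handed out.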
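
To illustrate the second option, here is a minimal sketch of a shared ID source, assuming all clients live in the same JVM. The class name ClientIds and its two methods are made up for this example and are not part of Paho; the code only uses the JDK so it can be run on its own. It also computes the chance of a duplicate when 5 threads each pick one of 5 values, which works out to roughly 0.96.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the Paho API.
public class ClientIds {

    // Shared by all publisher threads; incrementAndGet() is atomic,
    // so two callers can never receive the same number.
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /** Returns an ID such as "sslTestClient-1" that is unique within this JVM. */
    public static String next(String prefix) {
        return prefix + "-" + COUNTER.incrementAndGet();
    }

    /** Probability that n independent uniform draws from k values contain a duplicate. */
    public static double collisionProbability(int n, int k) {
        double allDistinct = 1.0;
        for (int i = 0; i < n; i++) {
            allDistinct *= (double) (k - i) / k;
        }
        return 1.0 - allDistinct;
    }

    public static void main(String[] args) throws InterruptedException {
        // Thread-safe set that collects the ID each thread was given.
        Set<String> ids = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> ids.add(next("sslTestClient")));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(ids.size() + " distinct IDs");
        System.out.println("collision probability: " + collisionProbability(5, 5));
    }
}
```

In obtainConnection() the clientId line would then become something like ClientIds.next("sslTestClient"). Note that the counter is only unique inside one process; if several JVMs publish to the same broker, a per-process prefix or MqttClient.generateClientId() is probably the safer choice.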