0
votes

I am using the Paho MQTT Android client to stream locations. This works fine until the internet connection is lost; after that, the MQTT client does not reconnect as expected. What should I do to make sure the Paho MQTT client reconnects to the broker? I want to force the client to reconnect to the broker and continue publishing locations when the connection returns.

Below is my code.

public class MqttClientHelperService2 extends Service {
    private MqttAndroidClient mqttAndroidClient;
    BroadcastReceiver broadcastReceiver;
    private final String SERVER_URL = "tcp://000.102.110.**:1883";
    private final String CLIENT_ID = "client_id";
    private final String MQTT_TOPIC = "livelocations/local";
    LiveLocation liveLocation;

    @Override
    public void onCreate() {
        super.onCreate();
        Thread newThread = new Thread(){
            public void run(){
                init();
            }
        };

        newThread.start();

        if (Build.VERSION.SDK_INT >= 26) {
            String CHANNEL_ID = "my_channel_01";
            NotificationChannel channel = new NotificationChannel(CHANNEL_ID,
                    "Channel human readable title",
                    NotificationManager.IMPORTANCE_LOW);

            ((NotificationManager) Objects.requireNonNull(getSystemService(Context.NOTIFICATION_SERVICE))).
                    createNotificationChannel(channel);
            Notification notification = new NotificationCompat.Builder(this, CHANNEL_ID)
                    .setContentTitle("")
                    .setContentText("")
                    .setSmallIcon(R.mipmap.ic_launcher).build();

            startForeground(1, notification);
        }
    }

    private void init() {
        mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), SERVER_URL, CLIENT_ID);
        mqttAndroidClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {

            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {

            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }


        });
    }


    @Override
    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        liveLocation= new LiveLocation(
                intent.getIntExtra("driver_id", 0),
                intent.getDoubleExtra("latitude", 0),
                intent.getDoubleExtra("longitude", 0),
                "");
        connectMqtt();
        return START_STICKY;
    }


    private MqttConnectOptions getMqttOptions(){
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);
        return mqttConnectOptions;
    }

    private void connectMqtt() {
        try {
           IMqttToken iMqttToken =  mqttAndroidClient.connect(getMqttOptions());
           iMqttToken.setActionCallback(new IMqttActionListener() {
               @Override
               public void onSuccess(IMqttToken asyncActionToken) {
                    Log.e("phanuel-log", "connecting ......." + Calendar.getInstance().getTime());
                   publishToServer(liveLocation);
               }

               @Override
               public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.e("phanuel-log-error", "connection error");
               }
           });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onDestroy() {
        if(mqttAndroidClient != null){
            try {
                mqttAndroidClient.unregisterResources();
                mqttAndroidClient.close();
                mqttAndroidClient.disconnect();
            } catch (Exception e){
                e.printStackTrace();
            }
        }
        unregisterReceiver(broadcastReceiver);
        broadcastReceiver = null;
        super.onDestroy();
    }

    private boolean isNetworkAvailable() {
        ConnectivityManager connectivityManager
                = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE);
        assert  connectivityManager != null;
        NetworkInfo activeNetworkInfo = connectivityManager.getActiveNetworkInfo();
        return activeNetworkInfo != null && activeNetworkInfo.isConnectedOrConnecting();
    }

    void publishToServer(final LiveLocation location) {
        try {
            if (location.getDriverId() > 0){
                mqttAndroidClient.connect(getMqttOptions(), null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        JSONObject locationJsonObject = new JSONObject();
                        try {
                            locationJsonObject.put("driverId", location.getDriverId());
                            locationJsonObject.put("driverLatitude", location.getLatitude());
                            locationJsonObject.put("driverLongitude", location.getLongitude());
                            locationJsonObject.put("driverTimeStamp", Calendar.getInstance().getTime());
                        } catch (JSONException e) {
                            e.printStackTrace();
                        }

                        MqttMessage mqttMessage = new MqttMessage();
                        mqttMessage.setPayload(locationJsonObject.toString().getBytes());
                        mqttMessage.setQos(0);
                        mqttMessage.setRetained(false);

                        try {
                            mqttAndroidClient.publish(MQTT_TOPIC, mqttMessage);
                        }catch (MqttException e){
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        Log.e("mqtt-client", "Failed to connect");
                        Log.e("mqtt-cleint", exception.toString());
                    }
                });
            }
        } catch (MqttException e){
            e.printStackTrace();
        }
    }
}
1

1 Answers

0
votes

There are multiple ways to handle your scenario.

  1. If you are using MQTT only to publish from the Android side, you can call the init() function (connect to the MQTT server) before publishing. After the message is published, you can disconnect from the MQTT server.
  2. If you always need a live MQTT connection, note that the connectionLost method is called whenever the connection is broken. You can reconnect in this method.
  3. You can have a thread that keeps checking whether the MQTT connection is alive and reconnects if it has been disconnected.