0
votes

I want to use Kaa Notification in conjunction with Kaa Data Collection feature.

How can I implement my scenario for getting the logs?!

Scenario is:

1- Server send the notification to endpoint (using endpoint ID), then endpoint replies sending data with data collection feature.

2- Server wait a bit and check the timestamp of last appender record (I try with MongoDB log appender) for the endpoint (by endpoint ID).

3- "Add a notification listener that listens to all notifications:"

  kaaClient.addNotificationListener(new NotificationListener() {
        @Override
        public void onNotification(long id, SecurityAlert sampleNotification) {
            LOG.info("Notification from the topic with id [{}] and name [{}] received.", id, getTopic(id).getName());
            LOG.info("Notification body: {} \n", sampleNotification.getAlertMessage());
            LOG.info("Notification alert type: {} \n", sampleNotification.getAlertType());

            inputTopicIdMessage();
        }
    });

Requirement setting for logging:

1- Added Configuration schema:

{
"type": "record",
"name": "Configuration",
"namespace": "org.kaaproject.kaa.schema.sample",
"fields": [
    {
        "name": "samplingPeriod",
        "type": "int",
        "by_default": "1"
    }
]}

2- Added Log Schema:

{
"type":"record",
"name":"Data",
"namespace":"org.kaaproject.kaa.scheme.sample",
"fields":[
    {
        "name":"topicId",
        "type":"int"
    },
    {
        "name":"timeStamp",
        "type":"long"
    }
],
"displayName":"Logging scheme"

}

3- Added Log Appender with MongoDB.

After that I want to enable see logs with following commands:

db.logs_ApplicationToken.find();

Updated (2017-12-03):

I have run the following code similar to the Kaa Data Collection code for the Kaa Notification code:

    private static class MeasureSender implements Runnable {
    KaaClient kaaClient;

    MeasureSender(KaaClient kaaClient) {
        this.kaaClient = kaaClient;
    }

    @Override
    public void run() {
        sentRecordsCount.incrementAndGet();
        DataLogging record = generateTopicId();
        RecordFuture future = kaaClient.addLogRecord(record); // submit log record for sending to Kaa node
        LOG.info("Log record {} submitted for sending", record.toString());
        try {
            RecordInfo recordInfo = future.get(); // wait for log record delivery error
            BucketInfo bucketInfo = recordInfo.getBucketInfo();
            LOG.info("Received log record delivery info. Bucket Id [{}]. Record delivery time [{} ms].",
                    bucketInfo.getBucketId(), recordInfo.getRecordDeliveryTimeMs());
            confirmationsCount.incrementAndGet();
        } catch (Exception e) {
            LOG.error("Exception was caught while waiting for log's delivery report.", e);
        }
    }
}

private static DataLogging generateTopicId() {
    //TODO: Logic for get topicId
    return new DataLogging(topicId, System.currentTimeMillis());
}

After running application, When I run below command on Kaa server:

db.logs_18693008741969774929.find();

I get the result:

{ "_id" : ObjectId("5a228679ef540e07f3e73cd6"), "header" : { "endpointKeyHash" :{ "string" : "dXhbOD271Qtg9+FhxHXfrjE9bw4=" }, "applicationToken" : { "string" : "18693008741969774929" }, "headerVersion" : { "int" : 1 }, "timestamp" : { "long": NumberLong("1512212089541") }, "logSchemaVersion" : { "int" : 5 } }, "event": { "topicId" : 0, "timeStamp": 0 } }

As a result, it shows that "topicId" has not been received. Because it is equal to 0.

As you can see in the last above method private static DataLogging generateTopicId(){},
I need some logic for doing it.


Updated (2017-12-06):

public class NotificationDemo {

private static final Logger LOG = LoggerFactory.getLogger(NotificationDemo.class);
private static KaaClient kaaClient;
private static final int LOGS_DEFAULT_THRESHOLD = 1;
private static int samplePeriodInSeconds = 1;
private static volatile AtomicInteger sentRecordsCount = new AtomicInteger(0);
private static volatile AtomicInteger confirmationsCount = new AtomicInteger(0);
private JsonObjectParserImpl jsonObjectParser = new JsonObjectParserImpl();
private String StatusOfDevices = String.valueOf(jsonObjectParser.getGetStatusOfDevices());

private static ScheduledExecutorService executor;
private static ScheduledFuture<?> executorHandle;
/**
 * The list of all available notification
 * <p>
 * private static int samplePeriodInSeconds = 1;
 * private static volatile AtomicInteger sentRecordsCount = new AtomicInteger0t;
 * private static volatile AtomicInteger confirmationsCount = new AtomicInteger(0);
 * <p>
 * private static Random rand = new Random(o;pics.
 */
private static List<Topic> topics;
/**
 * Topics client subscribed
 */
private static List<Topic> subscribedTopics = new ArrayList<Topic>();

InputStreamReader inputStreamReader = new InputStreamReader(System.in);
BufferedReader keyboardInput = new BufferedReader(inputStreamReader);

public NotificationDemo() throws IOException, JSONException {
}

public static void main(String[] args) throws IOException, JSONException {

    NotificationDemo main = new NotificationDemo();
    main.config();

}

public void config() {

    LOG.info("Notification demo started");

    //kaaClient = Kaa.newClient(new DesktopKaaPlatformContext(), new SimpleKaaClientStateListener(), true);
    KaaClient kaaClient = Kaa.newClient(new DesktopKaaPlatformContext(), new SimpleKaaClientStateListener() {
        @Override
        public void onStarted() {
            LOG.info("--= Kaa client started =--");
        }

        @Override
        public void onStopped() {
            LOG.info("--= Kaa client stopped =--");
        }
    }, true);

    /*
    * Set record count strategy for uploading every log record as soon as it is created.
     */
    kaaClient.setLogUploadStrategy(new RecordCountLogUploadStrategy(LOGS_DEFAULT_THRESHOLD));
/*
 * A listener that listens to the notification topic list updates.
 */
    kaaClient.addConfigurationListener(new ConfigurationListener() {
        @Override
        public void onConfigurationUpdate(Configuration configuration) {
            LOG.info("--= Endpoint configuration was updated =--");
            displayConfiguration(configuration);

            Integer newSamplePeriod = configuration.getSamplingPeriod();
            if ((newSamplePeriod != null) && (newSamplePeriod > 0)) {
                changeMeasurementPeriod(kaaClient, newSamplePeriod);
            } else {
                LOG.warn("Sample period value (= {} in updated configuration is wrong, so ignore it.", newSamplePeriod);
            }
        }
    });
    NotificationTopicListListener topicListListener = new BasicNotificationTopicListListener();
    kaaClient.addTopicListListener(topicListListener);
    /*
     * Add a notification listener that listens to all notifications.
     */
    kaaClient.addNotificationListener(new NotificationListener() {
        @Override
        public void onNotification(long id, Notification notification) {
            LOG.info("Notification from the topic with id [{}] and name [{}] received.", id, getTopic(id).getName());
            LOG.info("Notification body: {} \n", notification.getMessage());
            String commands = (notification.getMessage());
            if (commands.equals("arm")) {
                System.out.println("The Status of Devices:" + StatusOfDevices);
            }
            inputTopicIdMessage();
        }
    });
    /*
     * Start the Kaa client and connect it to the Kaa server.
     */
    kaaClient.start();


    topics = kaaClient.getTopics();

        /*
         * List the obtained notification topics.
         */
    showTopics();

    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNextLong())

    {
        long topicId = scanner.nextLong();
        if (getTopic(topicId) != null) {
            LOG.info("Subscribing to optional topic {}", topicId);
            subscribeTopic(topicId);
        } else {
            LOG.info("There is no input topic id. Please, input existing topic id.");
        }
    }
    /*
     * Stop listening to the notification topic list updates.
     */
    kaaClient.removeTopicListListener(topicListListener);

    unsubscribeOptionalTopics();

    /*
     * Stop the Kaa client and release all the resources which were in use.
     */
    kaaClient.stop();
    LOG.info("Notification demo stopped");
}

private static void changeMeasurementPeriod(KaaClient kaaClient, Integer newPeriod) {
    if (executorHandle != null) {
        executorHandle.cancel(false);
    }
    samplePeriodInSeconds = newPeriod;
    executorHandle = executor.scheduleAtFixedRate(new MeasureSender(kaaClient), 0, samplePeriodInSeconds, TimeUnit.SECONDS);
    LOG.info("Set new sample period = {} seconds.", samplePeriodInSeconds);
}

private static class MeasureSender implements Runnable {
    KaaClient kaaClient;

    MeasureSender(KaaClient kaaClient) {
        this.kaaClient = kaaClient;
    }

    @Override
    public void run() {
        sentRecordsCount.incrementAndGet();
        DataLogging record = generateTopicId();
        RecordFuture future = kaaClient.addLogRecord(record); // submit log record for sending to Kaa node
        LOG.info("Log record {} submitted for sending", record.toString());
        try {
            RecordInfo recordInfo = future.get(); // wait for log record delivery error
            BucketInfo bucketInfo = recordInfo.getBucketInfo();
            LOG.info("Received log record delivery info. Bucket Id [{}]. Record delivery time [{} ms].",
                    bucketInfo.getBucketId(), recordInfo.getRecordDeliveryTimeMs());
            confirmationsCount.incrementAndGet();
        } catch (Exception e) {
            LOG.error("Exception was caught while waiting for log's delivery report.", e);
        }
    }
}

private static DataLogging generateTopicId() {
    Integer topicId = generateTopicId().getTopicId();
    return new DataLogging(topicId, System.currentTimeMillis());
}

private static void inputTopicIdMessage() {
    LOG.info("\nPlease, type topic ID in order to subscribe to ones or type any text to exit: \n");
}

private static void displayConfiguration(org.kaaproject.kaa.schema.sample.Configuration configuration) {
    LOG.info("Configuration = {}", configuration.toString());
}

private static void showTopics() {
    if (topics == null || topics.isEmpty()) {
        LOG.info("Topic list is empty");
        return;
    }

    LOG.info("Available topics:");
    for (Topic topic : topics) {
        LOG.info("Topic id: {}, name: {}, type: {}", topic.getId(), topic.getName(), topic.getSubscriptionType());
    }

    LOG.info("Subscribed on topics:");
    for (Topic t : getOneTypeTopics(SubscriptionType.MANDATORY_SUBSCRIPTION)) {
        LOG.info("Topic id: {}, name: {}, type: {}", t.getId(), t.getName(), t.getSubscriptionType().name());
    }
    /*
     * Optional topics
     */
    if (!subscribedTopics.isEmpty()) {
        for (Topic t : subscribedTopics) {
            LOG.info("Topic id: {}, name: {}, type: {}", t.getId(), t.getName(), t.getSubscriptionType().name());
        }
    }
    inputTopicIdMessage();
}


private static List<Topic> getOneTypeTopics(SubscriptionType type) {
    List<Topic> res = new ArrayList<>();
    for (Topic t : NotificationDemo.topics) {
        if (t.getSubscriptionType() == type) {
            res.add(t);
        }
    }
    return res;
}

private static void subscribeTopic(long topicId) {
    try {
        subscribedTopics.add(getTopic(topicId));
        kaaClient.subscribeToTopic(topicId, true);
    } catch (UnavailableTopicException e) {
        e.printStackTrace();
    }
    inputTopicIdMessage();
}

private static Topic getTopic(long id) {
    for (Topic t : topics)
        if (t.getId() == id)
            return t;
    return null;
}

private static void sleepForSeconds(int seconds) {
    try {
        TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

private static void unsubscribeOptionalTopics() {
    List<Topic> topics = getOneTypeTopics(SubscriptionType.OPTIONAL_SUBSCRIPTION);

    for (Topic t : subscribedTopics) {
        try {
            kaaClient.unsubscribeFromTopic(t.getId());
        } catch (UnavailableTopicException e) {
            // if not subscribe
        }
    }
}


private static void waitForAnyInput() {
    try {
        System.in.read();
    } catch (IOException e) {
        LOG.warn("Error happens when waiting for user input.", e);
    }
}

/**
 * A listener that tracks the notification topic list updates
 * and subscribes the Kaa client to every new topic available.
 */
private static class BasicNotificationTopicListListener implements NotificationTopicListListener {
    @Override
    public void onListUpdated(List<Topic> list) {
        LOG.info("Topic list was updated:");
        topics.clear();
        topics.addAll(list);

        showTopics();
    }
}

}

1
I need more info to help you. What's your notification schema? What's SecurityAlert? What topicId do you expect? Could you publish the code of your application? (It's hard to grasp the whole flow from a couple of snippets.)Alexey Shmalko
I've changed the code. I just to try for the understanding relationship between Notification and Data Collection (Logging). 1. Notification schema is: { "type":"record", "name":"ExampleNotification", "namespace":"org.kaaproject.kaa.schema.sample.notification", "fields":[ { "name":"message", "type":"string" } ] } 2. About "topicId"; I want to collect data. Do you have any good suggestion for doing it? 3. I published the code. (refer to the new update.)M. Asiyaban

1 Answers

1
votes

I'll try to answer though I am not completely sure. (It is completely unclear what you're trying to achieve.)

First of all, I must say that data logging and notifications are completely orthogonal features. That means they do not interact in any way.

As a result, it shows that "topicId" has not been received. Because it is equal to 0.

This is likely caused by your application not setting topicId at all. (e.g., calling new DataLogging() without arguments.)

  1. About "topicId"; I want to collect data. Do you have any good suggestion for doing it?

From Kaa perspective, this topicId is just an integer field and the server does not really care—it'll save in mongo whatever you send. If your goal is to collect any data, just put a random integer in there.

private static DataLogging generateTopicId() {
    Integer topicId = 42;
    return new DataLogging(topicId, System.currentTimeMillis());
}