I want to use Kaa Notifications in conjunction with the Kaa Data Collection feature.
How can I implement the following scenario for collecting the logs?
The scenario is:
1- The server sends a notification to an endpoint (using the endpoint ID), and the endpoint replies by sending data through the data collection feature.
2- The server waits a bit and then checks the timestamp of the last log appender record (I am trying the MongoDB log appender) for that endpoint (by endpoint ID).
3- "Add a notification listener that listens to all notifications:"
// Registers an anonymous NotificationListener on the client. Each received SecurityAlert
// notification is logged with its topic id, the topic name, the alert message and the alert type.
kaaClient.addNotificationListener(new NotificationListener() {
public void onNotification(long id, SecurityAlert sampleNotification) {
// The topic name is resolved through getTopic(id) before logging.
LOG.info("Notification from the topic with id [{}] and name [{}] received.", id, getTopic(id).getName());
LOG.info("Notification body: {} \n", sampleNotification.getAlertMessage());
LOG.info("Notification alert type: {} \n", sampleNotification.getAlertType());
Required settings for logging:
1- Added a Configuration schema:
"type": "record",
"name": "Configuration",
"namespace": "org.kaaproject.kaa.schema.sample",
"fields": [
"name": "samplingPeriod",
"type": "int",
"by_default": "1"
2- Added Log Schema:
"displayName":"Logging scheme"
3- Added Log Appender with MongoDB.
After that I want to be able to see the logs with the following commands:
Updated (2017-12-03):
I added the following code, modeled on the Kaa Data Collection sample, to the Kaa Notification demo and ran it:
// Runnable task that submits one DataLogging record through the Kaa client and then
// logs the delivery report (bucket id and delivery time) for that record.
private static class MeasureSender implements Runnable {
// Client used to submit log records; set once by the constructor.
KaaClient kaaClient;
MeasureSender(KaaClient kaaClient) {
this.kaaClient = kaaClient;
public void run() {
// The record to send is produced by generateTopicId().
DataLogging record = generateTopicId();
RecordFuture future = kaaClient.addLogRecord(record); // submit log record for sending to Kaa node
LOG.info("Log record {} submitted for sending", record.toString());
try {
RecordInfo recordInfo = future.get(); // wait for the log record delivery report
BucketInfo bucketInfo = recordInfo.getBucketInfo();
LOG.info("Received log record delivery info. Bucket Id [{}]. Record delivery time [{} ms].",
bucketInfo.getBucketId(), recordInfo.getRecordDeliveryTimeMs());
// NOTE(review): catching Exception presumably also covers an InterruptedException raised by
// future.get() - confirm, and restore the interrupt flag in that case.
} catch (Exception e) {
LOG.error("Exception was caught while waiting for log's delivery report.", e);
// Builds the DataLogging record sent by MeasureSender from a topic id and the current
// time in milliseconds.
private static DataLogging generateTopicId() {
// TODO: add the logic that obtains the topic id.
// NOTE(review): topicId is not declared anywhere in this snippet; it has to be provided by the TODO above.
return new DataLogging(topicId, System.currentTimeMillis());
After running the application, when I run the command below on the Kaa server:
I get the result:
{ "_id" : ObjectId("5a228679ef540e07f3e73cd6"), "header" : { "endpointKeyHash" :{ "string" : "dXhbOD271Qtg9+FhxHXfrjE9bw4=" }, "applicationToken" : { "string" : "18693008741969774929" }, "headerVersion" : { "int" : 1 }, "timestamp" : { "long": NumberLong("1512212089541") }, "logSchemaVersion" : { "int" : 5 } }, "event": { "topicId" : 0, "timeStamp": 0 } }
The result shows that "topicId" has not been received, because it is equal to 0.
As you can see in the last method above, private static DataLogging generateTopicId(){}, the logic for obtaining the topic ID is still missing.
I need some logic for doing that.
Updated (2017-12-06):
// Demo application that registers configuration and notification listeners on a Kaa client
// and periodically submits DataLogging records through MeasureSender.
public class NotificationDemo {
private static final Logger LOG = LoggerFactory.getLogger(NotificationDemo.class);
// Client handle read by the static helpers (e.g. subscribeTopic).
// NOTE(review): nothing in the lines shown assigns this field; config() declares a local with the same name.
private static KaaClient kaaClient;
// Record-count threshold handed to RecordCountLogUploadStrategy in config().
private static final int LOGS_DEFAULT_THRESHOLD = 1;
// Period in seconds between MeasureSender runs; overwritten by changeMeasurementPeriod.
private static int samplePeriodInSeconds = 1;
// Counters that are neither read nor updated in the lines shown.
// NOTE(review): volatile adds nothing unless these references are reassigned; final would be the usual choice.
private static volatile AtomicInteger sentRecordsCount = new AtomicInteger(0);
private static volatile AtomicInteger confirmationsCount = new AtomicInteger(0);
private JsonObjectParserImpl jsonObjectParser = new JsonObjectParserImpl();
// Evaluated once when the instance is constructed; the notification listener in config()
// prints this cached string rather than querying the parser again.
private String StatusOfDevices = String.valueOf(jsonObjectParser.getGetStatusOfDevices());
// Scheduler and the handle of the currently scheduled MeasureSender task (see changeMeasurementPeriod).
private static ScheduledExecutorService executor;
private static ScheduledFuture<?> executorHandle;
* The list of all available notification topics.
* <p>
* private static int samplePeriodInSeconds = 1;
* private static volatile AtomicInteger sentRecordsCount = new AtomicInteger(0);
* private static volatile AtomicInteger confirmationsCount = new AtomicInteger(0);
* <p>
* private static Random rand = new Random();
// Has no initializer; assigned in config() from kaaClient.getTopics(), so it is null until then.
private static List<Topic> topics;
* Topics client subscribed
private static List<Topic> subscribedTopics = new ArrayList<Topic>();
// Console reader over System.in.
// NOTE(review): no charset is given, and config() opens a separate Scanner on the same System.in stream.
InputStreamReader inputStreamReader = new InputStreamReader(System.in);
BufferedReader keyboardInput = new BufferedReader(inputStreamReader);
// Constructor; no body statements are visible here. The instance field initializers
// (parser, cached device status, console readers) run when it is invoked.
public NotificationDemo() throws IOException, JSONException {
// Entry point: constructs the demo object.
// NOTE(review): no call to config() is visible in the lines shown - confirm it is invoked on this instance.
public static void main(String[] args) throws IOException, JSONException {
NotificationDemo main = new NotificationDemo();
// Creates the Kaa client, installs the log upload strategy plus the configuration and
// notification listeners, reads the topic list, and then reads topic ids from the console.
public void config() {
LOG.info("Notification demo started");
//kaaClient = Kaa.newClient(new DesktopKaaPlatformContext(), new SimpleKaaClientStateListener(), true);
// NOTE(review): this local declaration shadows the static kaaClient field. Static helpers such as
// subscribeTopic read the field, which nothing in the lines shown assigns.
KaaClient kaaClient = Kaa.newClient(new DesktopKaaPlatformContext(), new SimpleKaaClientStateListener() {
public void onStarted() {
LOG.info("--= Kaa client started =--");
public void onStopped() {
LOG.info("--= Kaa client stopped =--");
}, true);
* Set record count strategy for uploading every log record as soon as it is created.
kaaClient.setLogUploadStrategy(new RecordCountLogUploadStrategy(LOGS_DEFAULT_THRESHOLD));
* A listener that listens to the notification topic list updates.
// The listener registered here reacts to configuration updates: a positive sampling period
// reschedules MeasureSender through changeMeasurementPeriod, anything else is logged and ignored.
kaaClient.addConfigurationListener(new ConfigurationListener() {
public void onConfigurationUpdate(Configuration configuration) {
LOG.info("--= Endpoint configuration was updated =--");
Integer newSamplePeriod = configuration.getSamplingPeriod();
if ((newSamplePeriod != null) && (newSamplePeriod > 0)) {
changeMeasurementPeriod(kaaClient, newSamplePeriod);
} else {
LOG.warn("Sample period value (= {} in updated configuration is wrong, so ignore it.", newSamplePeriod);
// NOTE(review): topicListListener is created but not registered on the client in the lines shown.
NotificationTopicListListener topicListListener = new BasicNotificationTopicListListener();
* Add a notification listener that listens to all notifications.
kaaClient.addNotificationListener(new NotificationListener() {
public void onNotification(long id, Notification notification) {
// getTopic returns null when no topic has this id, in which case getName() would throw a NullPointerException.
LOG.info("Notification from the topic with id [{}] and name [{}] received.", id, getTopic(id).getName());
LOG.info("Notification body: {} \n", notification.getMessage());
String commands = (notification.getMessage());
// NOTE(review): if getMessage() can return null this comparison throws; "arm".equals(commands) would be null-safe.
if (commands.equals("arm")) {
// Prints the value cached in the StatusOfDevices field when the demo object was constructed.
System.out.println("The Status of Devices:" + StatusOfDevices);
* Start the Kaa client and connect it to the Kaa server.
topics = kaaClient.getTopics();
* List the obtained notification topics.
// Reads topic ids from the console for as long as the next token parses as a long.
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLong())
long topicId = scanner.nextLong();
if (getTopic(topicId) != null) {
// NOTE(review): only the log statement is visible for this branch; the subscribe call itself is not shown.
LOG.info("Subscribing to optional topic {}", topicId);
} else {
LOG.info("There is no input topic id. Please, input existing topic id.");
* Stop listening to the notification topic list updates.
* Stop the Kaa client and release all the resources which were in use.
LOG.info("Notification demo stopped");
// Stores the new sampling period and schedules a fresh MeasureSender at that fixed rate,
// starting immediately (initial delay 0).
private static void changeMeasurementPeriod(KaaClient kaaClient, Integer newPeriod) {
// NOTE(review): the body of this null check is not visible; presumably it cancels the
// previously scheduled task - confirm. No assignment to executor is visible either.
if (executorHandle != null) {
samplePeriodInSeconds = newPeriod;
executorHandle = executor.scheduleAtFixedRate(new MeasureSender(kaaClient), 0, samplePeriodInSeconds, TimeUnit.SECONDS);
LOG.info("Set new sample period = {} seconds.", samplePeriodInSeconds);
// Runnable task scheduled by changeMeasurementPeriod: submits one DataLogging record through
// the Kaa client and then logs the delivery report (bucket id and delivery time).
private static class MeasureSender implements Runnable {
// Client used to submit log records; set once by the constructor.
KaaClient kaaClient;
MeasureSender(KaaClient kaaClient) {
this.kaaClient = kaaClient;
public void run() {
// The record to send is produced by generateTopicId().
DataLogging record = generateTopicId();
RecordFuture future = kaaClient.addLogRecord(record); // submit log record for sending to Kaa node
LOG.info("Log record {} submitted for sending", record.toString());
try {
RecordInfo recordInfo = future.get(); // wait for the log record delivery report
BucketInfo bucketInfo = recordInfo.getBucketInfo();
LOG.info("Received log record delivery info. Bucket Id [{}]. Record delivery time [{} ms].",
bucketInfo.getBucketId(), recordInfo.getRecordDeliveryTimeMs());
// NOTE(review): catching Exception presumably also covers an InterruptedException raised by
// future.get() - confirm, and restore the interrupt flag in that case.
} catch (Exception e) {
LOG.error("Exception was caught while waiting for log's delivery report.", e);
// Intended to build the DataLogging record (topic id plus current time in milliseconds)
// that MeasureSender submits.
private static DataLogging generateTopicId() {
// NOTE(review): this line calls generateTopicId() itself unconditionally, so the method never
// reaches the return statement and recurses until the stack overflows. The topic id has to come
// from somewhere else, e.g. the id delivered to the notification listener - TODO.
Integer topicId = generateTopicId().getTopicId();
return new DataLogging(topicId, System.currentTimeMillis());
// Logs the console prompt asking the user for a topic id to subscribe to.
private static void inputTopicIdMessage() {
LOG.info("\nPlease, type topic ID in order to subscribe to ones or type any text to exit: \n");
// Logs the string form of the given endpoint configuration.
private static void displayConfiguration(org.kaaproject.kaa.schema.sample.Configuration configuration) {
LOG.info("Configuration = {}", configuration.toString());
// Logs the known topics: first every available topic, then the mandatory ones, then the
// optional topics tracked in subscribedTopics.
private static void showTopics() {
// A null or empty topic list is reported instead of being iterated.
if (topics == null || topics.isEmpty()) {
LOG.info("Topic list is empty");
LOG.info("Available topics:");
for (Topic topic : topics) {
LOG.info("Topic id: {}, name: {}, type: {}", topic.getId(), topic.getName(), topic.getSubscriptionType());
LOG.info("Subscribed on topics:");
for (Topic t : getOneTypeTopics(SubscriptionType.MANDATORY_SUBSCRIPTION)) {
LOG.info("Topic id: {}, name: {}, type: {}", t.getId(), t.getName(), t.getSubscriptionType().name());
* Optional topics
if (!subscribedTopics.isEmpty()) {
for (Topic t : subscribedTopics) {
LOG.info("Topic id: {}, name: {}, type: {}", t.getId(), t.getName(), t.getSubscriptionType().name());
// Returns the topics from NotificationDemo.topics whose subscription type equals the given type.
private static List<Topic> getOneTypeTopics(SubscriptionType type) {
List<Topic> res = new ArrayList<>();
for (Topic t : NotificationDemo.topics) {
// NOTE(review): no statement adding t to res is visible under this check, so as shown the
// result is always empty - confirm that an add call belongs here.
if (t.getSubscriptionType() == type) {
return res;
// Subscribes the client to the topic with the given id, passing true as the second argument.
private static void subscribeTopic(long topicId) {
try {
// Uses the static kaaClient field, not the local variable declared in config().
kaaClient.subscribeToTopic(topicId, true);
// NOTE(review): no handling is visible for an unavailable topic; at least log the exception.
} catch (UnavailableTopicException e) {
// Looks up a topic by id in the static topics list; returns the first match, or null when
// no topic has that id.
// NOTE(review): topics has no initializer, so calling this before config() assigns it would
// throw a NullPointerException.
private static Topic getTopic(long id) {
for (Topic t : topics)
if (t.getId() == id)
return t;
return null;
// Presumably pauses the current thread for the given number of seconds; the sleeping call
// itself is not visible in the lines shown.
private static void sleepForSeconds(int seconds) {
try {
// NOTE(review): no handling is visible for the interrupt; the usual practice is to restore
// the flag with Thread.currentThread().interrupt().
} catch (InterruptedException e) {
// Walks the topics tracked in subscribedTopics; the unsubscribe call inside the try block is
// not visible in the lines shown.
private static void unsubscribeOptionalTopics() {
// NOTE(review): this local shadows the static topics field and is not used by the loop below,
// which iterates subscribedTopics instead.
List<Topic> topics = getOneTypeTopics(SubscriptionType.OPTIONAL_SUBSCRIPTION);
for (Topic t : subscribedTopics) {
try {
} catch (UnavailableTopicException e) {
// if not subscribe
// Presumably blocks until the user enters something on the console; the read call itself is
// not visible in the lines shown. An IOException is logged as a warning and otherwise ignored.
private static void waitForAnyInput() {
try {
} catch (IOException e) {
LOG.warn("Error happens when waiting for user input.", e);
* A listener that tracks the notification topic list updates
* and subscribes the Kaa client to every new topic available.
private static class BasicNotificationTopicListListener implements NotificationTopicListListener {
public void onListUpdated(List<Topic> list) {
LOG.info("Topic list was updated:");
What topicId do you expect? Could you publish the code of your application? (It's hard to grasp the whole flow from a couple of snippets.) – Alexey Shmalko