I want to use Kaa Notification in conjunction with Kaa Data Collection feature.
How can I implement my scenario for getting the logs?!
Scenario is:
1- Server send the notification to endpoint (using endpoint ID), then endpoint replies sending data with data collection feature.
2- Server wait a bit and check the timestamp of last appender record (I try with MongoDB log appender) for the endpoint (by endpoint ID).
3- "Add a notification listener that listens to all notifications:"
kaaClient.addNotificationListener(new NotificationListener() {
@Override
public void onNotification(long id, SecurityAlert sampleNotification) {
LOG.info("Notification from the topic with id [{}] and name [{}] received.", id, getTopic(id).getName());
LOG.info("Notification body: {} \n", sampleNotification.getAlertMessage());
LOG.info("Notification alert type: {} \n", sampleNotification.getAlertType());
inputTopicIdMessage();
}
});
Requirement setting for logging:
1- Added Configuration schema:
{
"type": "record",
"name": "Configuration",
"namespace": "org.kaaproject.kaa.schema.sample",
"fields": [
{
"name": "samplingPeriod",
"type": "int",
"by_default": "1"
}
]}
2- Added Log Schema:
{
"type":"record",
"name":"Data",
"namespace":"org.kaaproject.kaa.scheme.sample",
"fields":[
{
"name":"topicId",
"type":"int"
},
{
"name":"timeStamp",
"type":"long"
}
],
"displayName":"Logging scheme"
}
3- Added Log Appender with MongoDB.
After that I want to enable see logs with following commands:
db.logs_ApplicationToken.find();
Updated (2017-12-03):
I have run the following code similar to the Kaa Data Collection code for the Kaa Notification code:
private static class MeasureSender implements Runnable {
KaaClient kaaClient;
MeasureSender(KaaClient kaaClient) {
this.kaaClient = kaaClient;
}
@Override
public void run() {
sentRecordsCount.incrementAndGet();
DataLogging record = generateTopicId();
RecordFuture future = kaaClient.addLogRecord(record); // submit log record for sending to Kaa node
LOG.info("Log record {} submitted for sending", record.toString());
try {
RecordInfo recordInfo = future.get(); // wait for log record delivery error
BucketInfo bucketInfo = recordInfo.getBucketInfo();
LOG.info("Received log record delivery info. Bucket Id [{}]. Record delivery time [{} ms].",
bucketInfo.getBucketId(), recordInfo.getRecordDeliveryTimeMs());
confirmationsCount.incrementAndGet();
} catch (Exception e) {
LOG.error("Exception was caught while waiting for log's delivery report.", e);
}
}
}
private static DataLogging generateTopicId() {
//TODO: Logic for get topicId
return new DataLogging(topicId, System.currentTimeMillis());
}
After running application, When I run below command on Kaa server:
db.logs_18693008741969774929.find();
I get the result:
{ "_id" : ObjectId("5a228679ef540e07f3e73cd6"), "header" : { "endpointKeyHash" :{ "string" : "dXhbOD271Qtg9+FhxHXfrjE9bw4=" }, "applicationToken" : { "string" : "18693008741969774929" }, "headerVersion" : { "int" : 1 }, "timestamp" : { "long": NumberLong("1512212089541") }, "logSchemaVersion" : { "int" : 5 } }, "event": { "topicId" : 0, "timeStamp": 0 } }
As a result, it shows that "topicId" has not been received. Because it is equal to 0.
As you can see in the last above method private static DataLogging generateTopicId(){}
,
I need some logic for doing it.
Updated (2017-12-06):
public class NotificationDemo {
private static final Logger LOG = LoggerFactory.getLogger(NotificationDemo.class);
private static KaaClient kaaClient;
private static final int LOGS_DEFAULT_THRESHOLD = 1;
private static int samplePeriodInSeconds = 1;
private static volatile AtomicInteger sentRecordsCount = new AtomicInteger(0);
private static volatile AtomicInteger confirmationsCount = new AtomicInteger(0);
private JsonObjectParserImpl jsonObjectParser = new JsonObjectParserImpl();
private String StatusOfDevices = String.valueOf(jsonObjectParser.getGetStatusOfDevices());
private static ScheduledExecutorService executor;
private static ScheduledFuture<?> executorHandle;
/**
* The list of all available notification
* <p>
* private static int samplePeriodInSeconds = 1;
* private static volatile AtomicInteger sentRecordsCount = new AtomicInteger0t;
* private static volatile AtomicInteger confirmationsCount = new AtomicInteger(0);
* <p>
* private static Random rand = new Random(o;pics.
*/
private static List<Topic> topics;
/**
* Topics client subscribed
*/
private static List<Topic> subscribedTopics = new ArrayList<Topic>();
InputStreamReader inputStreamReader = new InputStreamReader(System.in);
BufferedReader keyboardInput = new BufferedReader(inputStreamReader);
public NotificationDemo() throws IOException, JSONException {
}
public static void main(String[] args) throws IOException, JSONException {
NotificationDemo main = new NotificationDemo();
main.config();
}
public void config() {
LOG.info("Notification demo started");
//kaaClient = Kaa.newClient(new DesktopKaaPlatformContext(), new SimpleKaaClientStateListener(), true);
KaaClient kaaClient = Kaa.newClient(new DesktopKaaPlatformContext(), new SimpleKaaClientStateListener() {
@Override
public void onStarted() {
LOG.info("--= Kaa client started =--");
}
@Override
public void onStopped() {
LOG.info("--= Kaa client stopped =--");
}
}, true);
/*
* Set record count strategy for uploading every log record as soon as it is created.
*/
kaaClient.setLogUploadStrategy(new RecordCountLogUploadStrategy(LOGS_DEFAULT_THRESHOLD));
/*
* A listener that listens to the notification topic list updates.
*/
kaaClient.addConfigurationListener(new ConfigurationListener() {
@Override
public void onConfigurationUpdate(Configuration configuration) {
LOG.info("--= Endpoint configuration was updated =--");
displayConfiguration(configuration);
Integer newSamplePeriod = configuration.getSamplingPeriod();
if ((newSamplePeriod != null) && (newSamplePeriod > 0)) {
changeMeasurementPeriod(kaaClient, newSamplePeriod);
} else {
LOG.warn("Sample period value (= {} in updated configuration is wrong, so ignore it.", newSamplePeriod);
}
}
});
NotificationTopicListListener topicListListener = new BasicNotificationTopicListListener();
kaaClient.addTopicListListener(topicListListener);
/*
* Add a notification listener that listens to all notifications.
*/
kaaClient.addNotificationListener(new NotificationListener() {
@Override
public void onNotification(long id, Notification notification) {
LOG.info("Notification from the topic with id [{}] and name [{}] received.", id, getTopic(id).getName());
LOG.info("Notification body: {} \n", notification.getMessage());
String commands = (notification.getMessage());
if (commands.equals("arm")) {
System.out.println("The Status of Devices:" + StatusOfDevices);
}
inputTopicIdMessage();
}
});
/*
* Start the Kaa client and connect it to the Kaa server.
*/
kaaClient.start();
topics = kaaClient.getTopics();
/*
* List the obtained notification topics.
*/
showTopics();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLong())
{
long topicId = scanner.nextLong();
if (getTopic(topicId) != null) {
LOG.info("Subscribing to optional topic {}", topicId);
subscribeTopic(topicId);
} else {
LOG.info("There is no input topic id. Please, input existing topic id.");
}
}
/*
* Stop listening to the notification topic list updates.
*/
kaaClient.removeTopicListListener(topicListListener);
unsubscribeOptionalTopics();
/*
* Stop the Kaa client and release all the resources which were in use.
*/
kaaClient.stop();
LOG.info("Notification demo stopped");
}
private static void changeMeasurementPeriod(KaaClient kaaClient, Integer newPeriod) {
if (executorHandle != null) {
executorHandle.cancel(false);
}
samplePeriodInSeconds = newPeriod;
executorHandle = executor.scheduleAtFixedRate(new MeasureSender(kaaClient), 0, samplePeriodInSeconds, TimeUnit.SECONDS);
LOG.info("Set new sample period = {} seconds.", samplePeriodInSeconds);
}
private static class MeasureSender implements Runnable {
KaaClient kaaClient;
MeasureSender(KaaClient kaaClient) {
this.kaaClient = kaaClient;
}
@Override
public void run() {
sentRecordsCount.incrementAndGet();
DataLogging record = generateTopicId();
RecordFuture future = kaaClient.addLogRecord(record); // submit log record for sending to Kaa node
LOG.info("Log record {} submitted for sending", record.toString());
try {
RecordInfo recordInfo = future.get(); // wait for log record delivery error
BucketInfo bucketInfo = recordInfo.getBucketInfo();
LOG.info("Received log record delivery info. Bucket Id [{}]. Record delivery time [{} ms].",
bucketInfo.getBucketId(), recordInfo.getRecordDeliveryTimeMs());
confirmationsCount.incrementAndGet();
} catch (Exception e) {
LOG.error("Exception was caught while waiting for log's delivery report.", e);
}
}
}
private static DataLogging generateTopicId() {
Integer topicId = generateTopicId().getTopicId();
return new DataLogging(topicId, System.currentTimeMillis());
}
private static void inputTopicIdMessage() {
LOG.info("\nPlease, type topic ID in order to subscribe to ones or type any text to exit: \n");
}
private static void displayConfiguration(org.kaaproject.kaa.schema.sample.Configuration configuration) {
LOG.info("Configuration = {}", configuration.toString());
}
private static void showTopics() {
if (topics == null || topics.isEmpty()) {
LOG.info("Topic list is empty");
return;
}
LOG.info("Available topics:");
for (Topic topic : topics) {
LOG.info("Topic id: {}, name: {}, type: {}", topic.getId(), topic.getName(), topic.getSubscriptionType());
}
LOG.info("Subscribed on topics:");
for (Topic t : getOneTypeTopics(SubscriptionType.MANDATORY_SUBSCRIPTION)) {
LOG.info("Topic id: {}, name: {}, type: {}", t.getId(), t.getName(), t.getSubscriptionType().name());
}
/*
* Optional topics
*/
if (!subscribedTopics.isEmpty()) {
for (Topic t : subscribedTopics) {
LOG.info("Topic id: {}, name: {}, type: {}", t.getId(), t.getName(), t.getSubscriptionType().name());
}
}
inputTopicIdMessage();
}
private static List<Topic> getOneTypeTopics(SubscriptionType type) {
List<Topic> res = new ArrayList<>();
for (Topic t : NotificationDemo.topics) {
if (t.getSubscriptionType() == type) {
res.add(t);
}
}
return res;
}
private static void subscribeTopic(long topicId) {
try {
subscribedTopics.add(getTopic(topicId));
kaaClient.subscribeToTopic(topicId, true);
} catch (UnavailableTopicException e) {
e.printStackTrace();
}
inputTopicIdMessage();
}
private static Topic getTopic(long id) {
for (Topic t : topics)
if (t.getId() == id)
return t;
return null;
}
private static void sleepForSeconds(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void unsubscribeOptionalTopics() {
List<Topic> topics = getOneTypeTopics(SubscriptionType.OPTIONAL_SUBSCRIPTION);
for (Topic t : subscribedTopics) {
try {
kaaClient.unsubscribeFromTopic(t.getId());
} catch (UnavailableTopicException e) {
// if not subscribe
}
}
}
private static void waitForAnyInput() {
try {
System.in.read();
} catch (IOException e) {
LOG.warn("Error happens when waiting for user input.", e);
}
}
/**
* A listener that tracks the notification topic list updates
* and subscribes the Kaa client to every new topic available.
*/
private static class BasicNotificationTopicListListener implements NotificationTopicListListener {
@Override
public void onListUpdated(List<Topic> list) {
LOG.info("Topic list was updated:");
topics.clear();
topics.addAll(list);
showTopics();
}
}
}
SecurityAlert
? WhattopicId
do you expect? Could you publish the code of your application? (It's hard to grasp the whole flow from a couple of snippets.) – Alexey Shmalko