
Full disclosure: I also published a variant of this question here.

I have an embedded device, part of a heating system, that publishes two temperature values every 5 seconds via a mosquitto MQTT broker, each to its own MQTT topic. "mydevice/sensor1" is the pre-heating temperature, and "mydevice/sensor2" is the post-heating temperature. The values are published at almost the same time, so there is typically no more than half a second of delay between the two messages, but they aren't synchronised exactly.

Telegraf is subscribed to the same broker and is happily putting these measurements into an InfluxDB database called "telegraf.autogen". The measurements both appear under a single measurement called "mqtt_consumer" with a field called "value". In InfluxDB I can differentiate between topic-tagged values by filtering with the "topic" tag:

SELECT mean("value") AS "mean_value" FROM "telegraf"."autogen"."mqtt_consumer" WHERE time > now() - 1m AND "topic"='mydevice/sensor1' GROUP BY time(5s)

This all seems to be working correctly.

What I want to do is calculate the difference between these two topic values, for each pair of incoming values, in order to calculate the temperature differential and eventually calculate the energy being transferred by the heating system (the flow rate is constant and known). I tried to do this with InfluxDB queries in Grafana but it seemed quite difficult (I failed), so I thought I'd try and use TICKscript to break down my process into small steps.

I have been putting together a TICKscript to calculate the difference based on this example:

https://docs.influxdata.com/kapacitor/v1.3/guides/join_backfill/#stream-method

However in my case I don't have two separate measurements. Instead, I create two separate streams from the single "mqtt_consumer" measurement, using the topic tag as a filter. Then I attempt to join these with a 1s tolerance (values are always published close enough in time). I'm using httpOut to generate a view for debugging (Aside: this only updates every 10 seconds, missing every second value, even though my stream operates at 5 second intervals - why is that? I can see in the new db that the values are all present though).

Once I have them joined, I would evaluate the difference in values, and store this in a new database under a measurement called "diff".

Here's my script so far:

var sensor1 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor1')
        .groupBy(*)
    |httpOut('sensor1')

var sensor2 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor2')
        .groupBy(*)
    |httpOut('sensor2')

sensor1
    |join(sensor2)
        .as('value1', 'value2')
        .tolerance(1s)
    |httpOut('join')
    |eval(lambda: "sensor1.value1" - "sensor1.value2")
        .as('diff')
    |httpOut('diff')
    |influxDBOut()
        .create()
        .database('mydb')
        .retentionPolicy('myrp')
        .measurement('diff')

Unfortunately my script is failing to pass any items through the join node. In kapacitor show I can see that the httpOut nodes are both passing items to the join node, but it isn't passing any on. The kapacitor logs don't show anything obvious either. An HTTP GET for httpOut('join') returns:

{"series":null}

I have two questions:

  1. Is this approach, using Kapacitor with a TICKscript to calculate energy based on the difference between two values in a single measurement, valid? Or is there a better/simpler way to do this?
  2. Why isn't the join node producing any output? What can I do to debug this further?
Have you resolved your issue? I think I have a similar one: the join is not producing (or receiving) any output. - Bunyk
No, not really. I ended up using a different approach where I calculated the values I needed prior to sending to Telegraf. - davidA

1 Answer


Try adding a |mean node to both sensor streams to calculate the mean of the field:

var sensor1 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor1')
        .groupBy(*)
    |mean('field1')
    |httpOut('sensor1')

After the join, you should refer to the streams by their newly assigned names, not the original ones:

sensor1
    |join(sensor2)
        .as('value1', 'value2')
        .tolerance(1s)
    |httpOut('join')
    |eval(lambda: "value1.field1" - "value2.field2")
        .as('diff')
    |httpOut('diff')
    |influxDBOut()
        .create()
        .database('mydb')
        .retentionPolicy('myrp')
        .measurement('diff')

Here field1 and field2 stand for the mean fields calculated in the previous step. Try it out!
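One further thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: as far as I understand it, the join node only pairs points that belong to the same group, and .groupBy(*) groups by every tag, including "topic". Since the two streams carry different "topic" values, their groups would never match. A minimal sketch that drops the groupBy so that both streams end up in a single group (everything else follows the script in the question, and the joined fields are addressed as "value1.value" and "value2.value"):

```
// Sketch only: same sources as in the question, but without .groupBy(*),
// on the assumption that the differing "topic" tag is what keeps the
// two groups from ever matching in the join.
var sensor1 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor1')

var sensor2 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor2')

sensor1
    |join(sensor2)
        // Fields from each parent are prefixed with these names.
        .as('value1', 'value2')
        .tolerance(1s)
    // The original field is called "value" in both streams.
    |eval(lambda: "value1.value" - "value2.value")
        .as('diff')
    |httpOut('diff')
    |influxDBOut()
        .create()
        .database('mydb')
        .retentionPolicy('myrp')
        .measurement('diff')
```

If the streams need to stay grouped (for example by a tag both topics share), grouping by that shared tag only, rather than by *, should have the same effect.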

Also, to debug further, try adding log nodes at the points you want to inspect.
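As a rough illustration of the log suggestion, the sketch below mirrors the script from the question and places a |log() node after each source and another after the join. The prefix strings are arbitrary labels chosen for this example.

```
// Sketch only: the question's sources with a log node attached to each.
var sensor1 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor1')
        .groupBy(*)
    // Logs every point that leaves this source.
    |log()
        .prefix('sensor1 in')

var sensor2 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor2')
        .groupBy(*)
    |log()
        .prefix('sensor2 in')

sensor1
    |join(sensor2)
        .as('value1', 'value2')
        .tolerance(1s)
    // Logs only the points the join actually emits.
    |log()
        .prefix('joined')
```

If the 'sensor1 in' and 'sensor2 in' entries show up in the Kapacitor log but 'joined' never does, the points are being dropped at the join itself, and the logged entries show the timestamps and tags each side is arriving with.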

Hope this helps! Regards