1 vote

I am publishing data to MQTT topics with the Mosquitto broker, and am trying to pipe the data points through to my InfluxDB database. Currently I am trying to do this using the mqtt_consumer input plugin for Telegraf.

A simplified version of my telegraf.conf file looks like this:

# Global Agent Configuration
[agent]
  interval = "1m"

# Input Plugins 
[[inputs.mqtt_consumer]]
  name_prefix = "sensors_"
  servers = ["localhost:1883"]
  qos = 0
  connection_timeout = "30s"
  topics = [
    "/inside/kitchen/temperature"
  ]
  persistent_session = false
  data_format = "value"
  data_type = "float"

# Output Plugins
[[outputs.influxdb]]
  database = "telegraf"
  urls = [ "http://localhost:8086" ]

Now, I can manually publish a data point using MQTT via the following:

~ $ mosquitto_pub -d -t /inside/kitchen/temperature -m 23.5

where I have used the MQTT topic /inside/kitchen/temperature and a value of 23.5.

Examining my InfluxDB database, I see the following data points:

name: sensors_mqtt_consumer
time                 topic                        value
----                 -----                        -----
2020-06-27T20:08:50  /inside/kitchen/temperature  23.5
2020-06-27T20:08:40  /inside/kitchen/temperature  23.5

Is there any way that I can use the MQTT topic name description to properly allocate Influx tags? For example, I would like something like:

name: temperature
time                location  room     value
----                -----     -----    -----
2020-06-27T20:08:50 inside    kitchen  23.5
2020-06-27T20:08:40 inside    kitchen  23.5

Is there a way to do this with the InfluxDB/Mosquitto/Telegraf configurations? Later I will be adding more topics (locations, etc) and measurements (humidity, voltage, etc), so the solution should allow for this.

(I know that this can be done by choosing data_format = "influx" as described here, where Telegraf interprets the message as InfluxDB line protocol and passes it through directly. However, then I would have to publish to the topic in this way:

mosquitto_pub -d -t /inside/kitchen/temperature -m "temperature,location=inside,room=kitchen value=23.5"

where you can see that most of the information has been input twice, even though it already existed. The same can be said for the data_format="json" option. What I need is more of a mapping).

2
I've been toying around with Telegraf for a long time, trying to do exactly what you describe, and never got it to work properly. You would probably need to write your own Telegraf plugin to get this to work; changing the configuration alone didn't solve it for me. I ended up writing a small Python app that did the job. - fxweidinger
I am doing exactly that as we speak. I probably landed on the same Python library combination (paho-mqtt + influxdb) as you? - teeeeee
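For readers taking the small-Python-app route mentioned in these comments, the core of such a bridge is the mapping from topic to measurement and tags. The following is a minimal sketch, not either commenter's actual code: the function name `topic_to_point` is hypothetical, and it assumes every topic has exactly the /<location>/<room>/<measurement> layout used in the question. The returned dictionary uses measurement/tags/fields keys, which is the shape the influxdb Python client is generally documented to accept for writes; check that against the client version you use.

```python
def topic_to_point(topic, payload):
    """Map '/<location>/<room>/<measurement>' plus a payload to a point dict.

    Assumes the three-level topic layout from the question; anything else
    raises ValueError instead of being guessed at.
    """
    parts = topic.strip("/").split("/")
    if len(parts) != 3:
        raise ValueError(f"unexpected topic layout: {topic!r}")
    location, room, measurement = parts
    return {
        "measurement": measurement,
        "tags": {"location": location, "room": room},
        "fields": {"value": float(payload)},
    }


point = topic_to_point("/inside/kitchen/temperature", "23.5")
print(point)
```

In an MQTT client's message callback, the topic and decoded payload would be passed to this function and the result handed to the database client. New rooms or measurements then need no code change as long as they follow the same topic layout.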

2 Answers

1 vote

I'm new to Telegraf, but I have had some success with something along the lines of:

[[inputs.mqtt_consumer]]
  name_override = "sensors"
  servers = ["localhost:1883"]
  qos = 0
  connection_timeout = "30s"
  topics = [
    "/inside/kitchen/temperature"
  ]
  persistent_session = false
  data_format = "value"
  data_type = "float"


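[[processors.regex]]
  namepass = ["sensors"]
  # Topics look like /<location>/<room>/<quantity>. Each pattern is anchored
  # and matches the whole topic, so the replacement keeps only the capture group.
  [[processors.regex.tags]]
    key = "topic"
    pattern = "^/([^/]+)/[^/]+/[^/]+$"
    replacement = "${1}"
    result_key = "location"
  [[processors.regex.tags]]
    key = "topic"
    pattern = "^/[^/]+/([^/]+)/[^/]+$"
    replacement = "${1}"
    result_key = "room"
  # This creates a tag named "value" (e.g. "temperature") next to the field
  # of the same name; choose another result_key if that is confusing.
  [[processors.regex.tags]]
    key = "topic"
    pattern = "^/[^/]+/[^/]+/([^/]+)$"
    replacement = "${1}"
    result_key = "value"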

The regex processor uses Go (RE2) regular expression syntax.
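Patterns of this kind are easy to get subtly wrong, so it can help to check them outside Telegraf first. The sketch below uses Python's `re` module as a stand-in, on the assumption that it behaves like Go's engine for these simple patterns (Python writes the replacement as `\1` where Go uses `${1}`). The anchored patterns and the `extract` helper are illustrative, not taken from Telegraf. Because a replace-all leaves unmatched text in place, the last lines also show how an unanchored pattern with a trailing slash returns the wrong segment.

```python
import re

TOPIC = "/inside/kitchen/temperature"

# Anchored patterns: each one matches the whole topic, so substituting the
# capture group leaves nothing but the wanted segment.
PATTERNS = {
    "location": r"^/([^/]+)/[^/]+/[^/]+$",
    "room": r"^/[^/]+/([^/]+)/[^/]+$",
    "value": r"^/[^/]+/[^/]+/([^/]+)$",
}


def extract(topic):
    """Apply every pattern as a replace-all and collect the results by key."""
    return {key: re.sub(pattern, r"\1", topic) for key, pattern in PATTERNS.items()}


tags = extract(TOPIC)
print(tags)

# Unanchored pattern with a trailing slash: it only matches "/inside/kitchen/",
# the capture group is empty, and the unmatched tail is left behind.
loose = re.sub(r"(.*)/.*/.*/", r"\1", TOPIC)
print(loose)
```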

1 vote

I found a comprehensive answer in the telegraf repo's issue threads. Copied here for convenience. Since I had control over the message source I used the last option.


You have a few options; the example configs below are untested and might need tweaking.

You can have multiple plugins, one for each type and use name_override to vary the measurement name:

[[inputs.mqtt_consumer]]
  name_override = "my_integer"
  topics = ["/XXX"]
  data_format = "value"
  data_type = "integer"

[[inputs.mqtt_consumer]]
  name_override = "my_string"
  topics = ["/SSS"]
  data_format = "value"
  data_type = "string"

You can have a single plugin reading all values as strings, and rename the fields and convert the integers using processors:

[[inputs.mqtt_consumer]]
  topics = ["/#"]
  data_format = "value"
  data_type = "string"

[[processors.rename]]
  order = 1
  [processors.rename.tagpass]
    topic = ["/SSS"]

  [[processors.rename.replace]]
    field = "value"
    dest = "my_string"

[[processors.rename]]
  order = 2
  [processors.rename.tagpass]
    topic = ["/XXX"]

  [[processors.rename.replace]]
    field = "value"
    dest = "my_integer"

[[processors.converter]]
  order = 3
  [processors.converter.tagpass]
    topic = ["/XXX"]

  [processors.converter.fields]
    integer = ["my_integer"]

The last option is to publish in a format that Telegraf can parse directly on read, such as InfluxDB Line Protocol, which gives you the most control over how the data looks. I recommend this if it is possible to modify the incoming data:

[[inputs.mqtt_consumer]]
  topics = ["/#"]
  data_format = "influx"
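If the publisher is under your control, the duplication the question worries about can be avoided by generating the line-protocol payload from the topic rather than typing it by hand. This is a minimal sketch under stated assumptions: `escape` and `to_line_protocol` are hypothetical helper names, topics follow the /<location>/<room>/<measurement> layout from the question, and only the basic escaping of commas, equals signs and spaces is handled.

```python
def escape(text, chars=",= "):
    """Backslash-escape characters that are special in line protocol."""
    return "".join("\\" + c if c in chars else c for c in text)


def to_line_protocol(topic, value):
    """Build 'measurement,tag=...,tag=... value=<float>' from a topic and value."""
    location, room, measurement = topic.strip("/").split("/")
    tags = {"location": location, "room": room}
    tag_str = ",".join(f"{escape(k)}={escape(v)}" for k, v in sorted(tags.items()))
    # In measurement names only commas and spaces need escaping.
    return f"{escape(measurement, ', ')},{tag_str} value={float(value)}"


payload = to_line_protocol("/inside/kitchen/temperature", 23.5)
print(payload)
```

The resulting string is what would be passed as the message body when publishing to the topic, so the location and room are written once (in the topic) and derived everywhere else.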