I am trying to decode and process protobuf-encoded MQTT messages (from an Eclipse Mosquitto broker) using Apache Beam. In addition to the encoded fields, I also want to process the full topic and the timestamp of each message for grouping and aggregations.
What I have tried so far
I can connect to Mosquitto via
val options = PipelineOptionsFactory.create()
val pipeline = Pipeline.create(options)
val mqttReader: MqttIO.Read = MqttIO
    .read()
    .withConnectionConfiguration(
        MqttIO.ConnectionConfiguration.create(
            "tcp://localhost:1884",
            "my/topic/+"
        )
    )
val readMessages = pipeline.apply<PCollection<ByteArray>>(mqttReader)
In order to decode the messages, I have compiled the .proto schema (in my case quote.proto, containing the Quote message) via Gradle (the build setup is sketched below), which allows me to transform a ByteArray into Quote objects via Quote.parseFrom():
val quotes = readMessages
    .apply(
        ParDo.of(object : DoFn<ByteArray, QuoteOuterClass.Quote>() {
            @ProcessElement
            fun processElement(context: ProcessContext) {
                val protoRow = context.element()
                context.output(QuoteOuterClass.Quote.parseFrom(protoRow))
            }
        })
    )
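For completeness, the Gradle side of the .proto compilation mentioned above looks roughly like the following; the plugin and dependency versions here are assumptions rather than exactly what I run:

// build.gradle.kts (sketch; versions are assumptions)
plugins {
    kotlin("jvm") version "1.9.24"
    id("com.google.protobuf") version "0.9.4"
}

repositories { mavenCentral() }

dependencies {
    implementation("com.google.protobuf:protobuf-java:3.25.3")
    implementation("org.apache.beam:beam-sdks-java-core:2.56.0")
    implementation("org.apache.beam:beam-sdks-java-io-mqtt:2.56.0")
    implementation("org.apache.beam:beam-sdks-java-extensions-protobuf:2.56.0")
}

protobuf {
    // QuoteOuterClass is generated from src/main/proto/quote.proto
    protoc { artifact = "com.google.protobuf:protoc:3.25.3" }
}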
Using the decoded quotes, in the next apply I can then access individual fields with a ProcessFunction and a lambda, e.g. { quote -> "${quote.volume}" }.
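Spelled out, that step looks roughly like this (the MapElements/TypeDescriptors wiring is written from memory, so treat it as a sketch):

val volumes = quotes.apply(
    MapElements
        .into(TypeDescriptors.strings())
        // "volume" is one of the fields generated from quote.proto
        .via(ProcessFunction<QuoteOuterClass.Quote, String> { quote -> "${quote.volume}" })
)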
However, there are two problems:
- With this pipeline I do not have access to the topic or timestamp of each message.
- After sending the decoded messages back to the broker with plain UTF-8 encoding, I believe that they do not get decoded correctly (the write-back is sketched below).
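Roughly, the write-back looks like this (the outgoing topic name is a placeholder); note that I serialize with toString() and UTF-8 here, not with toByteArray():

val reEncoded = quotes.apply(
    ParDo.of(object : DoFn<QuoteOuterClass.Quote, ByteArray>() {
        @ProcessElement
        fun processElement(context: ProcessContext) {
            // plain UTF-8 of the text representation, not the protobuf wire format
            context.output(context.element().toString().toByteArray(Charsets.UTF_8))
        }
    })
)
reEncoded.apply(
    MqttIO.write()
        .withConnectionConfiguration(
            // "my/decoded" is a placeholder for the outgoing topic
            MqttIO.ConnectionConfiguration.create("tcp://localhost:1884", "my/decoded")
        )
)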
Additional considerations
- Apache Beam provides a ProtoCoder class, but I cannot figure out how to use it in conjunction with MqttIO. I suspect that the implementation has to look similar to:
val coder = ProtoCoder
    .of(QuoteOuterClass.Quote::class.java)
    .withExtensionsFrom(QuoteOuterClass::class.java)
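Presumably the coder then has to be attached to the decoded PCollection, something like the line below, but I am not sure whether this is the intended way to combine it with MqttIO:

// Guess: register the ProtoCoder on the decoded PCollection so that
// downstream transforms serialize Quote via protobuf instead of relying
// on default coder inference.
quotes.setCoder(coder)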
- Instead of a PCollection<ByteArray>, the Kafka IO reader provides a PCollection<KafkaRecord<Long, String>>, which has all the relevant fields (including the topic). I am wondering if something similar can be achieved with MQTT + Protobuf (a rough sketch of the KafkaIO usage I mean is included after the Spark example below).
- A similar implementation to what I want to achieve can be done in Spark Structured Streaming + Apache Bahir as follows:
val df_mqttStream = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic)
  .load(brokerUrl)

val parsePayload = ProtoSQL.udf { bytes: Array[Byte] => Quote.parseFrom(bytes) }

val quotesDS = df_mqttStream.select("id", "topic", "payload")
  .withColumn("quote", parsePayload($"payload"))
  .select("id", "topic", "quote.*")
However, with Spark 2.4 (the latest supported version), accessing the message topic is broken (related issue, my ticket in Apache Jira).
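For reference, this is roughly the KafkaIO behaviour I am referring to above (broker address and topic are placeholders):

import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.kafka.common.serialization.LongDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

// Each element is a KafkaRecord<Long, String>, which carries the topic,
// partition, offset, timestamp and the key/value pair.
val kafkaRecords = pipeline.apply(
    KafkaIO.read<Long, String>()
        .withBootstrapServers("localhost:9092") // placeholder
        .withTopic("quotes")                    // placeholder
        .withKeyDeserializer(LongDeserializer::class.java)
        .withValueDeserializer(StringDeserializer::class.java)
)
// per element, record.getTopic(), record.getTimestamp() and record.getKV() are available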