My goal is to use Kafka to read in a JSON-formatted string, filter it, select part of the message, and sink the result out (still as a JSON string).
For testing purposes, my input message looks like this:
{"a":1,"b":2,"c":"3"}
My implementation is:
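```scala
// Imports assume a Flink 1.3-era setup with the Kafka 0.10 connector;
// package names (notably for Jackson and SerializationSchema) differ in later versions.
import java.nio.charset.StandardCharsets
import java.util.Properties

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{JSONDeserializationSchema, SerializationSchema}

object KafkaJsonExample { // object name is illustrative
  def main(args: Array[String]): Unit = {
    // Kafka consumer settings
    val inputProperties = new Properties()
    inputProperties.setProperty("bootstrap.servers", "localhost:9092")
    inputProperties.setProperty("group.id", "myTest2")
    val inputTopic = "test"

    // Kafka producer settings
    val outputProperties = new Properties()
    outputProperties.setProperty("bootstrap.servers", "localhost:9092")
    val outputTopic = "test2"

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()
    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
    // create a checkpoint every 5 seconds
    env.enableCheckpointing(5000)

    // create a Kafka streaming source consumer for Kafka 0.10.x;
    // JSONDeserializationSchema parses each record into an ObjectNode
    val kafkaConsumer = new FlinkKafkaConsumer010(
      inputTopic,
      new JSONDeserializationSchema(),
      inputProperties)

    val messageStream: DataStream[ObjectNode] = env
      .addSource(kafkaConsumer)
      .rebalance

    // keep only messages where a == 1 and b == 2
    val filteredStream: DataStream[ObjectNode] = messageStream.filter(node =>
      node.get("a").asText.equals("1") && node.get("b").asText.equals("2"))

    // Need help in this part: how do I extract, for instance, a and c
    // and get something like {"a":1,"c":"3"}?
    val testStream: DataStream[JsonNode] = filteredStream.map(node => node.get("a"))

    // write each JsonNode back to Kafka as a UTF-8 JSON string
    testStream.addSink(new FlinkKafkaProducer010[JsonNode](
      outputTopic,
      new SerializationSchema[JsonNode] {
        override def serialize(element: JsonNode): Array[Byte] =
          element.toString.getBytes(StandardCharsets.UTF_8)
      },
      outputProperties))

    env.execute("Kafka 0.10 Example")
  }
}
```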
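A side note on the filter in the code above: `JsonNode.get` returns `null` when a field is absent, so that predicate throws a `NullPointerException` on any message lacking `a` or `b`. Below is a minimal null-safe sketch. It assumes the plain `com.fasterxml.jackson` classes (if your Flink version's `JSONDeserializationSchema` returns Flink's shaded Jackson `ObjectNode`, change the import prefix), and the names `JsonFilters`, `fieldEquals`, and `keep` are hypothetical.

```scala
// Assumes unshaded Jackson 2.x; swap the import prefix if Flink hands you shaded classes.
import com.fasterxml.jackson.databind.JsonNode

object JsonFilters {
  // True only if `field` is present and its text form equals `expected`.
  // JsonNode.get returns null for a missing field, so wrap it in Option.
  def fieldEquals(node: JsonNode, field: String, expected: String): Boolean =
    Option(node.get(field)).exists(_.asText == expected)

  // The predicate from the question (a == 1 and b == 2), without the NPE risk.
  def keep(node: JsonNode): Boolean =
    fieldEquals(node, "a", "1") && fieldEquals(node, "b", "2")
}
```

In the job this could replace the inline lambda as `messageStream.filter(node => JsonFilters.keep(node))`; messages missing either field are then dropped instead of failing the job.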
As the comment in my code indicates, I am not sure how to select part of the message properly. I use map, but I don't know how to build the whole output message from it. For instance, my code only produces "1", whereas what I want is {"a":1,"c":"3"}.
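One way to get {"a":1,"c":"3"} is to have map return a new JSON object containing the chosen fields rather than a single field value. This is a minimal sketch, assuming unshaded Jackson 2.x (`JsonNodeFactory.instance.objectNode()` and `ObjectNode.replace`); `JsonProjection` and `selectFields` are hypothetical names.

```scala
// Assumes unshaded Jackson 2.x; swap the import prefix if Flink hands you shaded classes.
import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}

object JsonProjection {
  // Builds a new ObjectNode holding only the requested top-level fields,
  // in the order given. Fields missing from the input are skipped.
  // Field values are shared with the input node, not deep-copied.
  def selectFields(node: ObjectNode, fields: String*): ObjectNode = {
    val out = JsonNodeFactory.instance.objectNode()
    for (f <- fields) {
      val value = node.get(f) // null if the field is absent
      if (value != null) out.replace(f, value)
    }
    out
  }
}
```

In the job, the map step could become `filteredStream.map(node => JsonProjection.selectFields(node, "a", "c"): JsonNode)`; the type ascription keeps the result a `DataStream[JsonNode]`, so the existing sink, which serializes with `toString`, should write `{"a":1,"c":"3"}`. Building a fresh node leaves the incoming record untouched; Jackson's `ObjectNode.retain(...)` would instead trim the original node in place.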
Or maybe there is a completely different way to solve this problem. In Spark Streaming there is a "select" API, but I cannot find an equivalent in Flink.
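The DataStream API does not offer a Spark-style select over JSON fields, but in Scala a small implicit class can add a similar projection method to a stream of `ObjectNode`s. This is a hypothetical helper, not part of Flink (`JsonStreamSyntax`, `JsonStreamOps`, and `selectFields` are illustrative names), and it again assumes unshaded Jackson 2.x. It trims each record in place with Jackson's `ObjectNode.retain`; if other operators also need the untrimmed record, building a new node per record is the safer choice.

```scala
// Assumes Flink's Scala DataStream API and unshaded Jackson 2.x.
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala._

object JsonStreamSyntax {
  // Adds a Spark-like projection method to DataStream[ObjectNode].
  implicit class JsonStreamOps(val stream: DataStream[ObjectNode]) extends AnyVal {
    def selectFields(fields: String*): DataStream[ObjectNode] = {
      val keep = fields.toList // immutable, serializable list captured by the closure
      // retain(...) removes every top-level field not listed and returns the node
      stream.map(node => node.retain(keep: _*))
    }
  }
}
```

With `import JsonStreamSyntax._` in scope, the projection step reads `filteredStream.selectFields("a", "c")`. For column-style select on structured records, Flink's Table API/SQL is the closer analogue, but it would require turning the JSON into typed rows first.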
Thanks a lot to the Flink community for any help! This is the last feature I need for this small project.