I am reading messages from Kafka in JSON format, e.g.:
{"a":1,"b":2}
I then apply a filter to keep only the messages where the value of a is 1 and the value of b is 2. Finally, I want to write the resulting stream to a downstream Kafka topic. However, the compiler reports a type mismatch and I don't understand why.
My code is as follows:
val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONDeserializationSchema(),
  params.getProperties)
val messageStream = env.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode =>
  jsonNode.get("a").asText.equals("1") && jsonNode.get("b").asText.equals("2"))
filteredStream.addSink(new FlinkKafkaProducer010[Object](
  params.getRequired("output-topic"),
  new SimpleStringSchema,
  params.getProperties))
The error I got is shown in the image below:
I followed the Flink Kafka connector documentation when writing the Kafka sink code: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
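
For comparison, here is a minimal sketch of a variant in which the sink's element type matches its serialization schema. It rests on assumptions that are not confirmed above: that the mismatch comes from pairing a DataStream[ObjectNode] with a producer declared as FlinkKafkaProducer010[Object] while SimpleStringSchema serializes String, that the Flink 1.2 package layout applies (where ObjectNode is the unshaded Jackson class), and that writing each record as its JSON text is acceptable. The object name FilterJsonJob, the helper matches, and the job name are hypothetical.

```scala
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{JSONDeserializationSchema, SimpleStringSchema}

object FilterJsonJob {
  // Keep only records where a is 1 and b is 2, compared as text as in the question.
  // The has(...) checks are an addition so that records missing a field are dropped
  // instead of causing a NullPointerException.
  def matches(node: ObjectNode): Boolean =
    node.has("a") && node.has("b") &&
      node.get("a").asText == "1" && node.get("b").asText == "2"

  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val kafkaConsumer = new FlinkKafkaConsumer010[ObjectNode](
      params.getRequired("input-topic"),
      new JSONDeserializationSchema(),
      params.getProperties)

    val filteredStream: DataStream[ObjectNode] =
      env.addSource(kafkaConsumer).rebalance.filter(matches _)

    // Assumption: SimpleStringSchema is a SerializationSchema[String], so each
    // ObjectNode is converted to its JSON text before it reaches the sink.
    val jsonStrings: DataStream[String] = filteredStream.map(_.toString)

    // The producer's type parameter now agrees with both the stream and the schema.
    jsonStrings.addSink(new FlinkKafkaProducer010[String](
      params.getRequired("output-topic"),
      new SimpleStringSchema,
      params.getProperties))

    env.execute("filter-json")
  }
}
```

The idea is that the element type of the stream, the type parameter of the producer, and the type handled by the schema are all String. An alternative under the same assumptions would be to keep the stream as DataStream[ObjectNode] and supply a custom SerializationSchema[ObjectNode] instead of mapping to String.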