
What I did was read a message from Kafka in JSON format, e.g.:

{"a":1,"b":2}

Then I applied a filter to this message to make sure that the value of a is 1 and the value of b is 2. Finally, I want to write the resulting stream to a downstream Kafka topic. However, I don't know why the compiler reports a type mismatch.

My code is as follows:

val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONDeserializationSchema(),
params.getProperties)

val messageStream = env.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode => jsonNode.get("a").asText.equals("1")
                      && jsonNode.get("b").asText.equals("2"))

filteredStream.addSink(new FlinkKafkaProducer010[Object](
  params.getRequired("output-topic"),
  new SimpleStringSchema,
  params.getProperties))

The compiler error (shown as a screenshot in the original post) is a type mismatch at the addSink call.

I referred to the Flink Kafka connector documentation to write the Kafka sink code: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html


2 Answers


You have a DataStream of type ObjectNode, so you need to provide a FlinkKafkaProducer010[ObjectNode], e.g.:

filteredStream.addSink(new FlinkKafkaProducer010[ObjectNode](
  params.getRequired("output-topic"),
  new SerializationSchema[ObjectNode] {
    override def serialize(element: ObjectNode): Array[Byte] = ???
  },
  params.getProperties))
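
For example, if the downstream topic should simply receive each node's JSON text, the ??? could be filled in like this (a minimal sketch; encoding the node as UTF-8 JSON text is my assumption, not something the answer specifies):

import java.nio.charset.StandardCharsets
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.util.serialization.SerializationSchema

// A possible SerializationSchema[ObjectNode]: write each node as UTF-8 JSON text
val jsonSchema = new SerializationSchema[ObjectNode] {
  // ObjectNode.toString renders the node back to its JSON text
  override def serialize(element: ObjectNode): Array[Byte] =
    element.toString.getBytes(StandardCharsets.UTF_8)
}

which can then be passed as the second constructor argument above.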

All generic types in Java are invariant, which is why you cannot simply pass a FlinkKafkaProducer010[Object].

Another problem you would run into next is that you also need to provide a SerializationSchema[ObjectNode], whereas SimpleStringSchema implements SerializationSchema[String].
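
If you would rather keep SimpleStringSchema, another way out (a sketch under the assumption that plain JSON text is acceptable downstream; this is not part of the original answer) is to convert the stream to String first, so the element type matches what the schema expects:

val stringStream: DataStream[String] = filteredStream.map(_.toString)

stringStream.addSink(new FlinkKafkaProducer010[String](
  params.getRequired("output-topic"),
  new SimpleStringSchema,
  params.getProperties))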


Adding to what @Dawid already pointed out, you can provide the serialization schema for the ObjectNode (assuming it's a POJO; I haven't tested it for other objects) as follows:

TypeInformation<ObjectNode> typeInfo =
        TypeInformation.of(new TypeHint<ObjectNode>() {});
TypeInformationSerializationSchema<ObjectNode> serdeSchema =
        new TypeInformationSerializationSchema<>(typeInfo, env.getConfig());

and then use serdeSchema as follows for the KafkaProducer sink:

FlinkKafkaProducer010<ObjectNode> kafkaSink =
        new FlinkKafkaProducer010<>(
                BOOTSTRAP_SERVERS,
                "output-topic",
                serdeSchema);
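
Note that TypeInformationSerializationSchema serializes with Flink's own type-serialization stack, so the records land in Kafka in Flink's binary format rather than as JSON text; the consuming job needs the same schema to read them back. To complete the wiring (a hypothetical final step, not shown in the original answer), attach the sink to the filtered stream:

filteredStream.addSink(kafkaSink)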

Hopefully this solves your issue with the Kafka sink type mismatch.