I have a data stream like this:

DataStream[myTuple(topic, value)]

I want to send each value to the topic it is associated with.

So I tried something like this:

new FlinkKafkaProducer010[myTuple](
  "default_topic",
  new KeyedSerializationSchema[myTuple](){
    override def getTargetTopic(element: myTuple): String = element.topic
    override def serializeKey(element: myTuple): Array[Byte] = null
    override def serializeValue(element: myTuple): Array[Byte] = new SimpleStringSchema().serialize(element.value)
  },
  properties)

But it doesn't work, and I get these warnings:

WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - Overwriting the 'key.serializer' is not recommended
WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - Overwriting the 'value.serializer' is not recommended

I have no idea how else to do it. Thanks for your help.

1 Answer

You have probably set key.serializer and value.serializer in your properties. You should not do this, because it overwrites the serializers (ByteArraySerializer) that Flink uses internally. Remove those properties and your code should work.
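
As a minimal sketch of what the properties could look like: the broker address below is an assumption, so replace it with your own. The two remove calls are only a defensive cleanup in case the Properties object comes from a shared configuration that already contains serializer settings.

```scala
import java.util.Properties

// Producer config for the FlinkKafkaProducer010 from the question.
val properties = new Properties()
// Assumed broker address; replace with your own.
properties.setProperty("bootstrap.servers", "localhost:9092")

// Do not set key.serializer or value.serializer here. If the properties
// come from elsewhere and may already contain them, drop them explicitly.
properties.remove("key.serializer")
properties.remove("value.serializer")
```

You can then pass properties as the third constructor argument, exactly as in the question; your getTargetTopic override should take care of routing each element to its topic.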