0
votes

I am new to Flink Stream processing, and need some help with Flink Kafka producer as cannot find much related to it after some time of searching. I am currently reading stream from a Kafka topic and then after performing some calculation I want to write this to new a seperate topic in Kafka. But the problem I am facing is that I cannot send key to the Kafka topic. I am using Flink Kafka connector which gives me FlinkKafkaConsumer and FlinkKafkaProducer. For more detailed look below is my code, what can I change in my code that it could work, currently on Kafka that I am producing my message are going with null in Key where as the value is what that I need:

Properties consumerProperties = new Properties();
    
    consumerProperties.setProperty("bootstrap.servers", serverURL);
    consumerProperties.setProperty("group.id", groupID);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(consumerTopicName,
            new SimpleStringSchema(), consumerProperties);

    kafkaConsumer.setStartFromEarliest();
    DataStream<String> kafkaConsumerStream = env.addSource(kafkaConsumer);
    final int[] tVoteCount = {0};
    
    DataStream<String> kafkaProducerStream = kafkaConsumerStream.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws InterruptedException, IOException {
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            Tcount = Tcount + jsonNode.get(key1).asInt();
            int nameCandidate = jsonNode.get(key2).asInt();
            System.out.println(Tcount);
            String tCountT = Integer.toString(Tcount);
            //tVoteCount = tVoteCount + voteCount;
             
            //waitForEventTime(timeStamp);
            return tCountT;
        }
    });
    kafkaConsumerStream.print();
    System.out.println("sdjknvksjdnv"+Tcount);
    Properties producerProperties = new Properties();
    producerProperties.setProperty("bootstrap.servers", serverURL);
    FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(producerTopicName,
            new SimpleStringSchema(), producerProperties);
    kafkaProducerStream.addSink(kafkaProducer);
    env.execute();

Thanks.

2

2 Answers

1
votes

In this blog you will find an example on how to write key and topic into a topic:

You need to replace your creation of a new FlinkKafkaProducer with something like below:

FlinkKafkaProducer<KafkaRecord> kafkaProducer = 
  new FlinkKafkaProducer<KafkaRecord>(
    producerTopicName, 
    ((record, timestamp) -> new ProducerRecord<byte[], byte[]>(producerTopicName, record.key.getBytes(), record.value.getBytes())), 
    producerProperties
  );
0
votes

If you provide your own KafkaSerializationSchema rather than using a SimpleStringSchema, then you will have complete control over what's written. @mike has provided an example of how to do that in his answer.