
Does the flink 1.7.2 dataset not support kafka sink ?

After doing the batch operation I need to publish the message to kafka, meaning source is my postgres and sink is my kafka.

Is it possible ?


2 Answers


You can create your own output format and use Kafka Producer to produce to Kafka. Check the code below.

data.output(new KafkaOPFormat());
import java.io.IOException;
import java.util.Properties;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaOPFormat extends RichOutputFormat<Tuple2<String, String>> {

  private final Properties properties = new Properties();
  private KafkaProducer<String, String> producer;

  public void configure(Configuration configuration) {
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("acks", "all");
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("linger.ms", 1);
    properties.put("buffer.memory", 33554432);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  public void open(int i, int i1) throws IOException {
    producer = new KafkaProducer<String, String>(properties);

  public void writeRecord(Tuple2<String, String> record) throws IOException {
    producer.send(new ProducerRecord<>(record.f0, record.f1));

  public void close() throws IOException {

PS: I do not remember all the configs, do check for your configuration and alter accordingly.


Out of the box: not yet, you would have to work with data streams from the beginning onwards or create your own custom output format as mentioned.

There are plans however to unify the DataSet and DataStream APIs in the Apache Flink project for the long-term in Flink 2.0: https://flink.apache.org/roadmap.html