Does the Flink 1.7.2 DataSet API not support a Kafka sink?

After running a batch job I need to publish the results to Kafka: the source is my PostgreSQL database and the sink is Kafka.

Is this possible?

2 Answers

You can write your own OutputFormat that uses a KafkaProducer to send each record to Kafka. Check the code below.

...
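data.output(new KafkaOPFormat()); // data: DataSet<Tuple2<String, String>>, f0 = topic, f1 = value
env.execute();

import java.io.IOException;
import java.util.Properties;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Writes every Tuple2 of a DataSet to Kafka: f0 is used as the topic name, f1 as the message value.
public class KafkaOPFormat extends RichOutputFormat<Tuple2<String, String>> {

  private final Properties properties = new Properties();
  // KafkaProducer is not serializable, so it is created in open() on each parallel instance.
  private transient KafkaProducer<String, String> producer;
  // First asynchronous send failure; rethrown so the job fails instead of silently dropping data.
  private transient volatile Exception sendError;

  @Override
  public void configure(Configuration configuration) {
    // Hard-coded example settings; adjust them to your cluster.
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("acks", "all");
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("linger.ms", 1);
    properties.put("buffer.memory", 33554432);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  }

  @Override
  public void open(int taskNumber, int numTasks) throws IOException {
    producer = new KafkaProducer<>(properties);
  }

  @Override
  public void writeRecord(Tuple2<String, String> record) throws IOException {
    checkSendError();
    // send() is asynchronous; the callback remembers the first failure.
    producer.send(new ProducerRecord<>(record.f0, record.f1), (metadata, exception) -> {
      if (exception != null && sendError == null) {
        sendError = exception;
      }
    });
  }

  @Override
  public void close() throws IOException {
    if (producer != null) {
      producer.close(); // blocks until all buffered records have been sent
    }
    checkSendError();
  }

  private void checkSendError() throws IOException {
    if (sendError != null) {
      throw new IOException("Sending a record to Kafka failed", sendError);
    }
  }
}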
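To make the order in which Flink drives such a format easier to see, here is a dependency-free sketch. `SketchOutputFormat`, `InMemoryTopicSink` and `SinkDriver` are hypothetical names, not Flink classes: the interface only mirrors the four lifecycle methods of Flink's OutputFormat, and the "sink" collects values per topic in memory instead of calling Kafka. Each parallel sink instance roughly goes through configure, open, one writeRecord per element, then close.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Flink's OutputFormat, reduced to its four lifecycle methods.
interface SketchOutputFormat<T> {
  void configure(Map<String, String> config);
  void open(int taskNumber, int numTasks);
  void writeRecord(T record);
  void close();
}

// Hypothetical in-memory sink: groups values by topic instead of sending them to Kafka.
class InMemoryTopicSink implements SketchOutputFormat<String[]> {
  final Map<String, List<String>> topics = new LinkedHashMap<>();
  private String topicPrefix;
  private boolean open;

  @Override
  public void configure(Map<String, String> config) {
    // Like configure() in KafkaOPFormat: read settings before any connection is made.
    topicPrefix = config.getOrDefault("topic.prefix", "");
  }

  @Override
  public void open(int taskNumber, int numTasks) {
    // This is where KafkaOPFormat creates its producer.
    open = true;
  }

  @Override
  public void writeRecord(String[] record) {
    if (!open) {
      throw new IllegalStateException("writeRecord() called before open()");
    }
    // record[0] plays the role of f0 (topic), record[1] of f1 (value).
    topics.computeIfAbsent(topicPrefix + record[0], t -> new ArrayList<>()).add(record[1]);
  }

  @Override
  public void close() {
    // This is where KafkaOPFormat flushes and closes its producer.
    open = false;
  }
}

// Drives one parallel sink instance in lifecycle order: configure, open, write every record, close.
final class SinkDriver {
  static <T> void run(SketchOutputFormat<T> format, Map<String, String> config,
                      List<T> records, int taskNumber, int numTasks) {
    format.configure(config);
    format.open(taskNumber, numTasks);
    try {
      for (T record : records) {
        format.writeRecord(record);
      }
    } finally {
      format.close();
    }
  }
}
```

The `try`/`finally` reflects why the Kafka version closes its producer in `close()`: buffered messages are only guaranteed to be sent once the producer is closed, even if writing failed partway.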

PS: I do not remember all the producer configs off-hand, so check them against your setup and adjust accordingly.
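One way to avoid hard-coding these values is to build the producer Properties from parameters, keeping the answer's settings as defaults. A minimal sketch, assuming a hypothetical helper `producerProperties` (not part of Kafka or Flink); the config keys themselves are the standard Kafka producer names used above.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka or Flink): the producer settings from the
// answer above, with the broker list passed in and optional per-environment overrides.
final class ProducerConfigSketch {
  static final String STRING_SERIALIZER =
      "org.apache.kafka.common.serialization.StringSerializer";

  static Properties producerProperties(String bootstrapServers, Properties overrides) {
    if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
      throw new IllegalArgumentException("bootstrap.servers must not be empty");
    }
    Properties props = new Properties();
    // Defaults copied from the answer; the Kafka client also parses numeric settings given as Strings.
    props.setProperty("bootstrap.servers", bootstrapServers);
    props.setProperty("acks", "all");
    props.setProperty("retries", "0");
    props.setProperty("batch.size", "16384");
    props.setProperty("linger.ms", "1");
    props.setProperty("buffer.memory", "33554432");
    props.setProperty("key.serializer", STRING_SERIALIZER);
    props.setProperty("value.serializer", STRING_SERIALIZER);
    // Caller-supplied settings win over the defaults above.
    if (overrides != null) {
      for (String name : overrides.stringPropertyNames()) {
        props.setProperty(name, overrides.getProperty(name));
      }
    }
    return props;
  }
}
```

The resulting Properties could, for example, be handed to the output format through its constructor and used in `open()` when creating the producer; that wiring is an assumption, not something the original answer does.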


Out of the box, not yet: you would either have to use the DataStream API from the start or write your own custom OutputFormat, as described in the other answer.

However, the Apache Flink project has long-term plans to unify the DataSet and DataStream APIs, targeted at Flink 2.0: https://flink.apache.org/roadmap.html