Does the Flink 1.7.2 DataSet API not support a Kafka sink?
After running a batch operation I need to publish the results to Kafka; that is, my source is Postgres and my sink is Kafka.
Is this possible?
You can create your own OutputFormat and use a KafkaProducer inside it to write to Kafka. Check the code below.
```java
...
data.output(new KafkaOPFormat());
env.execute();
```
```java
import java.io.IOException;
import java.util.Properties;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Writes each (topic, value) pair of a DataSet to Kafka. */
public class KafkaOPFormat extends RichOutputFormat<Tuple2<String, String>> {

    // java.util.Properties is Serializable, so it can be shipped with the format.
    private final Properties properties = new Properties();

    // KafkaProducer is not Serializable: keep it transient and create it in open().
    private transient KafkaProducer<String, String> producer;

    @Override
    public void configure(Configuration configuration) {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // One producer per parallel instance of the sink.
        producer = new KafkaProducer<>(properties);
    }

    @Override
    public void writeRecord(Tuple2<String, String> record) throws IOException {
        // f0 is used as the topic name and f1 as the message value (no key).
        producer.send(new ProducerRecord<>(record.f0, record.f1));
    }

    @Override
    public void close() throws IOException {
        if (producer != null) {
            // close() blocks until previously sent records have completed.
            producer.close();
        }
    }
}
```
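To make the call order concrete, here is a small plain-Java simulation (no Flink or Kafka dependency) of how each parallel instance of a sink like `KafkaOPFormat` is driven: `configure()`, then `open(taskNumber, numTasks)`, then `writeRecord()` once per record in that instance's partition, and finally `close()`. `FakeProducer`, `FakeSink` and `SinkLifecycleDemo` are hypothetical stand-ins written for this sketch, not Flink or Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkLifecycleDemo {

    // Hypothetical stand-in for KafkaProducer: it only records "topic:value" strings.
    static class FakeProducer {
        final List<String> sent = new ArrayList<>();
        boolean closed = false;

        void send(String topic, String value) { sent.add(topic + ":" + value); }

        void close() { closed = true; }
    }

    // Hypothetical sink mirroring the four callbacks of KafkaOPFormat.
    static class FakeSink {
        final List<String> calls = new ArrayList<>();
        FakeProducer producer;

        void configure() { calls.add("configure"); }

        void open(int taskNumber, int numTasks) {
            calls.add("open(" + taskNumber + "/" + numTasks + ")");
            producer = new FakeProducer(); // one producer per parallel instance
        }

        void writeRecord(String topic, String value) {
            calls.add("writeRecord");
            producer.send(topic, value);
        }

        void close() {
            calls.add("close");
            if (producer != null) {
                producer.close();
            }
        }
    }

    // Drives one parallel sink instance over its partition of (topic, value) records.
    static FakeSink runTask(List<String[]> partition, int taskNumber, int numTasks) {
        FakeSink sink = new FakeSink();
        sink.configure();
        sink.open(taskNumber, numTasks);
        try {
            for (String[] record : partition) {
                sink.writeRecord(record[0], record[1]);
            }
        } finally {
            sink.close(); // release the producer even if a write fails
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String[]> partition = new ArrayList<>();
        partition.add(new String[] {"orders", "{\"id\":1}"});
        partition.add(new String[] {"orders", "{\"id\":2}"});
        FakeSink sink = runTask(partition, 0, 2);
        System.out.println(sink.calls);
        System.out.println(sink.producer.sent);
    }
}
```

The point for the real format is that every parallel task builds its own producer in `open()`, which is why the producer field is kept out of the serialized format.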
PS: I do not remember all the configs offhand, so check them against your own setup and adjust accordingly.
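One way to avoid hardcoding those settings (an assumption, not part of the answer above) is to build the producer `Properties` on the client from defaults plus user overrides, and hand them to the output format, for example through a constructor, since `java.util.Properties` is Serializable. `ProducerProps`, `defaults()` and `withOverrides()` are hypothetical names. Values are stored as strings because `Properties.getProperty` returns `null` for non-String values; Kafka's config parser accepts string forms of numeric settings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ProducerProps {

    // Defaults taken from the answer's configure(); adjust them for your cluster.
    static Properties defaults() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("acks", "all");
        p.put("retries", "0");
        p.put("batch.size", "16384");
        p.put("linger.ms", "1");
        p.put("buffer.memory", "33554432");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    // Returns the defaults with any user-supplied entries layered on top.
    static Properties withOverrides(Map<String, String> overrides) {
        Properties p = defaults();
        p.putAll(overrides);
        return p;
    }

    public static void main(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092");
        Properties p = withOverrides(overrides);
        System.out.println(p.getProperty("bootstrap.servers"));
        System.out.println(p.getProperty("acks"));
    }
}
```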
Out of the box: not yet. You would either have to use the DataStream API from the start (where Flink's Kafka connector provides a sink) or write your own custom output format, as mentioned above.
There are, however, long-term plans to unify the DataSet and DataStream APIs in Apache Flink, targeted at Flink 2.0: https://flink.apache.org/roadmap.html