I wanted to set up a basic producer-consumer with Flink on Kafka, but I am having trouble producing data to an existing consumer via Java.
CLI solution
I set up a Kafka broker using the kafka_2.11-2.4.0 zip from https://kafka.apache.org/downloads with the commands
bin/zookeeper-server-start.sh config/zookeeper.properties
and
bin/kafka-server-start.sh config/server.properties
I create a topic called transactions1 using
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions1
Now I can use a producer and consumer on the command line to see that the topic has been created and works.
To set up the consumer I run
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transactions1 --from-beginning
Now if any producer sends data to the topic transactions1, I will see it in the consumer console. I test that the consumer is working by running
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic transactions1
and enter the following data lines in the producer CLI, which also show up in the consumer CLI.
{"txnID":1,"amt":100.0,"account":"AC1"}
{"txnID":2,"amt":10.0,"account":"AC2"}
{"txnID":3,"amt":20.0,"account":"AC3"}
Now I want to replicate step 3, i.e. the producer and consumer, in Java code, which is the core problem of this question.
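For reference, producing those same three JSON lines from plain Java with the vanilla kafka-clients producer (no Flink) would look roughly like the sketch below; kafka-clients is assumed to be on the classpath, which flink-connector-kafka pulls in transitively. The Flink setup I actually want follows.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainProducerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The same three records entered in the console producer above.
            producer.send(new ProducerRecord<>("transactions1", "{\"txnID\":1,\"amt\":100.0,\"account\":\"AC1\"}"));
            producer.send(new ProducerRecord<>("transactions1", "{\"txnID\":2,\"amt\":10.0,\"account\":\"AC2\"}"));
            producer.send(new ProducerRecord<>("transactions1", "{\"txnID\":3,\"amt\":20.0,\"account\":\"AC3\"}"));
            producer.flush(); // make sure the records leave the client before exit
        }
    }
}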
- So I set up a Gradle Java 8 project with build.gradle
...
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.2'
    compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.2'
    // https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.9.2'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
    compile group: 'com.twitter', name: 'chill-thrift', version: '0.7.6'
    compile group: 'org.apache.thrift', name: 'libthrift', version: '0.11.0'
    compile group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6'
    compile group: 'com.google.protobuf', name: 'protobuf-java', version: '3.7.0'
}
...
- I set up a custom class Transaction, where you can suggest changes to the serialization logic using Kryo, Protobuf or TBaseSerializer by extending the relevant Flink classes (a Gson-based schema sketch follows the class below).
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;

public class Transaction {
    public final int txnID;
    public final float amt;
    public final String account;

    public Transaction(int txnID, float amt, String account) {
        this.txnID = txnID;
        this.amt = amt;
        this.account = account;
    }

    public String toJSONString() {
        Gson gson = new Gson();
        return gson.toJson(this);
    }

    public static Transaction fromJSONString(String some) {
        Gson gson = new Gson();
        return gson.fromJson(some, Transaction.class);
    }

    // Validates incoming JSON by round-tripping it; returns "" for null, blank or invalid input.
    public static MapFunction<String, String> mapTransactions() {
        MapFunction<String, String> map = new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                if (value != null && value.trim().length() > 0) {
                    try {
                        return fromJSONString(value).toJSONString();
                    } catch (Exception e) {
                        return "";
                    }
                }
                return "";
            }
        };
        return map;
    }

    @Override
    public String toString() {
        return "Transaction{" +
                "txnID=" + txnID +
                ", amt=" + amt +
                ", account='" + account + '\'' +
                '}';
    }
}
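Staying with JSON-over-String, one way the serialization logic could live in Flink-specific classes instead of ad hoc string mapping is a combined SerializationSchema/DeserializationSchema; the class below is only a sketch of that idea (the class name and the Gson round-trip are my own, not from the repo).

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical schema that maps Transaction <-> UTF-8 JSON bytes for the Kafka connector.
public class TransactionJsonSchema implements SerializationSchema<Transaction>, DeserializationSchema<Transaction> {
    @Override
    public byte[] serialize(Transaction element) {
        return element.toJSONString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Transaction deserialize(byte[] message) {
        return Transaction.fromJSONString(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(Transaction nextElement) {
        return false; // the topic is unbounded
    }

    @Override
    public TypeInformation<Transaction> getProducedType() {
        return TypeInformation.of(Transaction.class);
    }
}

With something like this, the consumer could presumably be built as new FlinkKafkaConsumer<>(topic, new TransactionJsonSchema(), properties) and yield a DataStream<Transaction> directly; I believe there is also a FlinkKafkaProducer constructor that accepts a plain SerializationSchema.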
- Now it is time to use Flink to produce and consume streams on the topic transactions1.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import javax.annotation.Nullable;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SetupSpike {
    public static void main(String[] args) throws Exception {
        System.out.println("begin");
        List<Transaction> txns = new ArrayList<Transaction>() {{
            add(new Transaction(1, 100, "AC1"));
            add(new Transaction(2, 10, "AC2"));
            add(new Transaction(3, 20, "AC3"));
        }};
        // This list txns needs to be serialized in Flink as Transaction.class -> String -> byte[]
        // via the producer and then sent to the topic in the Kafka broker,
        // and deserialized as byte[] -> String -> Transaction.class by the consumer in Flink reading the Kafka broker.
        String topic = "transactions1";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", topic);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //env.getConfig().addDefaultKryoSerializer(Transaction.class, TBaseSerializer.class);

        // Working consumer logic below, which needs editing if you change the serialization.
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
        myConsumer.setStartFromEarliest(); // start from the earliest record possible
        DataStream<String> stream = env.addSource(myConsumer).map(Transaction.mapTransactions());

        // Working producer logic below, which works if you are sinking a pre-existing DataStream
        // but needs editing to work with the Java List<Transaction> datatype.
        System.out.println("sinking expanded stream");
        MapFunction<String, String> etl = new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                if (value != null && value.trim().length() > 0) {
                    try {
                        return Transaction.fromJSONString(value).toJSONString();
                    } catch (Exception e) {
                        return "";
                    }
                }
                return "";
            }
        };
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(topic,
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                        try {
                            System.out.println(element);
                            return new ProducerRecord<byte[], byte[]>(topic, etl.map(element).getBytes(StandardCharsets.UTF_8));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return null;
                    }
                }, properties, Semantic.EXACTLY_ONCE);
        // stream.timeWindowAll(Time.minutes(1));
        stream.addSink(myProducer);
        JobExecutionResult execute = env.execute();
    }
}
As you can see, I am not able to do this with the List<Transaction> txns provided. The above is working code I could piece together from the Flink documentation for redirecting topic stream data and for sending data manually via the CLI producer. The problem is writing KafkaProducer code in Java that sends data to the topic, which is further compounded by issues like the following (a rough sketch for some of these follows the list):
- Adding Timestamps, Watermarks
- KeyBy operations
- GroupBy/WindowBy operations
- Adding custom ETL logic before Sinking.
- Serialization/Deserialization logic in Flink
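To make the list above concrete, this is roughly the shape I imagine for timestamps, watermarks and a keyed window on the consumed stream; everything here is a sketch on top of the SetupSpike code, and the 10-second bound, the use of arrival time, and the per-account sum are placeholders I made up.

// Extra imports assumed:
//   org.apache.flink.api.java.functions.KeySelector
//   org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
//   org.apache.flink.streaming.api.windowing.time.Time

// Parse the consumed JSON strings back into Transaction objects.
DataStream<Transaction> txnStream = env
        .addSource(myConsumer)
        .map(Transaction::fromJSONString);

DataStream<Transaction> perAccountTotals = txnStream
        // Timestamps + watermarks; Transaction has no event-time field yet, so arrival time is a stand-in.
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Transaction>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Transaction t) {
                        return System.currentTimeMillis();
                    }
                })
        // KeyBy on the account field.
        .keyBy(new KeySelector<Transaction, String>() {
            @Override
            public String getKey(Transaction t) {
                return t.account;
            }
        })
        // Tumbling one-minute window, summing the amounts per account.
        .timeWindow(Time.minutes(1))
        .reduce((a, b) -> new Transaction(a.txnID, a.amt + b.amt, a.account));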
Can someone who has worked with Flink please help me with how to produce the txns list to the transactions1 topic in Flink, and then verify that it works with the consumer?
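My current best guess for the producing side is the sketch below (untested), which turns the in-memory list into a stream and sinks it with the same FlinkKafkaProducer; the names reuse env, txns and myProducer from the SetupSpike code above.

// Sketch: turn the in-memory List<Transaction> into a stream and sink it to Kafka,
// instead of (or in addition to) echoing the consumed stream back to the topic.
DataStream<String> txnJson = env
        .fromCollection(txns)                    // List<Transaction> -> DataStream<Transaction>
        .map(Transaction::toJSONString);         // Transaction -> JSON string
txnJson.addSink(myProducer);                     // JSON string -> topic transactions1
// env.execute() is then called once at the end, as before.

One thing I am unsure about: Semantic.EXACTLY_ONCE ties Kafka transaction commits to Flink checkpoints, so it may be necessary to enable checkpointing (env.enableCheckpointing(...)) or relax to Semantic.AT_LEAST_ONCE before the records reliably show up for the console consumer.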
Also, any help on adding timestamps or doing some processing before sinking would be greatly appreciated. You can find the source code at https://github.com/devssh/kafkaFlinkSpike; the intent is to generate Flink boilerplate that adds details of "AC1" from an in-memory store and joins them with the Transaction event arriving in real time, to send expanded output to the user.
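On that last point (joining "AC1" details from an in-memory store with the live Transaction events), the simplest shape I can think of is a RichMapFunction that loads the lookup data in open(); everything below (the class name, the detail strings, the map contents) is invented for illustration.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical enrichment step: joins each Transaction with static account details held in memory.
public class EnrichWithAccountDetails extends RichMapFunction<Transaction, String> {
    private transient Map<String, String> accountDetails;

    @Override
    public void open(Configuration parameters) {
        // Stand-in for the real in-memory store; loaded once per task.
        accountDetails = new HashMap<>();
        accountDetails.put("AC1", "Savings account, branch X");
        accountDetails.put("AC2", "Checking account, branch Y");
        accountDetails.put("AC3", "Savings account, branch Z");
    }

    @Override
    public String map(Transaction txn) {
        String details = accountDetails.getOrDefault(txn.account, "unknown account");
        // Expanded output: original transaction JSON plus the joined account details.
        return txn.toJSONString() + " | " + details;
    }
}

It would then be applied as something like txnStream.map(new EnrichWithAccountDetails()) before sinking the expanded strings.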