I'm very new to Flink and cluster computing. I spent all day trying to correctly parse a trivial stream from Kafka in Flink, with no results: it's a bit frustrating... I have a stream of JSON-LD messages in Kafka, identified by a string key. I would simply like to retrieve them in Flink and then separate the messages by key.
1) Initially I considered sending the messages as Strings instead of JSON-LD. I thought it would be easier...
I tried every deserializer, but none of them works. The simple string deserializer obviously works, but it completely ignores the keys.
I believed I had to use the following (Flink apparently has just two deserialization schemas that support keys):
DataStream<Tuple2<String, String>> stream = env
        .addSource(new FlinkKafkaConsumer010<>("topicTest",
                new TypeInformationKeyValueSerializationSchema<>(String.class, String.class, env.getConfig()),
                properties))
        .rebalance();
stream.print();
But I obtain:
06/12/2017 02:09:12 Source: Custom Source(4/4) switched to FAILED
java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
How can I receive the stream messages without losing the keys?
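From the docs I suspect that TypeInformationKeyValueSerializationSchema expects records written with Flink's own serializers, not plain text, so maybe I need a custom KeyedDeserializationSchema that just treats key and value as UTF-8 strings. This is a minimal sketch of what I mean (StringKeyValueSchema is a name I made up, and I'm assuming the producer writes both key and value as plain UTF-8 text):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Sketch: treat both the Kafka key and the Kafka value as plain UTF-8 strings
// (my assumption about what the JavaScript producer actually writes).
public class StringKeyValueSchema implements KeyedDeserializationSchema<Tuple2<String, String>> {

    @Override
    public Tuple2<String, String> deserialize(byte[] messageKey, byte[] message,
                                              String topic, int partition, long offset) throws IOException {
        String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
        String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        return Tuple2.of(key, value);
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false; // the Kafka stream is unbounded
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
    }
}

which I would then plug in with new FlinkKafkaConsumer010<>("topicTest", new StringKeyValueSchema(), properties). Is that the intended way?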
2) My Kafka producer is implemented in JavaScript. Since Flink supports JSON deserialization, I thought I could send JSON objects directly to Kafka. I'm not sure that works correctly with JSON-LD, but I've used:
JSON.parse(jsonld_message)
to serialize the message as JSON. Then I sent it with the usual string key.
But in Flink this code doesn't work:
DataStream<ObjectNode> stream = env
.addSource(new FlinkKafkaConsumer010<>("topicTest", new JSONKeyValueDeserializationSchema(false), properties))
.rebalance();
stream.print();
raising a JsonParserException.
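If this schema worked, my understanding is that each record would arrive as an ObjectNode with "key" and "value" fields, so downstream I expected to separate the messages roughly like this (just a sketch of my intent):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
// Depending on the Flink version this may be the shaded Jackson ObjectNode.
import com.fasterxml.jackson.databind.node.ObjectNode;

// Sketch: group records by the Kafka message key exposed by the schema.
KeyedStream<ObjectNode, String> byKey = stream
        .keyBy(new KeySelector<ObjectNode, String>() {
            @Override
            public String getKey(ObjectNode record) {
                return record.get("key").asText();
            }
        });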
I think the first approach is simpler, and I prefer it because it lets me tackle one problem at a time (first: receive the data; second: convert the string back to JSON-LD with an external library, I guess).
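To make my plan concrete, this is the shape I have in mind for step two, assuming the string key/value schema from point 1 works (ParseJson is a name I made up, and plain Jackson here just stands in for whatever JSON-LD library I end up using):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Step-two sketch: parse the raw string payload back into a JSON tree, keeping the key alongside it.
public class ParseJson extends RichMapFunction<Tuple2<String, String>, Tuple2<String, ObjectNode>> {
    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper(); // created once per task instance, not shipped with the job
    }

    @Override
    public Tuple2<String, ObjectNode> map(Tuple2<String, String> kv) throws Exception {
        return Tuple2.of(kv.f0, (ObjectNode) mapper.readTree(kv.f1));
    }
}

so step one would be stream.keyBy(0) to separate the messages by key, and step two stream.map(new ParseJson()).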