
I'm very new to Flink and cluster computing. I spent all day trying to correctly parse a simple stream from Kafka in Flink, with no results: it's a bit frustrating. I have a stream of JSON-LD messages in Kafka, each identified by a string key. I simply want to retrieve them in Flink and then separate the messages by key.

1) Initially I considered sending the messages as Strings instead of JSON-LD. I thought it would be easier...

I tried every deserializer, but none works. The simple string deserializer obviously works, but it completely ignores the keys.

I believed I had to use this (Flink apparently has just two deserializers that support keys):

DataStream<Tuple2<String, String>> stream = env
        .addSource(new FlinkKafkaConsumer010<>(
                "topicTest",
                new TypeInformationKeyValueSerializationSchema<>(String.class, String.class, env.getConfig()),
                properties))
        .rebalance();

stream.print();

But I obtain:

06/12/2017 02:09:12  Source: Custom Source(4/4) switched to FAILED
java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)

How can I receive the stream messages without losing the keys?

2) My Kafka producer is implemented in JavaScript. Since Flink supports JSON deserialization, I thought I could send JSON objects to Kafka directly. I'm not sure that works correctly with JSON-LD, but I used:

JSON.parse(jsonld_message)

to convert the message to JSON. Then I sent it with the usual string key.

But in Flink this code doesn't work:

DataStream<ObjectNode> stream = env
        .addSource(new FlinkKafkaConsumer010<>("topicTest", new JSONKeyValueDeserializationSchema(false), properties))
        .rebalance();

stream.print();

raising a JsonParseException.

I think the first approach is simpler, and I prefer it because it lets me tackle one problem at a time (first: receive the data; second: convert the string back to JSON-LD with an external library, I guess).


3 Answers

1 vote

SOLVED:

In the end I decided to implement a custom deserializer that implements the KeyedDeserializationSchema interface.
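
The interface itself is not hard to implement. A minimal sketch of such a deserializer (the class name JsonLdKeyedDeserializationSchema and the choice to return a Tuple2<String, String> are illustrative assumptions here, not the original poster's code) could look like this:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class JsonLdKeyedDeserializationSchema implements KeyedDeserializationSchema<Tuple2<String, String>> {

    @Override
    public Tuple2<String, String> deserialize(byte[] messageKey, byte[] message,
                                              String topic, int partition, long offset) {
        // Interpret the raw bytes as UTF-8 strings; the key may be null if the producer did not set one.
        String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
        String value = new String(message, StandardCharsets.UTF_8);
        return new Tuple2<>(key, value);
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        // The Kafka topic is consumed indefinitely.
        return false;
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
    }
}

It can then be plugged into the consumer in place of the built-in schema, e.g. new FlinkKafkaConsumer010<>("topicTest", new JsonLdKeyedDeserializationSchema(), properties).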

0 votes

In order to use Flink's TypeInformationKeyValueSerializationSchema to read data from Kafka, the data must have been written in a compatible way. Assuming that your key and value are of type String, both must be written in a format that Flink's StringSerializer understands.

Consequently, you have to make sure that your Kafka producer writes the data in a compatible way. Otherwise Flink won't be able to read it.
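
To illustrate what "compatible" means here: Flink's StringSerializer writes its own binary encoding (a length prefix followed by the characters), not plain UTF-8 bytes, which is why records produced by an ordinary JavaScript client fail with the EOFException shown in the question. Below is a minimal sketch, assuming the writing side can itself be a Flink job (topic name and broker address are placeholders); it reuses the very same schema on the producer side so the bytes on the topic match what the consumer expects:

import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;

public class WriteCompatibleRecords {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Some keyed messages to write; in a real job this would be the JSON-LD payload.
        DataStream<Tuple2<String, String>> records = env.fromElements(
                Tuple2.of("sensor-1", "{\"@context\": \"...\"}"),
                Tuple2.of("sensor-2", "{\"@context\": \"...\"}"));

        // The same schema used for reading is used for writing, so the key and value
        // bytes are in the format Flink's StringSerializer expects.
        records.addSink(new FlinkKafkaProducer010<>(
                "topicTest",
                new TypeInformationKeyValueSerializationSchema<String, String>(String.class, String.class, env.getConfig()),
                properties));

        env.execute("write compatible key/value records");
    }
}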

0 votes

I faced a similar issue. Ideally, TypeInformationKeyValueSerializationSchema with String types for keys and values should have been able to read my Kafka records, which have both keys and values as Strings, but it could not and threw an EOFException as pointed out in the post above. So this issue is easily reproducible and needs to be fixed. Please let me know if I can be of any help in this process. In the meantime, I implemented a custom deserializer using KafkaDeserializationSchema. Here is the code, since there is little documentation on how to read keys, values, and additional record metadata:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomKafkaSerializer implements KafkaDeserializationSchema<Tuple2<String, String>> {

    @Override
    public boolean isEndOfStream(Tuple2<String, String> stringStringPair) {
        // The topic is consumed indefinitely, so never signal end of stream.
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        // Turn the raw key and value bytes into Strings; the key may be null if the producer did not set one.
        String key = consumerRecord.key() == null ? null : new String(consumerRecord.key());
        String value = new String(consumerRecord.value());
        return new Tuple2<>(key, value);
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        // Tell Flink the type of the records this schema produces.
        return TypeInformation.of(new TypeHint<Tuple2<String, String>>(){});
    }
}
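
For completeness, here is a minimal usage sketch (assuming a Flink version whose universal FlinkKafkaConsumer accepts a KafkaDeserializationSchema; the topic name and connection properties below are placeholders):

import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadKeyedRecords {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // Each record arrives as a (key, value) pair, so messages can be split by key downstream.
        DataStream<Tuple2<String, String>> stream = env
                .addSource(new FlinkKafkaConsumer<>("topicTest", new CustomKafkaSerializer(), properties))
                .rebalance();

        stream.print();
        env.execute("read keyed records");
    }
}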