1
votes

I'm trying to do stream processing and CEP on a Kafka message stream. For this I picked Apache Ignite to realise a prototype first. However I cannot connect to the queue:

Use kafka_2.11-0.10.1.0 apache-ignite-fabric-1.8.0-bin

bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Kafka works properly, I tested it with a consumer. Then I start ignite, then I run following in a spring boot commandline app.

    KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

    Ignition.setClientMode(true);

    Ignite ignite = Ignition.start();

    Properties settings = new Properties();
    // Set a few key parameters
    settings.put("bootstrap.servers", "localhost:9092");
    settings.put("group.id", "test");
    settings.put("zookeeper.connect", "localhost:2181");
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Create an instance of StreamsConfig from the Properties instance
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");

    try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) {
        // allow overwriting cache data
        stmr.allowOverwrite(true);

        kafkaStreamer.setIgnite(ignite);
        kafkaStreamer.setStreamer(stmr);

        // set the topic
        kafkaStreamer.setTopic("test");

        // set the number of threads to process Kafka streams
        kafkaStreamer.setThreads(1);

        // set Kafka consumer configurations
        kafkaStreamer.setConsumerConfig(config);

        // set decoders
        StringDecoder keyDecoder = new StringDecoder(null);
        StringDecoder valueDecoder = new StringDecoder(null);

        kafkaStreamer.setKeyDecoder(keyDecoder);
        kafkaStreamer.setValueDecoder(valueDecoder);

        kafkaStreamer.start();
    } finally {
        kafkaStreamer.stop();
    }

When the application starts I get

2017-02-23 10:25:23.409 WARN 1388 --- [ main] kafka.utils.VerifiableProperties : Property bootstrap.servers is not valid 2017-02-23 10:25:23.410 INFO 1388 --- [ main] kafka.utils.VerifiableProperties : Property group.id is overridden to test 2017-02-23 10:25:23.410 WARN 1388 --- [ main] kafka.utils.VerifiableProperties : Property key.deserializer is not valid 2017-02-23 10:25:23.411 WARN 1388 --- [ main] kafka.utils.VerifiableProperties : Property key.serializer is not valid 2017-02-23 10:25:23.411 WARN 1388 --- [ main] kafka.utils.VerifiableProperties : Property value.deserializer is not valid 2017-02-23 10:25:23.411 WARN 1388 --- [ main] kafka.utils.VerifiableProperties : Property value.serializer is not valid 2017-02-23 10:25:23.411 INFO 1388 --- [ main] kafka.utils.VerifiableProperties : Property zookeeper.connect is overridden to localhost:2181

Then

2017-02-23 10:25:24.057 WARN 1388 --- [r-finder-thread] kafka.client.ClientUtils$ : Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,user.local,9092)] failed

java.nio.channels.ClosedChannelException: null at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.10.0.1.jar:na] at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) ~[kafka_2.11-0.10.0.1.jar:na] at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) ~[kafka_2.11-0.10.0.1.jar:na] at kafka.producer.SyncProducer.send(SyncProducer.scala:124) ~[kafka_2.11-0.10.0.1.jar:na] at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) [kafka_2.11-0.10.0.1.jar:na] at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) [kafka_2.11-0.10.0.1.jar:na] at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) [kafka_2.11-0.10.0.1.jar:na] at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [kafka_2.11-0.10.0.1.jar:na]

And reading from the queue doesn't work. Does anyone have an idea how to fix this?

Edit: If I comment the contents of the finally block then following error comes

[2m2017-02-27 16:42:27.780[0;39m [31mERROR[0;39m [35m29946[0;39m [2m---[0;39m [2m[pool-3-thread-1][0;39m [36m [0;39m [2m:[0;39m Message is ignored due to an error [msg=MessageAndMetadata(test,0,Message(magic = 1, attributes = 0, CreateTime = -1, crc = 2558126716, key = java.nio.HeapByteBuffer[pos=0 lim=1 cap=79], payload = java.nio.HeapByteBuffer[pos=0 lim=74 cap=74]),15941704,kafka.serializer.StringDecoder@74a96647,kafka.serializer.StringDecoder@42849d34,-1,CreateTime)]

java.lang.IllegalStateException: Data streamer has been closed. at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:401) ~[ignite-core-1.8.0.jar:1.8.0] at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:613) ~[ignite-core-1.8.0.jar:1.8.0] at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:667) ~[ignite-core-1.8.0.jar:1.8.0] at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:180) ~[ignite-kafka-1.8.0.jar:1.8.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

Thanks!

1

1 Answers

1
votes

I think this happens because KafkaStreamer is getting closed right after it's started (kafkaStreamer.stop() call in finally block). kafkaStreamer.start() is not synchronous, it just spins out threads to consume from Kafka and exits.