
I have defined a Kafka receiver (kafkaReceiver) in the WSO2 CEP management console. It does not work, and the server console log shows this error:

    [2016-02-26 10:17:25,412]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver kafkaReceiver
    [2016-02-26 10:17:25,415] ERROR {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Error initializing Input Adapter 'kafkaReceiver, hence this will be suspended indefinitely, Cannot access kafka context due to missing jars
    org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterRuntimeException: Cannot access kafka context due to missing jars
        at org.wso2.carbon.event.input.adapter.kafka.KafkaEventAdapter.createConsumerConfig(KafkaEventAdapter.java:114)
        at org.wso2.carbon.event.input.adapter.kafka.KafkaEventAdapter.createKafkaAdaptorListener(KafkaEventAdapter.java:132)
        at org.wso2.carbon.event.input.adapter.kafka.KafkaEventAdapter.connect(KafkaEventAdapter.java:66)
        at org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime.start(InputAdapterRuntime.java:72)
        at org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime.startPolling(InputAdapterRuntime.java:62)
        at org.wso2.carbon.event.input.adapter.core.internal.CarbonInputEventAdapterService.startPolling(CarbonInputEventAdapterService.java:187)
        at org.wso2.carbon.event.receiver.core.internal.CarbonEventReceiverService.startPolling(CarbonEventReceiverService.java:620)
        at org.wso2.carbon.event.receiver.core.internal.CarbonEventReceiverManagementService.startPolling(CarbonEventReceiverManagementService.java:99)
        at org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService$2.run(CarbonEventManagementService.java:184)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NoClassDefFoundError: kafka/consumer/ConsumerConfig
        at org.wso2.carbon.event.input.adapter.kafka.KafkaEventAdapter.createConsumerConfig(KafkaEventAdapter.java:112)
        ... 15 more
    Caused by: java.lang.ClassNotFoundException: kafka.consumer.ConsumerConfig cannot be found by org.wso2.carbon.event.input.adapter.kafka_5.0.3
        at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:501)
        at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:421)
        at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:412)
        at org.eclipse.osgi.internal.baseadaptor.DefaultClassLoader.loadClass(DefaultClassLoader.java:107)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 16 more

Looking at the class source at https://github.com/wso2/carbon-analytics-common/blob/master/components/event-receiver/event-input-adapters/org.wso2.carbon.event.input.adapter.kafka/src/main/java/org/wso2/carbon/event/input/adapter/kafka/KafkaEventAdapter.java, I can see that the error is raised while the ConsumerConfig is being created.


1 Answer


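The stack trace shows that the Kafka adapter bundle cannot load kafka.consumer.ConsumerConfig, which means the Kafka client libraries are not available to the server. To use Kafka, download Kafka and copy the following client JAR files to the /repository/components/lib/ directory [1]: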

  • kafka_2.10-0.8.2.1.jar
  • zkclient-0.3.jar
  • scala-library-2.10.4.jar
  • zookeeper-3.4.6.jar
  • metrics-core-2.2.0.jar
  • kafka-clients-0.8.2.1.jar

[1]. https://docs.wso2.com/display/CEP400/Supporting+Different+Transports#SupportingDifferentTransports-KafkaTransport
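
As a quick way to confirm that nothing from the list above was missed, here is a minimal sketch of a standalone check. `KafkaJarCheck` and `missingJars` are hypothetical names and not part of WSO2 CEP; the file names are simply the ones listed above, and the lib directory path is passed in as an argument rather than assumed.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of WSO2 CEP: reports which of the
// JARs named in the answer are absent from a given lib directory.
final class KafkaJarCheck {

    // File names copied from the list in the answer.
    static final List<String> REQUIRED_JARS = Arrays.asList(
            "kafka_2.10-0.8.2.1.jar",
            "zkclient-0.3.jar",
            "scala-library-2.10.4.jar",
            "zookeeper-3.4.6.jar",
            "metrics-core-2.2.0.jar",
            "kafka-clients-0.8.2.1.jar");

    // Returns the required JAR names that are not regular files in libDir.
    static List<String> missingJars(Path libDir) {
        List<String> missing = new ArrayList<>();
        for (String jar : REQUIRED_JARS) {
            if (!Files.isRegularFile(libDir.resolve(jar))) {
                missing.add(jar);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // The caller supplies the path of the components/lib directory.
        if (args.length != 1) {
            System.err.println("usage: java KafkaJarCheck <path-to-lib-directory>");
            System.exit(2);
        }
        List<String> missing = missingJars(Paths.get(args[0]));
        if (missing.isEmpty()) {
            System.out.println("All listed Kafka client JARs are present.");
        } else {
            System.out.println("Missing JARs: " + missing);
            System.exit(1);
        }
    }
}
```

This only checks that the files exist by name; it does not verify that the server has actually loaded them, so the receiver log remains the real test.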