10
votes

I have creating a simple Kafka Producer & Consumer.I am using kafka_2.11-0.9.0.0. Here is my Producer code.

public class KafkaProducerTest {
public static String topicName = "test-topic-2";
public static void main(String[] args) {
    // TODO Auto-generated method stub
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer",
            StringSerializer.class.getName());
    props.put("value.serializer",
            StringSerializer.class.getName());

    Producer<String, String> producer = new KafkaProducer(props);
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
                topicName, Integer.toString(i), Integer.toString(i));
        System.out.println(producerRecord);
        producer.send(producerRecord);
    }

    producer.close();
}

}

While starting the bundle I a facing the below error:

2016-05-20 09:44:57,792 | ERROR | nsole user karaf | ShellUtil                        | 44 - org.apache.karaf.shell.core - 4.0.3 | Exception caught while executing command
org.apache.karaf.shell.support.MultiException: Error executing command on bundles:
    Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
    at org.apache.karaf.shell.support.MultiException.throwIf(MultiException.java:61)
    at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:69)[24:org.apache.karaf.bundle.core:4.0.3]
    at org.apache.karaf.bundle.command.BundlesCommand.execute(BundlesCommand.java:54)[24:org.apache.karaf.bundle.core:4.0.3]
    at org.apache.karaf.shell.impl.action.command.ActionCommand.execute(ActionCommand.java:83)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:67)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.karaf.shell.impl.console.osgi.secured.SecuredCommand.execute(SecuredCommand.java:87)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.executeCmd(Closure.java:480)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.executeStatement(Closure.java:406)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Pipe.run(Pipe.java:108)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:182)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.Closure.execute(Closure.java:119)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.felix.gogo.runtime.CommandSessionImpl.execute(CommandSessionImpl.java:94)[44:org.apache.karaf.shell.core:4.0.3]
    at org.apache.karaf.shell.impl.console.ConsoleSessionImpl.run(ConsoleSessionImpl.java:270)[44:org.apache.karaf.shell.core:4.0.3]
    at java.lang.Thread.run(Thread.java:745)[:1.8.0_66]
Caused by: java.lang.Exception: Error starting bundle162: Activator start error in bundle NewKafkaArtifact [162].
    at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:66)[24:org.apache.karaf.bundle.core:4.0.3]
    ... 12 more
Caused by: org.osgi.framework.BundleException: Activator start error in bundle NewKafkaArtifact [162].
    at org.apache.felix.framework.Felix.activateBundle(Felix.java:2276)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.felix.framework.Felix.startBundle(Felix.java:2144)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.felix.framework.BundleImpl.start(BundleImpl.java:998)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.karaf.bundle.command.Start.executeOnBundle(Start.java:38)[24:org.apache.karaf.bundle.core:4.0.3]
    at org.apache.karaf.bundle.command.BundlesCommand.doExecute(BundlesCommand.java:64)[24:org.apache.karaf.bundle.core:4.0.3]
    ... 12 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:145)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:317)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)[141:kafka-examples:1.0.0.SNAPSHOT-jar-with-dependencies]
    at com.NewKafka.NewKafkaArtifact.KafkaProducerTest.main(KafkaProducerTest.java:25)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
    at com.NewKafka.NewKafkaArtifact.StartKafka.start(StartKafka.java:11)[162:NewKafkaArtifact:0.0.1.SNAPSHOT]
    at org.apache.felix.framework.util.SecureAction.startActivator(SecureAction.java:697)[org.apache.felix.framework-5.4.0.jar:]
    at org.apache.felix.framework.Felix.activateBundle(Felix.java:2226)[org.apache.felix.framework-5.4.0.jar:]
    ... 16 more

I have tried setting the key.serializer and value.serializer like below:

props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer",StringSerializer.class.getName());

Also like, But still getting the same error. What is I am doing wrong here.

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
6
hey @Sanjeev try using props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); I have posted full exampleVipul Gulhane

6 Answers

6
votes

Its issue with the version you are using. It was also suggested to version 0.8.2.2_1. Suggest you to adjust the version of kafka you are using and give a try. code wise, I cross checked many code samples in kafka dev list and seems like you have written in right way.

i.e Thread.currentThread().setContextClassLoader(null);

5
votes

I find the reason by reading the kafka client source code.

kafka client use Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader()) to get the Class object, and the create the instance, the key point is the classLoader, which is specified by the last param, the implementation of method Utils.getContextOrKafkaClassLoader() is

public static ClassLoader getContextOrKafkaClassLoader() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null)
        return getKafkaClassLoader();
    else
        return cl;
}

so, by default, the Class object of org.apache.kafka.common.serialization.StringSerializer is load by the applicationClassLoader, if your target class is not loaded by the applicationClassLoader, this problem will happend !

to solve the problem, simply set the ContextClassLoader of current thread to null before new KafkaProducer instance like this

Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);

hope my answer can let you know what happend .

4
votes

The issue appears to be with the class loader, as @Ram Ghadiyaram indicated in his answer. In order to get this working with kafka-clients 2.x, I had to do the following:

public Producer<String, String> createProducer() {
            ClassLoader original = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(null);
    Properties props = new Properties();

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        BOOTSTRAP_SERVERS);
    ... etc ...

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    Thread.currentThread().setContextClassLoader(original);
    return producer;

}

This allows the system to continue loading additional classes with the original classloader. This was needed in Wildfly/JBoss (the specific app I'm working with is Keycloak).

3
votes

try using these props instead of yours props.

  props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");

  props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");

Here is full Kafka Producer Example:-

import java.util.Properties; 
import org.apache.kafka.clients.producer.Producer;    
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class FxDateProducer {

   public static void main(String[] args) throws Exception{

      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }

      String topicName = args[0].toString(); 
      Properties props = new Properties();

      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");

      //Set acknowledgements for producer requests.      
      props.put("acks", “all");

      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);

      //Specify buffer size in config
      props.put("batch.size", 16384);

      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);

      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);

      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");

      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);

      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}
2
votes

Recently i found the solution. Setting the Thead Context loader to null resolved the issue for me. Thanks.

Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer(props);
0
votes

It happens because of kafka-version issue. Make sure, you use the correct kafka version. The version that I used is 'kafka_2.12-1.0.1'

But try using below properties in your code .This fixed my issue.

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

Earlier I was using below properties which was causing the issue.

//props.put("key.serializer","org.apache.kafka.common.serialization.Stringserializer");
//props.put("value.serializer","org.apache.kafka.common.serialization.Stringserializer");