0
votes

I am implementing a simple Kafka consumer in Java. Here is the code:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    public static void main(String[] args) throws Exception {
        // Consumer settings: deserializers, assignment strategy, group and broker address
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("partition.assignment.strategy", "round-robin");
        props.put("group.id", "test");
        props.put("bootstrap.servers", "localhost:9092");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        try {
            // Subscribe to the topic and poll once with a 100 ms timeout
            consumer.subscribe("ay_sparktopic");
            Map<String, ConsumerRecords<String, String>> msg = consumer.poll(100);
            System.out.println(msg);
        } catch (Exception e) {
            // Print the actual exception rather than a fixed string
            System.out.println("Exception: " + e);
        }
    }
}

The consumer above produces the following warnings:

16/03/30 18:01:07 WARN ConsumerConfig: The configuration group.id = test was supplied but isn't a known config.
16/03/30 18:01:07 WARN ConsumerConfig: The configuration partition.assignment.strategy = round-robin was supplied but isn't a known config.

All the documentation I have found online lists either range or roundrobin as the possible assignment strategies, and as far as I know group.id is just a custom name. I am not sure what the right config values would be here.


2 Answers

2
votes

It looks like you're trying to use the new consumer API, which is only available in Kafka 0.9+. To use the older API, you have to import classes from the kafka.javaapi.consumer.* package instead of the new org.apache.kafka.clients.consumer package.

consumer.subscribe and consumer.poll belong to the new API, so if you really want to use the old API, you need to change your code accordingly. If you want to use the new consumer API instead, you need to run Kafka 0.9 or later.
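
For the new consumer API route, the settings from the question might look roughly like the sketch below. It only builds the java.util.Properties object, so it runs without Kafka on the classpath. The class name ConsumerProps and the method build are hypothetical names introduced for illustration. It assumes kafka-clients 0.9 or later, where partition.assignment.strategy takes an assignor class name rather than a short string such as range or roundrobin.

```java
import java.util.Properties;

// Hypothetical helper (not from the original post): collects settings for the
// new consumer API (org.apache.kafka.clients.consumer.KafkaConsumer, Kafka 0.9+).
public class ConsumerProps {

    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // group.id is a free-form name chosen by the application
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: the 0.9+ client expects an assignor class name here,
        // not the "range"/"roundrobin" strings of the old high-level consumer.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "test");
        // With kafka-clients 0.9+ available, these properties would be passed
        // to new KafkaConsumer<String, String>(props).
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

Leaving partition.assignment.strategy out entirely should also be fine in that setup, since the client then falls back to its default assignor.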

0
votes

Using the dependency below resolves the issue:

libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.9.0.0"

This works even when you are running an earlier version, e.g. Kafka 0.8.2.1.