4
votes

I'm currently evaluating Apache Kafka and I have a simple consumer that is supposed to read messages from a specific topic partition. Here is my client:

public static void main(String[] args) {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

    TopicPartition partition0 = new TopicPartition("test_topic", Integer.parseInt(args[0]));

    List<TopicPartition> topicAssignment = new ArrayList<TopicPartition>();
    topicAssignment.add(partition0);
    consumer.assign(topicAssignment);

    //consumer.subscribe(Arrays.asList("test_topic"));
    int commitInterval = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
            if (buffer.size() >= commitInterval) {
                process(buffer);
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}

static void process(List<ConsumerRecord<String, String>> buffers) {
   for (ConsumerRecord<String, String> buffer : buffers) {
       System.out.println(buffer);
   }
}

Here is the command that I use to start Apache Kafka:

bin/kafka-server-start.sh config/server.properties & bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test_topic

As you can see here, I'm creating the topic with 2 partitions (p0 and p1)!

I'm then starting two instances of my consumer with the following commands:

For Consumer 1:

java -cp target/scala-2.11/kafka-consumer-0.1.0-SNAPAHOT.jar com.test.api.consumer.KafkaConsumer09Java 0

For Consumer 2:

java -cp target/scala-2.11/kafka-consumer-0.1.0-SNAPAHOT.jar com.test.api.consumer.KafkaConsumer09Java 1

Here 0 and 1 are the partitions from which I want each consumer to read messages.

However, only Consumer 1 receives messages, and it receives all of them. I was under the impression that messages from the producer are distributed evenly across the partitions.

I used the following command to see how many partitions I have for my topic test_topic:

Joes-MacBook-Pro:kafka_2.11-0.9.0.0 joe$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info --group test --topic test_topic --zookeeper localhost:2181
[2016-01-14 13:36:48,831] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test            test_topic                     0   10000           10000           0               none
BROKER INFO
0 -> 172.22.4.34:9092

Why is there only one partition even though I told Kafka to create 2 partitions for test_topic?

Here is my producer:

  def main(args: Array[String]): Unit = {
    //val conf = new SparkConf().setAppName("VPP metrics producer")
    //val sc = new SparkContext(conf)

    val props: Properties = new Properties()
    props.put("metadata.broker.list", "localhost:9092,localhost:9093")
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)

    // Send 10000 messages without a key; the producer chooses the partition.
    (1 to 10000).foreach { i =>
      val jsonStr = getRandomTsDataPoint().toJson.toString
      println(s"sending message $i to kafka")
      producer.send(new KeyedMessage[String, String]("test_topic", jsonStr))
      println(s"sent message $i to kafka")
    }
  }
2
Quite interesting! I also tried to check this, without success. I've seen more detailed output when each partition has an owner, but I didn't know how to assign ownership. - Jurudocs

2 Answers

4
votes

I'm not sure why you would have 1 partition if you created the topic with 2. Never happened to me, that's for sure.

Can you try this:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic

That should show you how many partitions are really there.

Then, if there's really only 1 partition, maybe you could start over by creating a new topic with:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test_topic_2

And then try:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic_2

... and report back the findings.

0
votes

You are only consuming from partition 0, but you also need to consume from partition 1. Once you consume from partition 1 and commit, partition 1 will also appear in the Pid column.

But you also need a producer that actually writes to partition 1.
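
One common way to make a producer write to more than one partition is to give each message a key, so that the partition can be derived from the key. The producer in the question builds each KeyedMessage from only a topic and a payload, with no key. The sketch below is plain Java and does not call Kafka at all: the class KeyedPartitionSketch and its methods are hypothetical names, and the hash-modulo rule is a simplified stand-in for a key-based partitioner. Kafka's own partitioners use their own hashing, so real placement may differ.

```java
public class KeyedPartitionSketch {

    // Hypothetical helper: map a message key to a partition index by hashing.
    // Masking the sign bit keeps the result non-negative for any hashCode.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Count how many of the given keys land on each partition.
    static int[] countPerPartition(String[] keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int numPartitions = 2; // test_topic was created with --partitions 2
        String[] keys = new String[10000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = Integer.toString(i + 1); // assumption: message number used as key
        }
        int[] counts = countPerPartition(keys, numPartitions);
        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + ": " + counts[p] + " messages");
        }
    }
}
```

In the Scala producer from the question, the equivalent change would be to pass a key as well, since KeyedMessage also has a (topic, key, message) constructor. Whether that alone fixes the distribution depends on the partitioner configured for the producer.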