5
votes

As per my understanding, a Kafka consumer reads messages from an assigned partition sequentially...

We are planning to have multiple Kafka consumers (Java) with the same group id. If each one reads sequentially from its assigned partition, how can we achieve high throughput? For example, the producer publishes around 40 messages per second, while a consumer processes 1 message per second. Though we can have multiple consumers, we can't have 40, right? Correct me if I'm wrong...

And in our case the consumer has to commit the offset only after the message is processed successfully; otherwise the message will be reprocessed. Is there any better solution?

3
Your question is a bit ambiguous: your title seems to suggest you're asking whether a Kafka consumer can read in batches, but your actual question talks more about how to balance certain processing requirements in Kafka. Are you asking whether consumers can batch messages? Or are you asking how to get around your producer (40 per sec) and consumer (1 per sec) time/processing requirements? – Morgan Kenyon
I guess my question is related to the title; I'll try to explain why. If a single Kafka consumer reads messages in batches, I can achieve throughput similar to the producer's. So in my question I was asking how to achieve high throughput with each consumer, which can happen only if each consumer reads messages in batches. But as per my understanding, a Kafka consumer reads messages sequentially from a partition. – shiv455

3 Answers

16
votes

Based on your question clarification.

A Kafka Consumer can read multiple messages at a time. But a Kafka Consumer doesn't really read messages one by one; it's more correct to say that a Consumer reads a certain number of bytes, and the size of the individual messages then determines how many messages are read. Reading through the Kafka Consumer Configs, you're not allowed to specify how many messages to fetch; instead you specify a min/max data size that a consumer can fetch, and however many messages fit inside that range is how many you will get. You will always get messages sequentially, as you have pointed out.

Related Consumer Configs (for 0.9.0.0 and greater)

  • fetch.min.bytes
  • max.partition.fetch.bytes

UPDATE

Using your example in the comments, "my understanding is if i specify in config to read 10 bytes and if each message is 2 bytes the consumer reads 5 messages at a time": that is true. Your next statement, "that means the offsets of these 5 messages were random with in partition", is false. Reading sequentially doesn't mean one by one; it just means that the messages remain ordered. You are able to batch items and have them remain sequential/ordered. Take the following examples.

In a Kafka log, if there are 10 messages (each 2 bytes) with the following offsets, [0,1,2,3,4,5,6,7,8,9].

If you read 10 bytes, you'll get a batch containing the messages at offsets [0,1,2,3,4].

If you read 6 bytes, you'll get a batch containing the messages at offsets [0,1,2].

If you read 6 bytes, then another 6 bytes, you'll get two batches containing the messages [0,1,2] and [3,4,5].

If you read 8 bytes, then 4 bytes, you'll get two batches containing the messages [0,1,2,3] and [4,5].
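To make this concrete, here is a minimal sketch of batch fetching with the 0.9.x Java consumer; the broker address, group id, and topic name are placeholder assumptions, not something from the question.

  import java.util.Arrays;
  import java.util.Properties;

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class BatchFetchExample {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
      props.put("group.id", "my-group");                // placeholder group id
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      // Don't answer a fetch until at least this many bytes are available.
      props.put("fetch.min.bytes", "1024");
      // Upper bound on the data returned per partition per fetch.
      props.put("max.partition.fetch.bytes", "1048576");

      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList("my-topic")); // placeholder topic
        // poll() returns however many messages fit in the configured byte
        // range; within a partition they stay in offset order.
        ConsumerRecords<String, String> batch = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : batch) {
          System.out.printf("partition=%d offset=%d value=%s%n",
              record.partition(), record.offset(), record.value());
        }
      }
    }
  }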

Update: Clarifying Committing

I'm not 100% sure how committing works; I've mainly worked with Kafka from a Storm environment, where the provided KafkaSpout automatically commits Kafka messages.

But looking through the 0.9.0.1 Consumer APIs, which I would recommend you do too, there seem to be three methods in particular that are relevant to this discussion.

  • poll(long timeout)
  • commitSync()
  • commitSync(java.util.Map<TopicPartition, OffsetAndMetadata> offsets)

The poll method retrieves messages; it could be only 1, it could be 20. For your example, let's say 3 messages were returned, [0,1,2]. You now have those three messages, and it's up to you to determine how to process them. You could process them 0 => 1 => 2, 1 => 0 => 2, or 2 => 0 => 1; it just depends. However you process them, after processing you'll want to commit, which tells the Kafka server you're done with those messages.

Using commitSync() commits everything returned by the last poll; in this case it would commit offsets [0,1,2].

On the other hand, if you choose to use commitSync(java.util.Map<TopicPartition, OffsetAndMetadata> offsets), you can manually specify which offsets to commit. If you're processing them in order, you can process offset 0 and then commit it, process offset 1 and then commit it, and finally process offset 2 and commit it, as sketched below.
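Here is a minimal sketch of both commit styles against the 0.9.x consumer API; the process method and the assumption that enable.auto.commit is set to false are mine, not part of the original discussion.

  import java.util.Collections;

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  import org.apache.kafka.common.TopicPartition;

  public class PerMessageCommitExample {

    // Polls once and commits each record individually after processing it.
    // Assumes a consumer configured with enable.auto.commit=false.
    static void pollAndCommitOneByOne(KafkaConsumer<String, String> consumer) {
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records) {
        process(record);
        // The committed value is the offset of the *next* message to read,
        // hence record.offset() + 1.
        consumer.commitSync(Collections.singletonMap(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)));
      }
      // Alternatively, consumer.commitSync() here would commit everything
      // returned by the last poll() in a single call.
    }

    static void process(ConsumerRecord<String, String> record) {
      // Placeholder for per-message processing logic.
      System.out.printf("processed offset %d%n", record.offset());
    }
  }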

All in all, Kafka gives you the freedom to process messages however you desire: you can process them sequentially or entirely at random, at your choosing.

1
votes

To achieve parallelism, which seems to be what you're asking about, you use topic partitions: you split the topic into N parts, called partitions. Then, in the consumer, you spawn multiple threads to consume from those partitions.

On the Producer side, you either publish messages to a random partition (the default) or provide Kafka with some message attribute to hash on (if ordering is required), which makes sure that all messages with the same hash go to the same partition.
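As a rough sketch of both publishing styles with the Java producer (the broker address, topic name, and key are placeholder values):

  import java.util.Properties;

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class PartitioningExample {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        // No key: the default partitioner spreads messages across partitions.
        producer.send(new ProducerRecord<>("my-topic", "some payload"));

        // With a key: the key is hashed, so every message with key "user-42"
        // lands on the same partition and therefore stays ordered.
        producer.send(new ProducerRecord<>("my-topic", "user-42", "another payload"));
      }
    }
  }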

EDIT (example of an offset commit request):
This is how I did it, using the low-level SimpleConsumer API. Any methods not shown here are non-essential.

import java.util.HashMap;
import java.util.Map;

import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// ConsumerContext, processKafkaOffsetCommitError, BrokerInfo and LOG are
// helper members of the surrounding class, omitted here as non-essential.

/**
 * Commits the provided offset for the current client (i.e. unique
 * topic/partition/clientName combination).
 *
 * @param offset the offset to commit
 * @return {@code true} if the commit succeeded (an exception is thrown otherwise)
 * @throws Exception if the broker reports an error for this partition
 */
public static boolean commitOffset(String topic, int partition, String clientName,
    SimpleConsumer consumer, long offset) throws Exception {
  try {
    TopicAndPartition tap = new TopicAndPartition(topic, partition);
    OffsetAndMetadata offsetMetaAndErr =
        new OffsetAndMetadata(offset, OffsetAndMetadata.NoMetadata(), -1L);
    Map<TopicAndPartition, OffsetAndMetadata> mapForCommitOffset = new HashMap<>(1);
    mapForCommitOffset.put(tap, offsetMetaAndErr);

    OffsetCommitRequest offsetCommitReq = new OffsetCommitRequest(
        ConsumerContext.getMainIndexingConsumerGroupId(), mapForCommitOffset, 1, clientName,
        ConsumerContext.getOffsetStorageType());

    OffsetCommitResponse offsetCommitResp = consumer.commitOffsets(offsetCommitReq);
    Short errCode = (Short) offsetCommitResp.errors().get(tap);
    if (errCode != 0) {
      // The broker rejected the commit for this partition: log the failure
      // and translate the error code into the matching exception.
      processKafkaOffsetCommitError(tap, offsetCommitResp, BrokerInfo.of(consumer.host()));
      ErrorMapping.maybeThrowException(errCode);
    }
    LOG.debug("Successfully committed offset [{}].", offset);
  } catch (Exception e) {
    LOG.error("Error while committing offset [" + offset + "].", e);
    throw e;
  }
  return true;
}
0
votes

You can consume the messages in batches and process them in a batched manner. Via the fetch.max.wait.ms property, the broker will wait up to this amount of time for new messages to accumulate before answering the consumer's poll, so each poll tends to return a batch.
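For illustration, a minimal poll-and-process-in-batches loop might look like the sketch below; the broker, group id, topic, wait/size thresholds, and processBatch logic are placeholder assumptions.

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;
  import java.util.Properties;

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class BatchProcessingExample {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
      props.put("group.id", "batch-group");             // placeholder group id
      props.put("enable.auto.commit", "false");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      // The broker waits up to 500 ms for fetch.min.bytes of data to
      // accumulate before answering a fetch, which encourages larger batches.
      props.put("fetch.max.wait.ms", "500");
      props.put("fetch.min.bytes", "65536");

      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList("my-topic")); // placeholder topic
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(1000);
          if (records.count() == 0) {
            continue;
          }
          List<String> batch = new ArrayList<>();
          for (ConsumerRecord<String, String> record : records) {
            batch.add(record.value());
          }
          processBatch(batch);   // e.g. one bulk write instead of N single writes
          consumer.commitSync(); // commit only after the whole batch succeeded
        }
      }
    }

    static void processBatch(List<String> batch) {
      // Placeholder for batched processing, e.g. a bulk database insert.
      System.out.printf("processed %d messages%n", batch.size());
    }
  }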