I'm struggling to use the partition key mechanism properly. My approach is to set the number of partitions to 3, create three partition keys "0", "1" and "2", and then use these keys to create three KeyedMessages:
- KeyedMessage(topic, "0", message)
- KeyedMessage(topic, "1", message)
- KeyedMessage(topic, "2", message)
After that, I create a producer instance and use it to send all three KeyedMessages.
I expect each KeyedMessage to go to a different partition according to its partition key, which means:
- KeyedMessage(topic, "0", message) go to Partition 0
- KeyedMessage(topic, "1", message) go to Partition 1
- KeyedMessage(topic, "2", message) go to Partition 2
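For reference, here is a minimal sketch of the hash-modulo rule that a key-based partitioner applies. It assumes the partition is chosen as the key's `hashCode` modulo the partition count (the Kafka 0.8 producer docs describe the default partitioner as "based on the hash of the key"); `HashPartitionSketch` and `partitionFor` are hypothetical names, not Kafka APIs. With plain String keys this rule would give exactly the mapping expected above, because `"0".hashCode` is 48, and 48, 49 and 50 modulo 3 are 0, 1 and 2.

```scala
object HashPartitionSketch {
  // Hypothetical helper: pick a partition from the key's hashCode,
  // masking the sign bit so the result is never negative.
  def partitionFor(key: Any, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  def main(args: Array[String]): Unit = {
    // String keys hash by content: "0" -> 48, "1" -> 49, "2" -> 50
    for (k <- List("0", "1", "2"))
      println(s"key $k -> partition ${partitionFor(k, 3)}")
    // prints "key 0 -> partition 0", "key 1 -> partition 1", "key 2 -> partition 2"
  }
}
```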
I'm using Kafka-web-console to watch the topic status, but the result is not what I expect: the KeyedMessages still go to partitions seemingly at random, and sometimes two KeyedMessages land in the same partition even though they have different partition keys.
To make my question clearer, here is the Scala code I currently have. I'm using Kafka 0.8.2-beta and Scala 2.10.4.
Here is the producer code (I didn't set a custom partitioner.class):
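import java.util.Properties
import kafka.message.{DefaultCompressionCodec, NoCompressionCodec}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// topic, brokerList, compress, synchronously, batchSize, messageSendMaxRetries,
// requestRequiredAcks and clientId are fields of the enclosing producer class
val props = new Properties()
val codec = if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
props.put("compression.codec", codec.toString)
props.put("producer.type", if (synchronously) "sync" else "async")
props.put("metadata.broker.list", brokerList)
props.put("batch.num.messages", batchSize.toString)
props.put("message.send.max.retries", messageSendMaxRetries.toString)
props.put("request.required.acks", requestRequiredAcks.toString)
props.put("client.id", clientId.toString)
// No partitioner.class is set, so Kafka's default partitioner picks the partition from the key
val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

// Builds a message; the "partition" bytes are passed as the message key
def kafkaMessage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
  if (partition == null) {
    new KeyedMessage(topic, message)
  } else {
    new KeyedMessage(topic, partition, message)
  }
}

// String overload: converts the message and the optional key to UTF-8 bytes
def send(message: String, partition: String = null): Unit =
  send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))

// Byte-array overload: sends the message and exits the JVM on any failure
def send(message: Array[Byte], partition: Array[Byte]): Unit = {
  try {
    producer.send(kafkaMessage(message, partition))
  } catch {
    case e: Exception =>
      e.printStackTrace()
      System.exit(1)
  }
}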
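Since no partitioner.class is configured, one option worth considering is explicit partition logic for these 4-byte Int keys. The sketch below is only the pure decision function, under the assumption that keys are always built with `ByteBuffer.allocate(4).putInt(n).array()`; `IntKeyPartitioning` and `partitionFor` are hypothetical names, not part of Kafka.

```scala
import java.nio.ByteBuffer

// Hypothetical partition logic for keys built as
// ByteBuffer.allocate(4).putInt(n).array(); not part of Kafka itself.
object IntKeyPartitioning {
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null =>
      0 // no key: arbitrary fixed choice for this sketch
    case bytes: Array[Byte] if bytes.length == 4 =>
      // Read back the Int written by putInt (ByteBuffer is big-endian by default)
      val n = ByteBuffer.wrap(bytes).getInt
      // Non-negative modulo, so negative ints still map to a valid partition
      ((n % numPartitions) + numPartitions) % numPartitions
    case other =>
      // Any other key: fall back to its own hashCode, sign bit masked off
      (other.hashCode & 0x7fffffff) % numPartitions
  }
}
```

To use logic like this with the 0.8 producer, it would have to be wrapped in a class implementing `kafka.producer.Partitioner` (constructed with a `kafka.utils.VerifiableProperties`) and named in the `partitioner.class` property; treat that wiring as something to confirm against the 0.8.2 documentation.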
And here is how I use it: I create a producer instance and use it to send three messages. I create each partition key as an Int and then convert it to a byte array:
import java.nio.ByteBuffer
import java.util.UUID

val testMessage = UUID.randomUUID().toString
val testTopic = "sample1"
val groupId_1 = "testGroup"
print("starting sample broker testing")
// KafkaProducer is the class that wraps the producer code above
val producer = new KafkaProducer(testTopic, "localhost:9092")
val numList = List(0, 1, 2)
for (a <- numList) {
  // Create the partition key as a 4-byte array
  val key = ByteBuffer.allocate(4).putInt(a).array()
  // Passing an Array[Byte] key calls the second send overload of the producer
  producer.send(testMessage.getBytes("UTF8"), key)
}
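Because the loop sends Array[Byte] keys, it may be worth checking how such keys hash. A Scala `Array[Byte]` is a JVM array, which inherits identity-based `hashCode` and `equals` from `java.lang.Object`, so two keys with identical bytes generally get unrelated hash codes. If the default partitioner hashes the key object itself, the partition for a byte-array key would then depend on object identity rather than on its contents, which would be consistent with the behavior described above. `ByteArrayKeyCheck` and `intKey` are names made up for this standalone check.

```scala
import java.nio.ByteBuffer
import java.util.Arrays

object ByteArrayKeyCheck {
  // Same key construction as in the loop above
  def intKey(n: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(n).array()

  def main(args: Array[String]): Unit = {
    val a = intKey(1)
    val b = intKey(1)
    // Identity-based hashCode: equal-content arrays usually hash differently
    println(a.hashCode == b.hashCode)
    // Content-based hashing and comparison treat them as the same key
    println(Arrays.hashCode(a) == Arrays.hashCode(b)) // true
    println(Arrays.equals(a, b))                      // true
  }
}
```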
I'm not sure whether my logic is wrong or I have misunderstood the partition key mechanism. Any sample code or explanation would be greatly appreciated!
stealthly/scala-kafka library? It looks like a bug. Can you please open an issue on GitHub for it? I'll try to get it fixed by the end of the week or so. – serejja