
I'm using the 0.9 Kafka Java client in Scala.

scala> val kafkaProducer = new KafkaProducer[String, String](props)

ProducerRecord has several constructors that allow you to include or not include a key and/or partition.

scala> val keyedRecord = new ProducerRecord("topic", "key", "value")
scala> kafkaProducer.send(keyedRecord)

This should work without any problem.

However, an unkeyed ProducerRecord gives a type error.

scala> val unkeyedRecord = new ProducerRecord("topic", "value")
res8: org.apache.kafka.clients.producer.ProducerRecord[Nothing,String] =
        ProducerRecord(topic=topic, partition=null, key=null, value=value

scala> kafkaProducer.send(res8)
<console>:17: error: type mismatch;
 found   :   org.apache.kafka.clients.producer.ProducerRecord[Nothing,String]
 required: org.apache.kafka.clients.producer.ProducerRecord[String,String]
 Note: Nothing <: String, but Java-defined class ProducerRecord is invariant in type K.
 You may wish to investigate a wildcard type such as `_ <: String`. (SLS 3.2.10)
   kafkaProducer.send(res8)
                      ^

Is this against Kafka's rules or could it be an unnecessary precaution that has come from using this Java API in Scala?

More fundamentally, is it poor form to put keyed and unkeyed messages in the same Kafka topic?

Thank you

Javadoc: http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/package-summary.html


Edit

Could changing the variance of parameter K in KafkaProducer fix this?

I'm not sure about the Scala API, but in the Java client, when a ProducerRecord has a key, Kafka picks the partition by hashing the key modulo the number of partitions in the topic. If no key is present, it assigns partitions in a round-robin fashion. The only consequence is that some partitions may end up with more data than others (which may affect the design of your consumers). But you can mix the two without any "schema" restrictions. - Nautilus
Thanks. I've read how unkeyed records will be randomly assigned a partition, and so will not have the same ordering guarantee of keyed messages because there is no ordering guarantee between partitions. - Peter Becich
Also, this is the new Java client, used in Scala. I'm aware of the older Scala client, but am not using it. Thanks - Peter Becich
I suspect you hit this issue because the type parameters of ProducerRecord and KafkaProducer were inconsistent, i.e. the ProducerRecord should have been given the same type parameters as the KafkaProducer. - Paul Carey
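
To make the partitioning behaviour described in the comments concrete, here is a rough sketch. It is not Kafka's implementation: SketchPartitioner, the partition count of 3, and the key "user-42" are made up for illustration, and String.hashCode stands in for the hash the real client applies (as far as I know, the client hashes the serialized key bytes instead).

```scala
import java.util.concurrent.atomic.AtomicInteger

// Simplified, hypothetical partitioner. NOT Kafka's actual code.
final class SketchPartitioner(numPartitions: Int) {
  private val counter = new AtomicInteger(0)

  def partition(key: Option[String]): Int = key match {
    // Keyed record: the same key always maps to the same partition.
    // The mask keeps the hash non-negative before taking the modulo.
    case Some(k) => (k.hashCode & 0x7fffffff) % numPartitions
    // Unkeyed record: rotate through the partitions in turn.
    case None => (counter.getAndIncrement() & 0x7fffffff) % numPartitions
  }
}

val partitioner = new SketchPartitioner(3)

// Three records with the same key all land on one partition.
val keyed = Seq.fill(3)(partitioner.partition(Some("user-42")))

// Three unkeyed records are spread across the partitions.
val unkeyed = Seq.fill(3)(partitioner.partition(None))
```

Under these assumptions, records sharing a key stay in one partition (and so keep their relative order), while unkeyed records are spread out and have no ordering relative to each other.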

1 Answer


It looks like the answer is in the comments, but to spell it out, Scala uses type inference when types are not explicitly provided. Since you wrote:

val unkeyedRecord = new ProducerRecord("topic", "value")

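The key is not provided, so no argument constrains the key type K, and Scala infers it as Nothing, the bottom type (the key itself is simply null at runtime). Because the Java-defined ProducerRecord is invariant in K, a ProducerRecord[Nothing,String] is not accepted where a ProducerRecord[String,String] is required. To fix that, declare the types explicitly: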

val unkeyedRecord = new ProducerRecord[String,String]("topic", "value")
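
If you want to see the inference issue without a Kafka dependency, the sketch below reproduces it with stand-ins. Record and Producer are hypothetical classes that only mimic the shape of ProducerRecord and KafkaProducer (invariant type parameters, a two-argument constructor that never mentions K). It also shows a second fix that should work the same way: annotating the val so the expected type drives inference.

```scala
// Hypothetical stand-in for the Java-defined ProducerRecord: invariant in K and V.
final class Record[K, V](val topic: String, val key: K, val value: V) {
  // Mirrors the (topic, value) constructor: no argument mentions K, and the key is null.
  def this(topic: String, value: V) = this(topic, null.asInstanceOf[K], value)
}

// Hypothetical stand-in for KafkaProducer[K, V]; send just renders the record.
final class Producer[K, V] {
  def send(record: Record[K, V]): String =
    s"${record.topic}:${record.key}:${record.value}"
}

val producer = new Producer[String, String]

// With nothing to constrain K, this is inferred as Record[Nothing, String]
// and passing it to send fails with the type mismatch from the question:
// val bad = new Record("topic", "value")
// producer.send(bad)

// Fix 1: explicit type parameters, as in the answer.
val explicit = new Record[String, String]("topic", "value")

// Fix 2: annotate the val so the expected type fixes K = String.
val ascribed: Record[String, String] = new Record("topic", "value")

val sentExplicit = producer.send(explicit)
val sentAscribed = producer.send(ascribed)
```

Either way, the record's static type matches the producer's, and the key is still null at runtime, so the record remains unkeyed.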