I am trying to print Avro messages from a Kafka topic in log4j format using kafka-avro-console-consumer.

For that I use the following kafka-avro-console-consumer command:

 bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic avro-test -property print.key=true --formatter kafka.tools.LoggingMessageFormatter 

I have exported KAFKA_OPTS via the following command:

 export $KAFKA_OPTS= -Dlog4j.configuration=file:/path/to/file/kafka-console-consumer-log4j.properties

Now, if I run the regular kafka-console-consumer using the following command:

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic avro-test -property print.key=true --formatter kafka.tools.LoggingMessageFormatter

I get log4j-formatted output:

[2018-07-17 19:09:40,514] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-07-17 19:09:40,522] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-07-17 19:09:40,523] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] Setting newly assigned partitions [avro-test-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-07-17 19:09:40,531] INFO [Consumer clientId=consumer-1, groupId=console-consumer-10597] Resetting offset for partition avro-test-0 to offset 23. (org.apache.kafka.clients.consumer.internals.Fetcher)

However, this formatting option does not take effect when I use the Avro consumer with the following command:

 bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic avro-test -property print.key=true --formatter kafka.tools.LoggingMessageFormatter 

It just falls back to a default formatter.

Is there something I may be missing here?

1 Answer

I think that if you override --formatter, you won't get Avro messages anymore, because kafka.tools.LoggingMessageFormatter doesn't know how to deserialize Avro.

Ref - source code

DEFAULT_AVRO_FORMATTER="--formatter io.confluent.kafka.formatter.AvroMessageFormatter"

...

for OPTION in "$@"
do
  case $OPTION in
    --formatter)
      DEFAULT_AVRO_FORMATTER=""

...

exec $(dirname $0)/schema-registry-run-class kafka.tools.ConsoleConsumer $DEFAULT_AVRO_FORMATTER ...
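To see the effect of that option scan in isolation, here is a minimal sketch that mimics it without starting a consumer. The function name build_args is hypothetical and not part of the real wrapper; it only echoes the argument list the wrapper would hand to kafka.tools.ConsoleConsumer, assuming the excerpt above is the relevant logic.

```shell
#!/usr/bin/env bash
# Hypothetical helper that mirrors the wrapper's option scan (illustration only).
build_args() {
  local default_formatter="--formatter io.confluent.kafka.formatter.AvroMessageFormatter"
  local option
  for option in "$@"; do
    case $option in
      --formatter)
        # The caller passed its own formatter, so the Avro default is dropped.
        default_formatter=""
        ;;
    esac
  done
  # Left unquoted on purpose, as in the wrapper: an empty default expands to nothing.
  echo kafka.tools.ConsoleConsumer $default_formatter "$@"
}

build_args --topic avro-test
build_args --topic avro-test --formatter kafka.tools.LoggingMessageFormatter
```

The first call keeps the Avro formatter in front of the user arguments; the second call prints only the user-supplied --formatter kafka.tools.LoggingMessageFormatter.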

So it should run kafka.tools.ConsoleConsumer --formatter kafka.tools.LoggingMessageFormatter as expected, because the default formatter is being unset. schema-registry-run-class is defining KAFKA_OPTS, but the export line must not contain the leading dollar sign or the space after the equals sign:

export KAFKA_OPTS='-Dlog4j.configuration=file:/path/to/file/kafka-console-consumer-log4j.properties'
bin/kafka-avro-console-consumer ...
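
As a rough check of why the export from the question does not work, the sketch below runs that form in a subshell and then applies the corrected form. It assumes KAFKA_OPTS is not already set; the path is the placeholder from the question.

```shell
#!/usr/bin/env bash
unset KAFKA_OPTS

# Form from the question: "$KAFKA_OPTS" expands to nothing, leaving "=" and a
# separate "-Dlog4j..." word, neither of which is a valid variable name.
# Run in a subshell so a failing export cannot abort this script.
if ( export $KAFKA_OPTS= -Dlog4j.configuration=file:/path/to/file/kafka-console-consumer-log4j.properties ) 2>/dev/null; then
  echo "question's export: accepted"
else
  echo "question's export: rejected by the shell"
fi

# Corrected form: no dollar sign on the name, no space after "=", value quoted.
export KAFKA_OPTS='-Dlog4j.configuration=file:/path/to/file/kafka-console-consumer-log4j.properties'
echo "KAFKA_OPTS is now: $KAFKA_OPTS"
```

If the first branch reports a rejection, the variable was never exported, so the JVM would not have received the -Dlog4j.configuration setting from it.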