
I am trying to consume from a Kafka topic using confluent-kafka-python with a brand-new consumer group (GROUP_NAME_CONNECT5 in the sample shown below). It does not seem to work unless I first consume with kafka-console-consumer using this new consumer group. After I consume with kafka-console-consumer just once, the confluent-kafka-python consumer works fine. Any idea why?

Properties:

[Kafka]

bootstrap_servers=

ssl_ca_location=

max_wait_cycles=30

group_name=GROUP_NAME_CONNECT5

client_id=CLIENT_ID

auto_commit_interval_ms=5000

Code:

kafkaConsumerConfig = {
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.GROUP_NAME,
    'client.id': config.CLIENT_ID,
    'session.timeout.ms': 60000,
    'heartbeat.interval.ms': 3000,

    'security.protocol': 'SASL_SSL',
    'sasl.kerberos.service.name': 'kafka',

    'sasl.mechanisms': 'GSSAPI',
    'ssl.ca.location': config.SSL_CA_LOCATION,
    'sasl.kerberos.kinit.cmd': 'kinit -S {0} {1} -k -t {2}'.format(config.KEYTAB_PRINCIPAL, config.KEYTAB_USER, config.KEYTAB_PATH),
    'default.topic.config': {
        'enable.auto.commit': 'false',
        'enable.auto.offset.store': 'false'
    }
}

c = Consumer(**self.kafkaConsumerConfig)

c.subscribe([self.TOPIC_NAME])

while True:
    kafka_msg = c.poll(1.0)
    process_message(kafka_msg)

Logs after running the Python consumer for 5 minutes and then killing it. The Python consumer was not able to consume any messages:

%7|1579623452.300|INIT|CLIENT_ID#consumer-1| [thrd:app]: librdkafka v1.2.1 (0x10201ff) CLIENT_ID#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS ZLIB SSL SASL_CYRUS HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
%7|1579623452.303|SUBSCRIBE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": subscribe to new subscription of 1 topics (join state init)
%7|1579623452.303|REBALANCE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1579623453.441|JOIN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": postponing join until up-to-date metadata is available
%7|1579623453.443|REJOIN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": subscription updated from metadata change: rejoining group
%7|1579623453.443|REBALANCE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1579623455.300|JOIN|CLIENT_ID#consumer-1| [thrd:main]: sasl_ssl://xxxx/159: Joining group "GROUP_NAME_CONNECT5" with 1 subscribed topic(s)
%7|1579623458.305|ASSIGNOR|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": "range" assignor run for 1 member(s)
%7|1579623458.309|ASSIGN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": new assignment of 8 partition(s) in join state wait-assign-rebalance_cb
%7|1579623458.309|OFFSET|CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/159: Fetch committed offsets for 8/8 partition(s)
%7|1579623458.312|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [2] start fetching at offset 123698
%7|1579623458.313|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [7] start fetching at offset 116555
%7|1579623458.465|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [4] start fetching at offset 106800
%7|1579623458.484|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [1] start fetching at offset 107557
%7|1579623458.485|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [6] start fetching at offset 109805
%7|1579623458.486|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [0] start fetching at offset 91465
%7|1579623458.487|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [3] start fetching at offset 102042
%7|1579623458.487|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [5] start fetching at offset 117214

Then I ran kafka-consumer-groups:

../bin/kafka-consumer-groups.sh --bootstrap-server xxx --command-config producer.properties --group GROUP_NAME_CONNECT5 --describe

Consumer group 'GROUP_NAME_CONNECT5' has no active members.

Then I ran kafka-console-consumer using the same new consumer group (GROUP_NAME_CONNECT5). It consumed all the messages in the topic.

../bin/kafka-consumer-groups.sh --bootstrap-server xxx --command-config producer.properties --group GROUP_NAME_CONNECT5 --describe

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Consumer group 'GROUP_NAME_CONNECT5' has no active members.

TOPIC                     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
che_silo_cnsld_rpt_mthly  1          112644          112644          0    -            -     -
che_silo_cnsld_rpt_mthly  5          123099          123099          0    -            -     -
che_silo_cnsld_rpt_mthly  0          95715           95715           0    -            -     -
che_silo_cnsld_rpt_mthly  3          106932          106932          0    -            -     -
che_silo_cnsld_rpt_mthly  4          112588          112588          0    -            -     -
che_silo_cnsld_rpt_mthly  7          122047          122047          0    -            -     -
che_silo_cnsld_rpt_mthly  2          129940          129940          0    -            -     -
che_silo_cnsld_rpt_mthly  6          115050          115050          0    -            -     -

Then I produced some new messages into the topic and ran the Python consumer again. This time the Python consumer ran successfully:

%7|1579624630.644|INIT|CLIENT_ID#consumer-1| [thrd:app]: librdkafka v1.2.1 (0x10201ff) CLIENT_ID#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS ZLIB SSL SASL_CYRUS HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
%7|1579624630.648|SUBSCRIBE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": subscribe to new subscription of 1 topics (join state init)
%7|1579624630.648|REBALANCE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1579624631.807|JOIN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": postponing join until up-to-date metadata is available
%7|1579624631.808|REJOIN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": subscription updated from metadata change: rejoining group
%7|1579624631.808|REBALANCE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1579624633.644|JOIN|CLIENT_ID#consumer-1| [thrd:main]: sasl_ssl://cilhdkfs0304.sys.cigna.com:9095/159: Joining group "GROUP_NAME_CONNECT5" with 1 subscribed topic(s)
%7|1579624636.650|ASSIGNOR|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": "range" assignor run for 1 member(s)
%7|1579624636.654|ASSIGN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": new assignment of 8 partition(s) in join state wait-assign-rebalance_cb
%7|1579624636.654|OFFSET|CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/159: Fetch committed offsets for 8/8 partition(s)
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [0] start fetching at offset 91465
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [1] start fetching at offset 107557
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [2] start fetching at offset 123698
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [3] start fetching at offset 102042
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [4] start fetching at offset 106800
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [5] start fetching at offset 117214
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [6] start fetching at offset 109805
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [7] start fetching at offset 116555

Please show your code and the configurations. - OneCricketeer
Are you using Kafka authorization (ACLs)? - mazaneicha
Added code, so hopefully it makes more sense now? - Sanjay Das
Did you already try to debug your consumer? To do so, add a key to kafkaConsumerConfig: "debug": "consumer" - ffosilva
You didn't set auto.offset.reset... Can you consume something with Python and then use kafka-consumer-groups to describe it? Also, you're doing nothing with the polled message, and you may want to poll for longer than a second. - OneCricketeer

1 Answer


I guess your problem is related to the 'auto.offset.reset' configuration. When your consumer joins a group that has no committed offsets yet, this setting defines the offset from which the consumer starts consuming. Try adding:

"auto.offset.reset": "earliest"

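This setting makes your consumer start from the first (earliest) available offset/message. The default is "largest", which means the consumer only receives messages produced after it has started. That may also be why the Python consumer works after the console consumer has run once: the group then has committed offsets, so 'auto.offset.reset' no longer comes into play. See the client configuration documentation for more details.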
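
As a rough illustration of the suggestion above, here is a minimal sketch. The helper names (with_offset_reset, consume_forever, handle) are hypothetical and not part of the question's code, and I am assuming 'auto.offset.reset' is set at the top level of the config dict. The loop also skips the None that poll() returns on timeout and checks msg.error() before processing, as one of the comments hinted. I have not run this against the asker's cluster, so treat it as a starting point rather than a confirmed fix.

```python
def with_offset_reset(base_config, policy="earliest"):
    """Return a copy of base_config with 'auto.offset.reset' set.

    Hypothetical helper for illustration; only a few of the accepted
    values are allowed here to keep the sketch simple.
    """
    if policy not in ("earliest", "latest", "error"):
        raise ValueError("unsupported auto.offset.reset policy: %r" % policy)
    config = dict(base_config)  # shallow copy, the caller's dict is left untouched
    config["auto.offset.reset"] = policy
    return config


def consume_forever(config, topic, handle):
    """Poll the topic and pass every valid message to handle (hypothetical)."""
    # Imported here so with_offset_reset can be used without the client installed.
    from confluent_kafka import Consumer, KafkaException

    consumer = Consumer(config)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue  # poll timed out, nothing to process yet
            if msg.error():
                raise KafkaException(msg.error())
            handle(msg)
    finally:
        consumer.close()  # leave the group cleanly
```

With the question's code this would be used roughly as consume_forever(with_offset_reset(kafkaConsumerConfig), TOPIC_NAME, process_message). Describing the group with kafka-consumer-groups while the script is running should then show whether it has an active member.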