I am trying to consume from a Kafka topic using confluent-kafka-python with a brand-new consumer group (GROUP_NAME_CONNECT5 in the sample shown below). It does not seem to work unless I first consume with kafka-console-consumer using this new consumer group. After I consume with kafka-console-consumer just once, the confluent-kafka-python consumer works fine. Any idea why?
Properties:
[Kafka]
bootstrap_servers=
ssl_ca_location=
max_wait_cycles=30
group_name=GROUP_NAME_CONNECT5
client_id=CLIENT_ID
auto_commit_interval_ms=5000
Code:
    kafkaConsumerConfig = {
        'bootstrap.servers': config.BOOTSTRAP_SERVERS,
        'group.id': config.GROUP_NAME,
        'client.id': config.CLIENT_ID,
        'session.timeout.ms': 60000,
        'heartbeat.interval.ms': 3000,
        'security.protocol': 'SASL_SSL',
        'sasl.kerberos.service.name': 'kafka',
        'sasl.mechanisms': 'GSSAPI',
        'ssl.ca.location': config.SSL_CA_LOCATION,
        'sasl.kerberos.kinit.cmd': 'kinit -S {0} {1} -k -t {2}'.format(config.KEYTAB_PRINCIPAL, config.KEYTAB_USER, config.KEYTAB_PATH),
        'default.topic.config': {
            'enable.auto.commit': 'false',
            'enable.auto.offset.store': 'false'
        }
    }
    c = Consumer(**self.kafkaConsumerConfig)
    c.subscribe([self.TOPIC_NAME])
    while True:
        kafka_msg = c.poll(1.0)
        process_message(kafka_msg)
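For comparison, here is a minimal sketch of a poll loop that skips empty polls and surfaces errors instead of passing every result straight to the handler. In confluent-kafka-python, `Consumer.poll()` returns `None` when the timeout expires, and a returned message may carry an error via `msg.error()`. The function name `consume`, the `handle` callback, and the idea of stopping after `max_empty_polls` idle cycles (loosely modelled on `max_wait_cycles=30` from the properties) are assumptions for illustration, not part of the original code.

```python
def consume(consumer, handle, max_empty_polls=30, timeout=5.0):
    """Poll `consumer` until `max_empty_polls` consecutive idle cycles occur.

    `consumer` is anything with a confluent-kafka style poll(timeout) method.
    `handle` is a hypothetical callback that receives each good message.
    Returns the number of messages handed to `handle`.
    """
    idle = 0
    handled = 0
    while idle < max_empty_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            # poll() timed out without a message
            idle += 1
            continue
        if msg.error():
            # Report the error and count the cycle as idle so the loop terminates
            print("consumer error: {}".format(msg.error()))
            idle += 1
            continue
        idle = 0
        handle(msg)
        handled += 1
    return handled
```

With a real consumer this would be called as `consume(c, process_message)` after `c.subscribe(...)`.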
Logs after running the Python consumer for 5 minutes and then killing it. The Python consumer was not able to consume any messages:
%7|1579623452.300|INIT|CLIENT_ID#consumer-1| [thrd:app]: librdkafka v1.2.1 (0x10201ff) CLIENT_ID#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS ZLIB SSL SASL_CYRUS HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
%7|1579623452.303|SUBSCRIBE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": subscribe to new subscription of 1 topics (join state init)
%7|1579623452.303|REBALANCE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1579623453.441|JOIN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": postponing join until up-to-date metadata is available
%7|1579623453.443|REJOIN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": subscription updated from metadata change: rejoining group
%7|1579623453.443|REBALANCE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1579623455.300|JOIN|CLIENT_ID#consumer-1| [thrd:main]: sasl_ssl://xxxx/159: Joining group "GROUP_NAME_CONNECT5" with 1 subscribed topic(s)
%7|1579623458.305|ASSIGNOR|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": "range" assignor run for 1 member(s)
%7|1579623458.309|ASSIGN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": new assignment of 8 partition(s) in join state wait-assign-rebalance_cb
%7|1579623458.309|OFFSET|CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/159: Fetch committed offsets for 8/8 partition(s)
%7|1579623458.312|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [2] start fetching at offset 123698
%7|1579623458.313|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [7] start fetching at offset 116555
%7|1579623458.465|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [4] start fetching at offset 106800
%7|1579623458.484|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [1] start fetching at offset 107557
%7|1579623458.485|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [6] start fetching at offset 109805
%7|1579623458.486|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [0] start fetching at offset 91465
%7|1579623458.487|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [3] start fetching at offset 102042
%7|1579623458.487|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [5] start fetching at offset 117214
Then I ran kafka-consumer-groups:
../bin/kafka-consumer-groups.sh --bootstrap-server xxx --command-config producer.properties --group GROUP_NAME_CONNECT5 --describe
Consumer group 'GROUP_NAME_CONNECT5' has no active members.
Then I ran kafka-console-consumer using the same new consumer group (GROUP_NAME_CONNECT5). It consumed all the messages in the topic.
../bin/kafka-consumer-groups.sh --bootstrap-server xxx --command-config producer.properties --group GROUP_NAME_CONNECT5 --describe
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Consumer group 'GROUP_NAME_CONNECT5' has no active members.
TOPIC                     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
che_silo_cnsld_rpt_mthly  1          112644          112644          0    -            -     -
che_silo_cnsld_rpt_mthly  5          123099          123099          0    -            -     -
che_silo_cnsld_rpt_mthly  0          95715           95715           0    -            -     -
che_silo_cnsld_rpt_mthly  3          106932          106932          0    -            -     -
che_silo_cnsld_rpt_mthly  4          112588          112588          0    -            -     -
che_silo_cnsld_rpt_mthly  7          122047          122047          0    -            -     -
che_silo_cnsld_rpt_mthly  2          129940          129940          0    -            -     -
che_silo_cnsld_rpt_mthly  6          115050          115050          0    -            -     -
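To compare this output against what the Python consumer does, the rows can be read programmatically. Below is a minimal sketch that parses the `--describe` text and recomputes lag as LOG-END-OFFSET minus CURRENT-OFFSET. The function name `parse_describe` is hypothetical, and treating `-` in the CURRENT-OFFSET column as "no committed offset" is an assumption about the tool's output.

```python
def parse_describe(text):
    """Return {(topic, partition): (current, log_end, lag)} from describe output.

    Non-data lines (warnings, the header, the "no active members" notice)
    are skipped because their second field is not a partition number.
    """
    rows = {}
    for line in text.splitlines():
        parts = line.split()
        if len(parts) < 4 or not parts[1].isdigit() or not parts[3].isdigit():
            continue
        topic, partition = parts[0], int(parts[1])
        # Assumption: "-" means the group has no committed offset here
        current = int(parts[2]) if parts[2].isdigit() else None
        log_end = int(parts[3])
        lag = None if current is None else log_end - current
        rows[(topic, partition)] = (current, log_end, lag)
    return rows
```

Feeding it the output above gives a lag of 0 for every partition, matching the LAG column.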
Then I produced some new messages into the topic and ran the Python consumer, and this time it ran successfully:
%7|1579624630.644|INIT|CLIENT_ID#consumer-1| [thrd:app]: librdkafka v1.2.1 (0x10201ff) CLIENT_ID#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer, GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS ZLIB SSL SASL_CYRUS HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
%7|1579624630.648|SUBSCRIBE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": subscribe to new subscription of 1 topics (join state init)
%7|1579624630.648|REBALANCE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1579624631.807|JOIN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": postponing join until up-to-date metadata is available
%7|1579624631.808|REJOIN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": subscription updated from metadata change: rejoining group
%7|1579624631.808|REBALANCE|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1579624633.644|JOIN|CLIENT_ID#consumer-1| [thrd:main]: sasl_ssl://cilhdkfs0304.sys.cigna.com:9095/159: Joining group "GROUP_NAME_CONNECT5" with 1 subscribed topic(s)
%7|1579624636.650|ASSIGNOR|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": "range" assignor run for 1 member(s)
%7|1579624636.654|ASSIGN|CLIENT_ID#consumer-1| [thrd:main]: Group "GROUP_NAME_CONNECT5": new assignment of 8 partition(s) in join state wait-assign-rebalance_cb
%7|1579624636.654|OFFSET|CLIENT_ID#consumer-1| [thrd:main]: GroupCoordinator/159: Fetch committed offsets for 8/8 partition(s)
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [0] start fetching at offset 91465
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [1] start fetching at offset 107557
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [2] start fetching at offset 123698
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [3] start fetching at offset 102042
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [4] start fetching at offset 106800
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [5] start fetching at offset 117214
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [6] start fetching at offset 109805
%7|1579624636.656|FETCH|CLIENT_ID#consumer-1| [thrd:main]: Partition che_silo_cnsld_rpt_mthly [7] start fetching at offset 116555
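The FETCH entries in both debug logs state the offset at which each partition starts fetching, so the two runs can be compared directly. Here is a minimal sketch that pulls those offsets out of raw log text; the function name `fetch_start_offsets` is hypothetical, and the pattern is based only on the FETCH lines as they appear in this post.

```python
import re

# Matches e.g. "Partition some_topic [2] start fetching at offset 123698"
FETCH_RE = re.compile(
    r"Partition (\S+) \[(\d+)\] start fetching at offset\s+(\d+)"
)

def fetch_start_offsets(log_text):
    """Return {(topic, partition): start_offset} found in librdkafka debug logs."""
    offsets = {}
    for topic, partition, offset in FETCH_RE.findall(log_text):
        offsets[(topic, int(partition))] = int(offset)
    return offsets
```

Applied to the logs as posted, both runs yield the same start offsets (for example 123698 for partition 2), whereas the kafka-consumer-groups output in between shows a CURRENT-OFFSET of 129940 for that partition.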
auto.offset.reset
... Can you consume something with Python, then use kafka-consumer-groups to describe it? Also, you're doing nothing with the polled message, and you may want to poll for longer than a second. – OneCricketeer
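The comment points at `auto.offset.reset`, the setting that decides where a consumer starts when its group has no committed offset for a partition. librdkafka defaults to the end of the log, so a brand-new group sees only messages produced after it joins. Below is a minimal sketch of setting it explicitly. The helper name `with_offset_reset` is hypothetical, only a subset of the accepted values is allowed here, and whether this accounts for the behavior in the question is not established by the logs.

```python
def with_offset_reset(base_config, policy="earliest"):
    """Return a copy of a consumer config with auto.offset.reset set.

    Only a subset of the values librdkafka accepts is allowed here,
    to keep the sketch small.
    """
    allowed = {"earliest", "latest", "error"}
    if policy not in allowed:
        raise ValueError("unsupported auto.offset.reset value: {!r}".format(policy))
    config = dict(base_config)  # leave the caller's dict untouched
    # Applies only when the group has no committed offset for a partition
    config["auto.offset.reset"] = policy
    return config
```

It could then be used as `Consumer(with_offset_reset(kafkaConsumerConfig))`, assuming the dictionary from the question.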