My producer isn't throwing any errors, but data is not being sent to the destination topic. Can you recommend any techniques to debug this situation?
I have a call to a Confluent Python Avro Producer inside a synchronous loop to send data to a topic, like so:
self.producer.produce(topic=test2, value=msg_dict)
After this call I have a piece of code like so to flush the queue:
num_messages_in_queue = self.producer.flush(timeout = 2.0)
print(f"flushed {num_messages_in_queue} messages from producer queue in iteration {num_iterations} ")
This executes without any error, but no delivery callback is fired after it runs. My producer is initialized as follows:
def __init__(self, broker_url=None, topic=None, schema_registry_url=None, schema_path=None):
    try:
        with open(schema_path, 'r') as content_file:
            schema = avro.loads(content_file.read())
    except Exception as e:
        print(f"Error when trying to read avro schema file : {schema_path}")
    self.conf = {
        'bootstrap.servers': broker_url,
        'on_delivery': self.delivery_report,
        'schema.registry.url': schema_registry_url,
        'acks': -1,  # This guarantees that the record will not be lost as long as at least one in-sync replica remains alive.
        'enable.idempotence': False,
        'error_cb': self.error_cb
    }
    self.topic = topic
    self.schema_path = schema_path
    self.producer = AvroProducer(self.conf, default_key_schema=schema, default_value_schema=schema)
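One low-effort debugging technique is librdkafka's `debug` configuration property, which makes the client log its broker connections, metadata requests and message handling. Below is a minimal sketch, assuming the config dict is passed through to the underlying producer unchanged. The helper name `with_debug`, the chosen debug contexts and the example addresses are illustrative assumptions, not part of the original code.

```python
def with_debug(conf, contexts="broker,topic,msg"):
    """Return a copy of a producer config with librdkafka debug logging enabled.

    `debug` takes a comma-separated list of contexts; the default chosen
    here is only a suggestion.
    """
    debug_conf = dict(conf)  # copy so the original config is left untouched
    debug_conf["debug"] = contexts
    return debug_conf


# Hypothetical values standing in for the real broker and registry addresses.
base_conf = {
    "bootstrap.servers": "kafka:29092",
    "schema.registry.url": "http://schema-registry:8081",
    "acks": -1,
}
debug_conf = with_debug(base_conf)
print(debug_conf["debug"])
```

With this in place, a producer that cannot reach or resolve the broker should show connection or metadata errors in its log output rather than failing silently.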
My callback method is as follows:
def delivery_report(self, err, msg):
    print("began delivery_report")
    if err is None:
        # offset is a method on the message object, so it has to be called
        print(f"delivery_report --> Delivered msg.value = {msg.value()} to topic= {msg.topic()} offset = {msg.offset()} without err.")
    else:
        print(f"conf_worker AvroProducer failed to deliver message {msg.value()} to topic {self.topic}. got error= {err}")
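Two details of confluent-kafka-python are worth keeping in mind here: `flush()` returns the number of messages still waiting in the queue when the timeout expires (not the number flushed), and delivery callbacks only run from inside `poll()` or `flush()`. The sketch below shows one way to make both visible. The names `DeliveryTracker` and `send_all` are hypothetical, and the producer is passed in as an argument, so this is an illustration under those assumptions rather than the original worker code.

```python
class DeliveryTracker:
    """Callable that can be used as a delivery callback; counts outcomes."""

    def __init__(self):
        self.delivered = 0
        self.failed = []

    def __call__(self, err, msg):
        if err is None:
            self.delivered += 1
        else:
            # Keep the topic and the error text for later inspection.
            self.failed.append((msg.topic(), str(err)))


def send_all(producer, topic, values, flush_timeout=2.0):
    """Produce every value, serving callbacks as we go.

    Returns the number of messages still queued after the final flush;
    a non-zero result means some messages were not delivered in time.
    """
    for value in values:
        producer.produce(topic=topic, value=value)
        producer.poll(0)  # serve pending delivery callbacks without blocking
    return producer.flush(flush_timeout)
```

To use it, a `DeliveryTracker` instance would be registered as the delivery callback in the producer config (the `on_delivery` key above). If `send_all` returns a non-zero count and the tracker has recorded neither deliveries nor failures, the messages are most likely still waiting on a broker connection, which points at `bootstrap.servers` or the broker's advertised listeners rather than at the serialization step.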
After this code is executed, I look at my topic on the schema registry container like so:
docker exec schema_registry_container kafka-avro-console-consumer --bootstrap-server kafka:29092 --topic test2 --from-beginning
I see this output:
[2020-04-03 15:48:38,064] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-04-03 15:48:38,742] INFO ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka:29092]
    check.crcs = true
    client.dns.lookup = default
    client.id =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = console-consumer-49056
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2020-04-03 15:48:38,887] INFO Kafka version : 2.1.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2020-04-03 15:48:38,887] INFO Kafka commitId : bda8715f42a1a3db (org.apache.kafka.common.utils.AppInfoParser)
[2020-04-03 15:48:39,221] INFO Cluster ID: KHKziPBvRKiozobbwvP1Fw (org.apache.kafka.clients.Metadata)
[2020-04-03 15:48:39,224] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Discovered group coordinator kafka:29092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-04-03 15:48:39,231] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-04-03 15:48:39,231] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-04-03 15:48:42,264] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-04-03 15:48:42,267] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Setting newly assigned partitions [test2-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-04-03 15:48:42,293] INFO [Consumer clientId=consumer-1, groupId=console-consumer-49056] Resetting offset for partition test2-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher)
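The consumer output above ends after the offset reset for partition test2-0 with no records printed. One way to separate "nothing was written" from "the consumer cannot decode or see it" is to compare the partition's low and high watermark offsets. A minimal sketch, assuming a confluent-kafka `Consumer` and a `TopicPartition` for test2 partition 0 are created elsewhere; the function name `messages_in_partition` is hypothetical.

```python
def messages_in_partition(consumer, topic_partition, timeout=5.0):
    """Estimate how many messages a partition holds from its watermarks.

    Returns None if the broker did not answer within the timeout.
    With retention or compaction the result is only an upper bound.
    """
    offsets = consumer.get_watermark_offsets(topic_partition, timeout=timeout)
    if offsets is None:
        return None
    low, high = offsets
    return high - low


# Usage sketch (not executed here; broker address and group id are assumptions):
#   from confluent_kafka import Consumer, TopicPartition
#   consumer = Consumer({"bootstrap.servers": "kafka:29092", "group.id": "debug-check"})
#   print(messages_in_partition(consumer, TopicPartition("test2", 0)))
```

A result of 0 would suggest the producer never got a message onto the topic, while a positive count would shift suspicion toward the consumer side.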