I want to send a JSON object to my Kafka topic, but I am running into a problem.

I use a POJO with a single instance variable, fileName. I set the file name on it and send the object to the Kafka topic.

    KafkaJsonSend objSend = new KafkaJsonSend();
    objSend.setFileName(filename);

    // Configure the producer
    Properties configProperties = new Properties();
    configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.51.10:9092");
    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    Producer<String, JsonNode> producer = new KafkaProducer<String, JsonNode>(configProperties);

    // Convert the POJO to a JsonNode and send it to the topic
    JsonNode jsonNode = objectMapper.valueToTree(objSend);
    ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>("BlueShifts", jsonNode);
    producer.send(rec);

    // close() blocks until previously sent records have completed
    producer.close();

But when I run this code, I get an exception that is logged continuously in my console.

Error: Uncaught error in kafka producer I/O thread

IllegalStateException: No entry found for connection 0


I have also tried with Spring Kafka, but I got this in the console:

2019-02-04 16:43:13.938  INFO 4432 --- [nio-6020-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
        acks = 1
        batch.size = 16384
        block.on.buffer.full = false
        bootstrap.servers = [10.0.2.15:9092]
        buffer.memory = 33554432
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.fetch.timeout.ms = 60000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        timeout.ms = 30000
        value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

2019-02-04 16:43:14.158  INFO 4432 --- [nio-6020-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka  version : 0.10.2.0
2019-02-04 16:43:14.160  INFO 4432 --- [nio-6020-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka  commitId : 576d93a8dc0cf421
2019-02-04 16:44:14.206 ERROR 4432 --- [nio-6020-exec-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='KafkaJsonSend [fileName=a0a7caf336e8481fb6db2de70d39029e_1549278789987.mp3]' to topic BlueShifts:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
When I use the Spring KafkaTemplate I get: Exception thrown when sending a message with key='null' and payload='KafkaJsonSend [fileName=9aa7b9d2181b4137 b1f86762357e036d_1549274276827.mp3]' to topic BlueShifts: - Kramer
Can you post the entire stack trace you get when using the Spring KafkaTemplate? - kalimba
@kalimba I added it at the bottom of my question. - Kramer
Thanks for showing the error. Can you confirm that the Kafka broker is actually running and accessible on 10.10.51.10? If you can log in to that machine, try monitoring the broker logs as well. A timeout exception is usually an indicator of a connection problem. - kalimba
@kalimba pasteboard.co/HZAOcR0.png I have uploaded the broker list here. How do I check whether it's running on 10.10.51.10? - Kramer
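
One quick way to answer the last comment is to try opening a plain TCP connection to the broker's host and port from the machine that runs the producer. Below is a minimal sketch using only the JDK. The class name BrokerPortCheck and the 3-second timeout are illustrative choices, not something from the original post. Note that a successful connect only shows that something is listening on that port; it does not prove that the address the broker hands back to clients is reachable.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMs.
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host name could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the broker address and port used in the question
        String host = args.length > 0 ? args[0] : "10.10.51.10";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

Run it from the same machine as the producer, since firewalls and routing can differ between hosts.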

1 Answer

It worked: I added the entry "10.10.51.10 kafka" to the hosts file in C:\Windows\System32\drivers\etc, and after that both approaches worked. Can you tell me why the IP has to be mapped to the name kafka, and why the broker couldn't be found using the IP alone?

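You need to configure your Kafka brokers correctly for your network. The address in bootstrap.servers is only used for the first connection. The broker then returns the host names from its advertised listeners in the metadata, and the client connects to those. Your broker is most likely advertising itself as kafka, a name your machine could not resolve until you added the hosts entry. See https://rmoff.net/2018/08/02/kafka-listeners-explained/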
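
To check from the client machine whether the name used in the hosts-file entry resolves at all, here is a minimal JDK-only sketch. The class name HostnameCheck is illustrative, and kafka is simply the host name taken from the hosts entry mentioned above; substitute whatever name your broker actually advertises.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: checks whether a host name resolves on this machine.
class HostnameCheck {

    // Returns the resolved IP address as text, or null if the name cannot be resolved.
    public static String resolve(String hostname) {
        try {
            return InetAddress.getByName(hostname).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // "kafka" is an assumption based on the hosts-file entry in this thread
        String name = args.length > 0 ? args[0] : "kafka";
        String ip = resolve(name);
        System.out.println(ip == null
                ? name + " does not resolve from this machine"
                : name + " -> " + ip);
    }
}
```

Run it on the machine that hosts the producer. A null result for kafka would match the behaviour seen before the hosts file was edited.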