
I am writing a Kafka producer using Spring Kafka 2.3.9 that is supposed to publish around 200,000 messages to a topic. For example, I have a list of 200,000 objects fetched from a database, and I want to publish a JSON message for each object to the topic.

The producer I have written works fine for publishing, say, 1,000 messages. After that it fails with a null pointer error (included below).

During debugging, I found that the number of Kafka producer network threads is very high. I could not count them, but there are definitely more than 500.

I have read the thread "Kafka Producer Thread, huge amound of threads even when no message is send" and applied a similar configuration by setting the producerPerConsumerPartition property to false on DefaultKafkaProducerFactory. It still does not reduce the number of Kafka producer network threads.

Below are my code snippets, the error, and a picture of those threads. I can't post all of the code because it comes from a real project.

Code segments

public DefaultKafkaProducerFactory<String, String> getProducerFactory() throws IOException, IllegalStateException {
    Map<String, Object> configProps = getProducerConfigMap();
    DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(configProps);
    //defaultKafkaProducerFactory.transactionCapable();
    defaultKafkaProducerFactory.setProducerPerConsumerPartition(false);
    defaultKafkaProducerFactory.setProducerPerThread(false);
    return defaultKafkaProducerFactory;
}

public Map<String, Object> getProducerConfigMap() throws IOException, IllegalStateException {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapAddress());
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getKafkaRetryConfig());
    configProps.put(ProducerConfig.ACKS_CONFIG, kafkaProperties.getKafkaAcknowledgementConfig());
    configProps.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getKafkaClientId());
    configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 512 * 1024 * 1024);
    configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10 * 1000);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //updateSSLConfig(configProps);
    return configProps;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() throws IOException {
    ProducerFactory<String, String> producerFactory = getProducerFactory();
    // Second argument enables autoFlush.
    KafkaTemplate<String, String> kt = new KafkaTemplate<>(producerFactory, true);
    kt.setCloseTimeout(java.time.Duration.ofSeconds(5));
    return kt;
}

Error

2020-12-07 18:14:19.249  INFO 26651 --- [onPool-worker-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=kafka-client-09f48ec8-7a69-4b76-a4f4-a418e96ff68e-1] Closing the Kafka producer with timeoutMillis = 0 ms.
2020-12-07 18:14:19.254 ERROR 26651 --- [onPool-worker-1] c.w.p.r.g.xxxxxxxx.xxx.KafkaPublisher   : Exception happened publishing to topic. Failed to construct kafka producer
2020-12-07 18:14:19.273  INFO 26651 --- [           main] ConditionEvaluationReportLoggingListener : 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-12-07 18:14:19.281 ERROR 26651 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute CommandLineRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at xxx.xxx.xxx.Application.main(Application.java:46) [classes/:na]
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[na:1.8.0_144]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[na:1.8.0_144]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[na:1.8.0_144]
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[na:1.8.0_144]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_144]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_144]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_144]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_144]
Caused by: java.lang.NullPointerException: null
    at com.xxx.xxx.xxx.xxx.KafkaPublisher.publishData(KafkaPublisher.java:124) ~[classes/:na]
    at com.xxx.xxx.xxx.xxx.lambda$0(Publisher.java:39) ~[classes/:na]
    at java.util.ArrayList.forEach(ArrayList.java:1249) ~[na:1.8.0_144]
    at com.xxx.xxx.xxx.xxx.publishData(Publisher.java:38) ~[classes/:na]
    at com.xxx.xxx.xxx.xxx.Application.lambda$0(Application.java:75) [classes/:na]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[na:1.8.0_144]
    ... 5 common frames omitted

Following is the code for publishing the message. Line 124 is where we actually call KafkaTemplate.

public void publishData(Object object) {

    ListenableFuture<SendResult<String, String>> future = null;

    // Convert the Object to JSON
    String json = convertObjectToJson(object);

    // Generate unique key for the message
    String key = UUID.randomUUID().toString();

    // Post the JSON to Kafka
    try {
        future = kafkaConfig.kafkaTemplate().send(kafkaProperties.getTopicName(), key, json);
    } catch (Exception e) {
        logger.error("Exception happened publishing to topic. {}", e.getMessage());
    }

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            logger.info("Sent message with key=[" + key + "]");
        }

        @Override
        public void onFailure(Throwable ex) {
            logger.error("Unable to send message=[ {} due to {}", json, ex.getMessage());
        }
    });
    kafkaConfig.kafkaTemplate().flush();
}
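
One detail of the method above is that future.addCallback is reached even when send throws and the catch block has only logged the error, so future is still null at that point. The following is a minimal sketch of an early-return guard, not the original project's code. It assumes plain Java with CompletableFuture standing in for Spring's ListenableFuture, and the names SafePublishSketch, publish, and sendOperation are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for the publishing method. CompletableFuture replaces
// ListenableFuture so the sketch compiles without any Spring dependency.
final class SafePublishSketch {

    /**
     * Runs sendOperation and attaches a callback only if a future was returned.
     * Returns true if the send was handed off, false if it failed synchronously.
     */
    static boolean publish(Supplier<CompletableFuture<String>> sendOperation) {
        CompletableFuture<String> future;
        try {
            future = sendOperation.get();
        } catch (RuntimeException e) {
            System.err.println("Exception happened publishing to topic. " + e.getMessage());
            return false; // no future exists, so there is nothing to attach a callback to
        }
        if (future == null) {
            System.err.println("Send returned no future.");
            return false;
        }
        // Log the outcome once the asynchronous send completes.
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("Unable to send message due to " + ex.getMessage());
            } else {
                System.out.println("Sent message: " + result);
            }
        });
        return true;
    }
}
```

With this shape, a send that fails synchronously is reported once through the log and the caller can decide whether to retry or stop, instead of hitting a second exception on the null future.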

============================

Threads

I am not sure whether this error is caused by that many network threads.

After posting the data, I called the KafkaTemplate flush method. It did not help. I also called the ProducerFactory closeThreadBoundProducer, reset, and destroy methods. None of them seems to work.

Am I missing any configuration?

"Caused by: java.lang.NullPointerException: null at xxx": truncating the stack trace like that makes it useless; you need to show the root cause. - Gary Russell
Sorry about that. The error is updated. com.xxx.xxx.xxx is the package name for the company where I work, which is not related to Java, Kafka, etc. - biswas
So the NPE is in your code. You need to show the code around line 124. - Gary Russell
I added the actual code that I used for calling the publisher. - biswas
The only way I can see an NPE occurring there is if kafkaConfig.kafkaTemplate() or kafkaProperties.getTopicName() returns null. - Gary Russell

1 Answer


The null pointer issue was not actually related to Spring Kafka. We were reading the topic name from a different location over a network connection. That connection failed in a few cases, so the topic name came back null, which ultimately caused the error above.
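
A failure like this is easier to diagnose if the topic name is resolved once and rejected immediately when it is missing. The following is a minimal sketch of that idea in plain Java, not the code from the project. The class TopicNameHolder, the remoteLookup supplier, and the topic name "orders" in the usage note are all hypothetical.

```java
import java.util.function.Supplier;

// Hypothetical holder that resolves the topic name a single time at construction
// and fails fast with a descriptive message if the lookup produced nothing.
final class TopicNameHolder {

    private final String topicName;

    TopicNameHolder(Supplier<String> remoteLookup) {
        String name = remoteLookup.get();
        if (name == null || name.trim().isEmpty()) {
            // Surface the real cause here instead of a later NullPointerException.
            throw new IllegalStateException(
                    "Topic name could not be resolved; check the remote configuration source");
        }
        this.topicName = name;
    }

    /** Returns the topic name resolved at construction; never null or blank. */
    String getTopicName() {
        return topicName;
    }
}
```

For example, new TopicNameHolder(() -> "orders").getTopicName() returns "orders", while a lookup that yields null throws IllegalStateException at startup rather than partway through publishing.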