1
votes

I am using Kafka v0.10.0.0 and created Producer & Consumer Java code. But code is stuck on producer.send without any exception in logs.

Can anyone please help. Thank in advance.

I am using/modifying "mapr - kakfa sample program". You can look at the full code here. https://github.com/panwars87/kafka-sample-programs

**Important: I changed the kafka-client version to 0.10.0.0 in maven dependencies and running Kafka 0.10.0.0 in my local.

public class Producer {
public static void main(String[] args) throws IOException {
    // set up the producer
    KafkaProducer<String, String> producer;
    System.out.println("Starting Producers....");
    try (InputStream props = Resources.getResource("producer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        producer = new KafkaProducer<>(properties);
        System.out.println("Property loaded successfully ....");
    }

    try {
        for (int i = 0; i < 20; i++) {
            // send lots of messages
            System.out.println("Sending record one by one....");
            producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message."));

            System.out.println(i+" message sent....");
            // every so often send to a different topic
            if (i % 2 == 0) {
                producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message."));
                producer.send(new ProducerRecord<String, String>("summary-markers","sending message - "+i+" to summary-markers."));
                producer.flush();
                System.out.println("Sent msg number " + i);
            }
        }
    } catch (Throwable throwable) {
        System.out.printf("%s", throwable.getStackTrace());
        throwable.printStackTrace();
    } finally {
        producer.close();
    }

  }
}

public class Consumer {
public static void main(String[] args) throws IOException {

    // and the consumer
    KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        if (properties.getProperty("group.id") == null) {
            properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
        }
        consumer = new KafkaConsumer<>(properties);
    }
    consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));
    int timeouts = 0;
    //noinspection InfiniteLoopStatement
    while (true) {
        // read records with a short timeout. If we time out, we don't really care.
        ConsumerRecords<String, String> records = consumer.poll(200);
        if (records.count() == 0) {
            timeouts++;
        } else {
            System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
            timeouts = 0;
        }
        for (ConsumerRecord<String, String> record : records) {
            switch (record.topic()) {
                case "fast-messages":
                    System.out.println("Record value for fast-messages is :"+ record.value());            
                        break;
        case "summary-markers":
            System.out.println("Record value for summary-markers is :"+ record.value());
                        break;
                default:
                    throw new IllegalStateException("Shouldn't be possible to get message on topic ");
            }
        }
    }
   }
}
2
There's a lot going on there - loading configuration, a loop that sends multiple messages to multiple topics, a flush call, etc. Can you reduce this to something smaller that produces the bug and/or give more detail - where exactly does it "get stuck"? Does the first send succeed? The 2nd? How do you know it's stuck? Have you added logging to see which sends work and which don't.Oliver Dain
You mentioned that the producer got stuck but pasted the code for consumer?amethystic
I added both Producer & Consumer.PanwarS87
@OliverDain Thank you Oliver for highlighting the points. I am debugging the KafkaProducer code to find out the right error. I will reduce the code to one topic.PanwarS87

2 Answers

0
votes

The code you're running is for a demo of mapR which is not Kafka. MapR claims API compatibility with Kafka 0.9, but even then mapR treats message offsets differently that does Kafka (offsets are byte offsets of messages rather than incremental offsets), etc.. The mapR implementation is also very, very different to say the least. This means that if you're lucky, a Kafka 0.9 app might just happen to run on mapR and vise versa. There is no such guarantee for other releases.

0
votes

Thank you everyone for all your inputs. I resolved this by tweaking Mapr code and referring few other posts. Link for the solution api:

https://github.com/panwars87/hadoopwork/tree/master/kafka/kafka-api