I have multiple servers on which messages are produced, and I need the broker and the consumer on a single server. When both the producer and the consumer run on that same server everything works fine, but I'm not sure what changes are needed to keep the producers on separate machines. I don't want any ZooKeeper/Kafka installation dependency on the producer servers, because there are many of them and their number will grow. I tried pointing the producer's bootstrap.servers at the broker/consumer server (192.168.0.1:9092) when setting up the KafkaProducer, but I'm still unable to produce messages. I'm not sure what I'm missing; please help me out. The code follows https://github.com/mapr-demos/kafka-sample-programs.
Running both the producer and the consumer on the same server works fine; the problem only appears once the producer runs on a different machine.
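My guess (an assumption on my part; the sample's README doesn't cover multi-host setups) is that the broker side also matters here: the broker has to advertise an address that the remote producer machines can actually reach, otherwise the client can bootstrap but then tries to talk to an unreachable host. This is roughly the broker-side server.properties I believe needs checking (Kafka 0.9+ property names; 192.168.0.1 is my broker/consumer server):

# server.properties on the broker (192.168.0.1): a sketch, not my verified config
# bind on all interfaces
listeners=PLAINTEXT://0.0.0.0:9092
# advertise an address the producer servers can resolve and reach (not localhost)
advertised.listeners=PLAINTEXT://192.168.0.1:9092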
Producer.java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import com.google.common.io.Resources;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) throws IOException {
        // set up the producer from producer.props on the classpath
        KafkaProducer<String, String> producer;
        try (InputStream props = Resources.getResource("producer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            producer = new KafkaProducer<>(properties);
        }
        try {
            for (int i = 0; i < 1000000; i++) {
                // send lots of messages
                producer.send(new ProducerRecord<String, String>(
                        "fast-messages",
                        String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
                // every so often send to a different topic
                if (i % 1000 == 0) {
                    producer.send(new ProducerRecord<String, String>(
                            "fast-messages",
                            String.format("{\"type\":\"marker\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
                    producer.send(new ProducerRecord<String, String>(
                            "summary-markers",
                            String.format("{\"type\":\"other\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
                    producer.flush();
                    System.out.println("Sent msg number " + i);
                }
            }
        } catch (Throwable throwable) {
            // print the full stack trace (printf with getStackTrace() only prints the array reference)
            throwable.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
producer.props
bootstrap.servers=192.168.0.1:9092
acks=all
retries=0
batch.size=16384
# note: auto.commit.interval.ms is a consumer setting; the producer ignores it
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# note: block.on.buffer.full is deprecated in newer clients in favour of max.block.ms
block.on.buffer.full=true
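Since KafkaProducer.send() is asynchronous, a send can fail without any visible output if the returned future or a callback is never checked. To surface the actual error when producing from a remote host, here is a minimal sketch that sends one record with a callback (ProducerConnectivityCheck is just an illustrative name, not part of the sample; it reuses the same broker address and topic):

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerConnectivityCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("fast-messages", "ping"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    // e.g. a TimeoutException here usually means the broker is unreachable
                    e.printStackTrace();
                } else {
                    System.out.println("Delivered to " + metadata.topic() + "-" + metadata.partition());
                }
            }
        });
        producer.flush();
        producer.close();
    }
}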
Consumer.java
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import org.HdrHistogram.Histogram;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void main(String[] args) throws IOException {
        // set up house-keeping
        ObjectMapper mapper = new ObjectMapper();
        Histogram stats = new Histogram(1, 10000000, 2);
        Histogram global = new Histogram(1, 10000000, 2);
        // and the consumer, configured from consumer.props on the classpath
        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            if (properties.getProperty("group.id") == null) {
                properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
            }
            consumer = new KafkaConsumer<>(properties);
        }
        consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));
        int timeouts = 0;
        //noinspection InfiniteLoopStatement
        while (true) {
            // read records with a short timeout. If we time out, we don't really care.
            ConsumerRecords<String, String> records = consumer.poll(200);
            if (records.count() == 0) {
                timeouts++;
            } else {
                System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
                timeouts = 0;
            }
            for (ConsumerRecord<String, String> record : records) {
                switch (record.topic()) {
                    case "fast-messages":
                        // the send time is encoded inside the message
                        JsonNode msg = mapper.readTree(record.value());
                        switch (msg.get("type").asText()) {
                            case "test":
                                long latency = (long) ((System.nanoTime() * 1e-9 - msg.get("t").asDouble()) * 1000);
                                stats.recordValue(latency);
                                global.recordValue(latency);
                                break;
                            case "marker":
                                // whenever we get a marker message, we should dump out the stats
                                // note that the number of fast messages won't necessarily be quite constant
                                System.out.printf("%d messages received in period, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
                                        stats.getTotalCount(),
                                        stats.getValueAtPercentile(0), stats.getValueAtPercentile(100),
                                        stats.getMean(), stats.getValueAtPercentile(99));
                                System.out.printf("%d messages received overall, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
                                        global.getTotalCount(),
                                        global.getValueAtPercentile(0), global.getValueAtPercentile(100),
                                        global.getMean(), global.getValueAtPercentile(99));
                                stats.reset();
                                break;
                            default:
                                throw new IllegalArgumentException("Illegal message type: " + msg.get("type"));
                        }
                        break;
                    case "summary-markers":
                        break;
                    default:
                        throw new IllegalStateException("Shouldn't be possible to get message on topic " + record.topic());
                }
            }
        }
    }
}
consumer.props
bootstrap.servers=192.168.0.1:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000
# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way. No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152
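To rule out a plain network/firewall problem between a producer server and the broker before blaming the Kafka configuration, here is a raw TCP check I can run from a producer host (no Kafka client involved; BrokerReachabilityCheck is just a throwaway helper name):

import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachabilityCheck {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket()) {
            // 192.168.0.1:9092 is the broker/consumer server above
            socket.connect(new InetSocketAddress("192.168.0.1", 9092), 5000);
            System.out.println("TCP connection to the broker port succeeded");
        }
    }
}

If this fails, the problem is connectivity (firewall, routing), not the producer configuration.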