I am running Kafka, ZooKeeper, and Kafdrop on Docker using the docker-compose.yaml below:
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.2
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - ~/.docker/data/zookeeper:/var/lib/zookeeper
  kafka:
    image: confluentinc/cp-kafka:5.5.3
    hostname: kafka
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: "${DOCKERHOST}"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://${DOCKERHOST}:9092"
      KAFKA_ZOOKEEPER_CONNECT: "${DOCKERHOST}:2181"
      KAFKA_LOG_DIRS: /kafka/logs
      KAFKA_LOG_RETENTION_MINUTES: 10
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_MESSAGE_MAX_BYTES: 4194304
      KAFKA_MAX_REQUEST_SIZE: 4194304
    depends_on:
      - zookeeper
    volumes:
      - ~/.docker/data/kafka/:/kafka/
    extra_hosts:
      - "dockerhost:$DOCKERHOST"
  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
    depends_on:
      - kafka
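For context on the listener settings above: a Kafka client uses its bootstrap address only for the first connection and then connects to the address the broker advertises in KAFKA_ADVERTISED_LISTENERS, so that address has to be reachable from wherever the client runs. Below is a minimal sketch of a reachability check using only the JDK. The class name AdvertisedListenerCheck and the default address PLAINTEXT://localhost:9092 are hypothetical and not part of this setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical helper for illustration; not part of the applications in this post.
final class AdvertisedListenerCheck {

    // Parses a listener such as "PLAINTEXT://kafka:9092" into host and port.
    static InetSocketAddress parse(String listener) {
        URI uri = URI.create(listener.trim());
        if (uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException("Expected PROTOCOL://host:port but got: " + listener);
        }
        // createUnresolved avoids a DNS lookup while parsing.
        return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
    }

    // Returns true if a TCP connection to the advertised address succeeds within the timeout.
    static boolean isReachable(String listener, int timeoutMillis) {
        InetSocketAddress parsed = parse(listener);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(parsed.getHostString(), parsed.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers unknown hosts, refused connections, and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default; pass the real advertised listener as the first argument.
        String listener = args.length > 0 ? args[0] : "PLAINTEXT://localhost:9092";
        System.out.println(listener + " reachable: " + isReachable(listener, 2000));
    }
}
```

Running it once from the host and once from inside the kafdrop container, with the expanded value of PLAINTEXT://${DOCKERHOST}:9092, would show whether both kinds of client can reach the advertised address.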
I have a Spring Boot producer application with the following configuration in KafkaTopicConfig.java:
@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
}
And in my KafkaProducerConfig.java I have the following:
@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.maxRequestSize}")
    private String maxRequestSize;

    @Value(value = "${kafka.batchSize}")
    private String batchSize;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 120000);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
This is a separate application and I call the Kafka Producer like this in my service:
kafkaTemplate.send("test-topic-local", base64string);
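Since the value being sent is a Base64 string and the compose file caps KAFKA_MESSAGE_MAX_BYTES at 4194304, the encoded size is worth checking before sending. The following is a rough sketch using only the JDK. The class name PayloadSizeCheck is hypothetical, the limit is copied from the compose file because the value of kafka.maxRequestSize is not shown here, and record overhead (headers, key, batch framing) is ignored.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration; not part of the producer application.
final class PayloadSizeCheck {

    // Assumed limit, copied from KAFKA_MESSAGE_MAX_BYTES in the compose file (4 MiB).
    static final int MAX_MESSAGE_BYTES = 4_194_304;

    // Padded Base64 emits 4 output characters for every 3 input bytes, rounded up.
    static long base64Length(long rawBytes) {
        return 4 * ((rawBytes + 2) / 3);
    }

    // True if the encoded value alone is within the limit (record overhead is ignored).
    static boolean fitsLimit(String base64Value) {
        return base64Value.getBytes(StandardCharsets.UTF_8).length <= MAX_MESSAGE_BYTES;
    }

    public static void main(String[] args) {
        byte[] raw = new byte[3_000_000]; // stand-in payload
        String encoded = Base64.getEncoder().encodeToString(raw);
        System.out.println("raw=" + raw.length
                + " encoded=" + encoded.length()
                + " predicted=" + base64Length(raw.length)
                + " fits=" + fitsLimit(encoded));
    }
}
```

Base64 inflates the payload by a third, so under this assumed limit the largest raw payload whose encoded form still fits is 3145728 bytes, before any record overhead is counted.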
In an altogether different Spring Boot application, I have a consumer configured like this in KafkaConsumerConfig.java:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> testKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory(groupId);
    }
}
And in my service, I have a KafkaListener like the one below:
@Service
@Slf4j
public class KafkaConsumerService {

    @KafkaListener(topics = "topic", containerFactory = "testKafkaListenerContainerFactory")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        log.info(consumerRecord.toString());
    }
}
I can see that the consumer connects to the broker, but there are no logs for the message. Below is the full log I can see:
2021-03-08 17:15:20.218 INFO 4805 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8081 (http)
2021-03-08 17:15:20.237 INFO 4805 --- [ restartedMain] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-03-08 17:15:20.238 INFO 4805 --- [ restartedMain] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.43]
2021-03-08 17:15:20.362 INFO 4805 --- [ restartedMain] o.a.c.c.C.[.[localhost].[/api/v1] : Initializing Spring embedded WebApplicationContext
2021-03-08 17:15:20.362 INFO 4805 --- [ restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2939 ms
2021-03-08 17:15:20.974 INFO 4805 --- [ restartedMain] o.hibernate.jpa.internal.util.LogHelper : HHH000204: Processing PersistenceUnitInfo [name: default]
2021-03-08 17:15:21.210 INFO 4805 --- [ restartedMain] org.hibernate.Version : HHH000412: Hibernate ORM core version 5.4.28.Final
2021-03-08 17:15:21.555 INFO 4805 --- [ restartedMain] o.hibernate.annotations.common.Version : HCANN000001: Hibernate Commons Annotations {5.1.2.Final}
2021-03-08 17:15:21.792 INFO 4805 --- [ restartedMain] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2021-03-08 17:15:22.351 INFO 4805 --- [ restartedMain] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2021-03-08 17:15:22.410 INFO 4805 --- [ restartedMain] org.hibernate.dialect.Dialect : HHH000400: Using dialect: org.hibernate.dialect.MySQL8Dialect
2021-03-08 17:15:22.938 INFO 4805 --- [ restartedMain] o.h.e.t.j.p.i.JtaPlatformInitiator : HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]
2021-03-08 17:15:22.964 INFO 4805 --- [ restartedMain] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2021-03-08 17:15:23.125 WARN 4805 --- [ restartedMain] JpaBaseConfiguration$JpaWebConfiguration : spring.jpa.open-in-view is enabled by default. Therefore, database queries may be performed during view rendering. Explicitly configure spring.jpa.open-in-view to disable this warning
2021-03-08 17:15:23.362 INFO 4805 --- [ restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-03-08 17:15:24.007 INFO 4805 --- [ restartedMain] o.s.b.d.a.OptionalLiveReloadServer : LiveReload server is running on port 35729
2021-03-08 17:15:24.017 INFO 4805 --- [ restartedMain] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2021-03-08 17:15:24.208 INFO 4805 --- [ restartedMain] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-group-01-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = group-01
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2021-03-08 17:15:24.453 INFO 4805 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-03-08 17:15:24.454 INFO 4805 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-03-08 17:15:24.454 INFO 4805 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1615203924449
2021-03-08 17:15:24.459 INFO 4805 --- [ restartedMain] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group-01-1, groupId=group-01] Subscribed to topic(s): test-topic-local
2021-03-08 17:15:24.479 INFO 4805 --- [ restartedMain] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2021-03-08 17:15:24.538 INFO 4805 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path '/api/v1'
2021-03-08 17:15:24.572 INFO 4805 --- [ restartedMain] c.r.i.t.m.f.FeedConsumerApplication : Started FeedConsumerApplication in 7.929 seconds (JVM running for 8.652)
2021-03-08 17:15:25.302 INFO 4805 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group-01-1, groupId=group-01] Cluster ID: fq0PTZmwTnKQcCtUDMiQBg