I am using Spring Integration Kafka, and the following is a sample that creates consumers dynamically to receive messages and print them to the console. Consumer class:
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder;
import org.springframework.integration.kafka.support.ConsumerConfigFactoryBean;
import org.springframework.integration.kafka.support.ConsumerConfiguration;
import org.springframework.integration.kafka.support.ConsumerConnectionProvider;
import org.springframework.integration.kafka.support.ConsumerMetadata;
import org.springframework.integration.kafka.support.KafkaConsumerContext;
import org.springframework.integration.kafka.support.MessageLeftOverTracker;
import org.springframework.integration.kafka.support.TopicFilterConfiguration;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public class Consumer1 {

    private static final String CONFIG = "kafkaInboundMDCAdapterParserTests-context.xml";

    static ClassPathXmlApplicationContext ctx;

    public static void main(final String[] args) {
        // Note: the config file is loaded once per consumer; loading it only
        // once results in a single message being received (see question below)
        ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
        ctx.start();
        addConsumer("test19", "default8");

        ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
        ctx.start();
        addConsumer("test19", "default10");
    }

    public static void addConsumer(String topicId, String groupId) {
        // Subscribe a service activator to the adapter's output channel
        MessageChannel inputChannel = ctx.getBean("inputFromKafka", MessageChannel.class);
        ServiceActivatingHandler serviceActivator =
                new ServiceActivatingHandler(new MessageReceiver(), "processMessage");
        ((SubscribableChannel) inputChannel).subscribe(serviceActivator);

        KafkaConsumerContext<String, String> kafkaConsumerContext =
                ctx.getBean("consumerContext", KafkaConsumerContext.class);
        try {
            // Build the consumer metadata for the given topic and group
            TopicFilterConfiguration topicFilterConfiguration =
                    new TopicFilterConfiguration(topicId, 1, false);
            ConsumerMetadata<String, String> consumerMetadata = new ConsumerMetadata<String, String>();
            consumerMetadata.setGroupId(groupId);
            consumerMetadata.setTopicFilterConfiguration(topicFilterConfiguration);
            consumerMetadata.setConsumerTimeout("1000");
            consumerMetadata.setKeyDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(String.class));
            consumerMetadata.setValueDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(String.class));

            ZookeeperConnect zookeeperConnect = ctx.getBean("zookeeperConnect", ZookeeperConnect.class);
            ConsumerConfigFactoryBean<String, String> consumer =
                    new ConsumerConfigFactoryBean<String, String>(consumerMetadata, zookeeperConnect);
            ConsumerConnectionProvider consumerConnectionProvider =
                    new ConsumerConnectionProvider(consumer.getObject());
            MessageLeftOverTracker<String, String> messageLeftOverTracker =
                    new MessageLeftOverTracker<String, String>();
            ConsumerConfiguration<String, String> consumerConfiguration =
                    new ConsumerConfiguration<String, String>(consumerMetadata,
                            consumerConnectionProvider, messageLeftOverTracker);

            // Register the new consumer configuration under its group id
            kafkaConsumerContext.getConsumerConfigurations().put(groupId, consumerConfiguration);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Inbound config file:
<int:channel id="inputFromKafka"/>
<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="localhost:2181"
zk-connection-timeout="6000"
zk-session-timeout="6000"
zk-sync-time="2000"/>
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
kafka-consumer-context-ref="consumerContext"
auto-startup="false"
channel="inputFromKafka">
<int:poller fixed-delay="1" time-unit="MILLISECONDS"/>
</int-kafka:inbound-channel-adapter>
<bean id="kafkaReflectionDecoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
<constructor-arg type="java.lang.Class" value="java.lang.String"/>
</bean>
<int-kafka:consumer-context id="consumerContext"
consumer-timeout="1000"
zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration group-id="default1"
value-decoder="kafkaReflectionDecoder"
key-decoder="kafkaReflectionDecoder"
max-messages="5000">
<int-kafka:topic id="mdc1" streams="1"/>
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
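For comparison, here is roughly what `addConsumer("test19", "default8")` does programmatically, expressed as static XML inside the `consumer-configurations` element above (a sketch only; it reuses the `kafkaReflectionDecoder` bean, and the `group-id`, topic pattern, and stream count match the values used in `main()`):

```xml
<!-- Hypothetical static equivalent of addConsumer("test19", "default8") -->
<int-kafka:consumer-configuration group-id="default8"
                                  value-decoder="kafkaReflectionDecoder"
                                  key-decoder="kafkaReflectionDecoder"
                                  max-messages="5000">
    <!-- TopicFilterConfiguration(topicId, 1, false) corresponds to a
         blacklist=false topic filter with one stream -->
    <int-kafka:topic-filter pattern="test19" streams="1"/>
</int-kafka:consumer-configuration>
```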
When I send a message to topic "test19", the configured ServiceActivator's "processMessage" method displays two messages, since two consumers are configured. The question is: I need to load the inbound config file once for each consumer before adding it to the consumer context; otherwise I get only one message in my console. Is this the correct way, or do I need to change something here?
Thanks.