I have this topology:
Topology topology = new Topology();
// WS connection processor
topology.addSource(WS_CONNECTION_SOURCE, new StringDeserializer(), new WebSocketConnectionEventDeserializer(),
                KafkaTopics.WS_CONNECTION_EVENTS_TOPIC)
        .addProcessor(SESSION_PROCESSOR, WSUserSessionProcessor::new, WS_CONNECTION_SOURCE)
        .addStateStore(sessionStoreBuilder, SESSION_PROCESSOR)
        .addSink(WS_STATUS_SINK, KafkaTopics.WS_USER_ONLINE_STATUS_TOPIC, stringSerializer, stringSerializer,
                SESSION_PROCESSOR)
        // WS session routing
        .addSource(WS_ROUTING_BY_SESSION_SOURCE, new StringDeserializer(), new StringDeserializer(),
                KafkaTopics.WS_DELIVERY_TOPIC)
        .addProcessor(WS_ROUTING_BY_SESSION_PROCESSOR, WSSessionRoutingProcessor::new,
                WS_ROUTING_BY_SESSION_SOURCE)
        .addStateStore(userConnectedNodesStoreBuilder, WS_ROUTING_BY_SESSION_PROCESSOR, SESSION_PROCESSOR)
        // WS delivery
        .addSource(WS_DELIVERY_SOURCE, new StringDeserializer(), new StringDeserializer(),
                INSTANCE_SPECIFIC_TOPIC)
        .addProcessor(WS_DELIVERY_PROCESSOR, WSDeliveryProcessor::new, WS_DELIVERY_SOURCE);
The last source in the topology reads from a topic that is specific to each application instance, and I want that topic to be processed only by that instance. The preceding processor writes data to this topic according to which instance has to handle the message.
However, once the stream is started, Kafka Streams also tries to assign the instance-specific topic's partitions to the other instances. Can this requirement be met in Kafka Streams?
I want one topic to be processed only by a specific instance.
if (specificInstance) { topology.addSource(.., INSTANCE_SPECIFIC_TOPIC); }
– Vasyl Sarzhynskyi
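For context on why the partitions get spread around: Kafka Streams uses application.id as the consumer group id, so all instances that share it split the partitions of every source topic between them. One possible direction, offered only as a sketch and not as a confirmed solution for this setup, is to keep the shared part of the topology under the common application.id and run the delivery part as a second Kafka Streams application whose application.id is unique per instance. The helper below only builds the two configurations with plain java.util.Properties. The class name, the method names, the "-delivery-" id suffix, and the topic naming scheme are all hypothetical; only the config keys "application.id" and "bootstrap.servers" are real Kafka settings.

```java
import java.util.Properties;

// Hypothetical helper: builds configs for a shared Streams app and a per-instance one.
class InstanceStreamsConfig {
    // Real Kafka config keys (the values behind StreamsConfig.APPLICATION_ID_CONFIG
    // and StreamsConfig.BOOTSTRAP_SERVERS_CONFIG).
    static final String APPLICATION_ID = "application.id";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    // Config for the part of the topology that all instances share.
    // Every instance uses the same application.id, so they form one consumer group.
    static Properties sharedProps(String baseAppId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty(APPLICATION_ID, baseAppId);
        props.setProperty(BOOTSTRAP_SERVERS, bootstrapServers);
        return props;
    }

    // Config for the delivery part only. The application.id is unique per instance
    // (assumed naming scheme), so no other instance joins this consumer group.
    static Properties instanceProps(String baseAppId, String instanceId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty(APPLICATION_ID, baseAppId + "-delivery-" + instanceId);
        props.setProperty(BOOTSTRAP_SERVERS, bootstrapServers);
        return props;
    }

    // Assumed naming scheme for the instance-specific topic.
    static String instanceTopic(String topicPrefix, String instanceId) {
        return topicPrefix + instanceId;
    }
}
```

Each set of properties would then be passed to its own KafkaStreams object together with the matching topology: the first two sub-topologies from the question for the shared configuration, and only the WS_DELIVERY_SOURCE and WS_DELIVERY_PROCESSOR pair for the per-instance one. The trade-off is a second Streams runtime in each process.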