I have a few Kafka Streams application instances that I'd like to query remotely.
All of the instances are currently listening on a specified host:port pair, and each instance is able to query its own local state stores and communicate these values via a REST service.
+------------------+ +------------------+ +------------------+
| | | | | |
| instance1:9191 | | instance2:9292 | | instance3:9393 |
| | | | | |
+------------------+ +------------------+ +------------------+
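For context, the per-instance REST layer described above can be very small. The sketch below uses the JDK's built-in `com.sun.net.httpserver.HttpServer`. The `/store/{key}` path and the `Map` standing in for the local state store are assumptions made for illustration. In a real instance, the lookup would go through a `ReadOnlyKeyValueStore` obtained from the running `KafkaStreams` object.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Minimal per-instance REST endpoint (sketch only).
// The Map is a hypothetical stand-in for this instance's local state store,
// and the /store/{key} path is hypothetical as well.
class LocalStoreEndpoint {
    static HttpServer start(int port, Map<String, Long> localStore) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/store/", exchange -> {
            // Everything after "/store/" is treated as the key to look up locally.
            String key = exchange.getRequestURI().getPath().substring("/store/".length());
            Long value = localStore.get(key);
            byte[] body = (value == null ? "" : value.toString()).getBytes(StandardCharsets.UTF_8);
            // 404 when this instance does not hold the key in its local store.
            int status = value == null ? 404 : 200;
            exchange.sendResponseHeaders(status, body.length == 0 ? -1 : body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
            exchange.close();
        });
        server.start();
        return server;
    }
}
```

Each instance would call `start` with its own port (9191, 9292, 9393 in the diagram). Any client can then read that instance's local values over plain HTTP.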
I would like a separate application to be able to query the state stores in these instances:
             consumer group                    separate application
+---------------------------------------+              _____
| [instance1] [instance2] [instance3]   |  <~-------  | app |
+---------------------------------------+              -----
The separate app would use the same logic as StreamsMetadataState::getAllMetadataForStore to get all of the active host/port pairs for the running instances of my application, run the remote queries via the REST service, and aggregate the data inside its own application logic.
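The fan-out-and-aggregate step described above might look roughly like the following sketch, which uses the JDK 11+ `java.net.http.HttpClient`. The list of `host:port` strings is taken as given, because discovering it is the open question here. Summing is only an assumed example of "aggregate", and the URL path handed to `httpFetcher` is hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.function.Function;

// Sketch of the "separate app": send one query to every known host:port pair
// and combine the partial results. Where the host:port list comes from is out of scope here.
class RemoteAggregator {
    // Sums the partial results; an instance whose fetch yields null is skipped (contributes 0).
    static long sumAcross(List<String> hostPorts, Function<String, Long> queryOne) {
        long total = 0;
        for (String hostPort : hostPorts) {
            Long partial = queryOne.apply(hostPort);
            if (partial != null) total += partial;
        }
        return total;
    }

    // HTTP fetcher: GET http://{hostPort}{path} and parse the body as a long.
    // The path (e.g. a hypothetical "/store-total") is an assumption, not a real API.
    static Function<String, Long> httpFetcher(HttpClient client, String path) {
        return hostPort -> {
            try {
                HttpRequest req = HttpRequest.newBuilder(URI.create("http://" + hostPort + path)).build();
                HttpResponse<String> resp = client.send(req, HttpResponse.BodyHandlers.ofString());
                return resp.statusCode() == 200 ? Long.parseLong(resp.body().trim()) : null;
            } catch (IOException | NumberFormatException e) {
                return null; // unreachable instance or unexpected body: skip it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        };
    }
}
```

Passing the per-instance query in as a `Function` keeps the aggregation logic testable without live instances. In production it would be called as `sumAcross(hostPorts, httpFetcher(HttpClient.newHttpClient(), "/store-total"))`.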
However, I am having difficulty implementing this. Since the host/port pairs seem to be communicated via the consumer group protocol, it looks like I'm required to actually instantiate another Kafka Streams instance (i.e. another member of the consumer group) in order to take advantage of remote interactive queries.
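For reference, in Kafka Streams the endpoint that each instance advertises comes from its `application.server` setting (the key behind `StreamsConfig.APPLICATION_SERVER_CONFIG`). As far as I understand it, these values are exchanged inside the consumer-group rebalance metadata, which would explain why only group members see them. Below is a sketch of the per-instance settings using plain `java.util.Properties`. The application id and broker address are placeholders.

```java
import java.util.Properties;

// Per-instance configuration sketch. The keys are real Kafka Streams config names;
// the values are placeholders for the setup in this question.
class InstanceConfig {
    static Properties forInstance(String host, int port) {
        Properties props = new Properties();
        // Same application.id on every instance, so they all join the same consumer group.
        props.setProperty("application.id", "my-streams-app");
        // Placeholder broker address.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // The host:port this instance advertises to its peers for interactive queries.
        props.setProperty("application.server", host + ":" + port);
        return props;
    }
}
```

For example, `InstanceConfig.forInstance("instance2", 9292)` would configure the second box in the diagram.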
My questions are:
- Is it possible to find the host/port pairs for all running instances of an application without also running a local Kafka Streams instance in the same consumer group? (I stress "running" because I don't mind instantiating a dummy instance of the Kafka Streams app just to get the host/port metadata, but there is a validateRunning check that prevents me from doing so.)
- Are there problems with the above design (running a separate app to query all instances of a Kafka Streams app)? That is, maybe the behavior I'm describing isn't supported because what I'm doing has ramifications that I haven't considered yet?
It seems like there should perhaps be a static method for getting state store metadata that would let us pass in, directly, whatever values are currently being extracted from the builder object, e.g.:
KafkaStreams::getMetaDataForStore(streamsConfig, storeName);
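As far as I know, no such static method exists. One possible workaround, which is an assumption on my part and not something the Kafka Streams API provides, is for the separate app to ask a single known "seed" instance for the peer list. That instance is a running group member. It could therefore build the list from `KafkaStreams#allMetadataForStore(storeName)`, the public counterpart of the `StreamsMetadataState` logic mentioned above, using each `StreamsMetadata`'s `hostInfo()`. It would then return the list from a hypothetical REST endpoint as one `host:port` per line. The sketch below covers only the client-side parsing of that hypothetical response.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Parses a hypothetical "peer list" response from a seed instance:
// one "host:port" per line. The endpoint and format are assumptions.
class PeerListParser {
    static List<String> parse(String body) {
        Set<String> peers = new LinkedHashSet<>(); // drop duplicates, keep first-seen order
        for (String line : body.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) continue;
            int colon = trimmed.lastIndexOf(':');
            // Skip entries without a non-empty host and port part.
            if (colon <= 0 || colon == trimmed.length() - 1) continue;
            try {
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                if (port < 1 || port > 65535) continue; // skip out-of-range ports
            } catch (NumberFormatException e) {
                continue; // skip non-numeric ports
            }
            peers.add(trimmed);
        }
        return new ArrayList<>(peers);
    }
}
```

With this approach the separate app never joins the consumer group. The trade-offs are that it depends on the seed instance being reachable, and that its copy of the list can go stale after a rebalance.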