6
votes

I have a few Kafka Streams application instances that I'd like to query remotely.

All of the instances are currently listening on a specified host:port pair, and each instance is able to query its own local state stores and communicate these values via a REST service.

+------------------+  +------------------+  +------------------+
|                  |  |                  |  |                  |
|  instance1:9191  |  |  instance2:9292  |  |  instance3:9393  |
|                  |  |                  |  |                  |
+------------------+  +------------------+  +------------------+

I would like a separate application to be able to query the state stores in these instances:

             consumer group                         separate application
+---------------------------------------+              _____
|   [instance1] [instance2] [instance3] |  <~-------  | app |
+---------------------------------------+              -----

The separate app would utilize the same logic as StreamsMetadataState::getAllMetadataForStore to get all of the active host/port pairs for the running instances of my application, run the remote queries via the REST service, and aggregate the data inside its own application logic.

However, I am having difficulty implementing this. Since the host/port pairs seem to be communicated via the consumer group protocol, it looks like I'm required to actually instantiate another Kafka Streams instance (i.e. another member of the consumer group) in order to take advantage of remote interactive queries.

My questions are:

  • Is it possible to find the host/port pairs for all running instances of an application without also running a local Kafka Streams instance in the same consumer group? (I highlight running because I don't mind instantiating a dummy instance of the Kafka Streams app just to get the host/port metadata, but there is a validateRunning check that prevents me from doing so)
  • Are there problems with the above design (running a separate app to query all instances of a Kafka Streams app)? i.e. maybe the behavior I'm talking about isn't supported because what I'm doing has ramifications that I haven't considered yet?

It seems like maybe there should be a static method for getting state store metadata that would allow us to pass in whatever values are being extracted from the builder object directly, i.e.

KafkaStreams::getMetaDataForStore(streamsConfig, storeName);
2
I have a similar use case, but I am having difficulty writing that separate app without instantiating a KStreams application. Can you share something if you got this to work? – Justin
I'm considering a similar approach, where a unified public "front door" (REST) API queries each local StateStore (each with different stored data) directly. Having a request possibly go through multiple hosts seems "wasteful", e.g. front door -> random partition instance -> actual partition instance. – ballzak

2 Answers

5
votes
  • Is it possible to find the host/port pairs for all running instances of an application without also running a local Kafka Streams instance in the same consumer group? (I highlight running because I don't mind instantiating a dummy instance of the Kafka Streams app just to get the host/port metadata, but there is a validateRunning check that prevents me from doing so)

Why don't you add a new REST API method to your (first) Kafka Streams application that exposes the currently active host/port pairs to your second app? The app instances would of course have this information readily available.

The second app -- "the separate app" -- can then query any of the (first) Kafka Streams app instances via this REST method to discover all the running app instances. Here, you wouldn't need to run a dummy KafkaStreams instance in the second app at all.
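A minimal sketch of such a discovery endpoint, using only the JDK's built-in com.sun.net.httpserver (the /instances path and the newline-separated response format are made up for illustration). In the real app the list of host/port pairs would come from the KafkaStreams instance's metadata (e.g. KafkaStreams#allMetadata()); here it is passed in as a plain list so the sketch stays runnable without a Kafka cluster:

```java
import com.sun.net.httpserver.HttpServer;

import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class InstanceDiscoveryServer {

    // Serves the known host:port pairs, one per line, at GET /instances.
    // hostPorts stands in for what streams.allMetadata() would report.
    public static HttpServer start(int port, List<String> hostPorts) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/instances", exchange -> {
            byte[] body = String.join("\n", hostPorts).getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

Passing port 0 binds an ephemeral port, which is handy for tests; a production instance would use a fixed, advertised port.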

  • Are there problems with the above design (running a separate app to query all instances of a Kafka Streams app)? i.e. maybe the behavior I'm talking about isn't supported because what I'm doing has ramifications that I haven't considered yet?

See above. Nothing stops you from adding further methods to the REST layer of the Kafka Streams application. After all, the part of your (first) application that uses the Kafka Streams API doesn't need to be the only part of that app. :-) I think your problem might be that your thinking is kinda "boxed in", in the sense that you feel obliged to do everything in your app through the Kafka Streams API -- but that's not so. After all, one of the motivations behind the Kafka Streams API was to let you mix it with other APIs and libraries that you'd like to leverage in your application.

1
vote

There is no API support.

The recommended pattern would be to add a second RPC endpoint (on a different port than the IQ port; let's call it the bootstrap-port) to all instances. Thus, it would be sufficient to know a single host/bootstrap-port pair to get the IQ host/port pairs of all instances.
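The client side of this pattern can be sketched with the JDK's HttpClient. The /instances path and the newline-separated wire format are assumptions for illustration; only one bootstrap host/port needs to be known up front:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Arrays;
import java.util.List;

public class InstanceDiscoveryClient {

    // Asks a single known bootstrap host:port for the full list of
    // IQ host:port pairs, returned one per line by the bootstrap RPC.
    public static List<String> discover(String bootstrapHostPort) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://" + bootstrapHostPort + "/instances")).build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        return Arrays.asList(body.split("\n"));
    }
}
```

The separate app would call discover() once (or on demand), then fan out its state-store queries to each returned IQ host/port.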

Update

You could also use a Kafka topic to propagate host/port information. Each instance writes its host/port into the topic at startup, and you read this information from there. The tricky part would be to expire/delete entries from the topic. If you enable compaction, each instance could write a tombstone message for its host/port message on shutdown. However, if an instance crashes, you would be left with stale information that is never deleted.
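The compaction semantics this relies on can be sketched without Kafka: replaying the registration topic in offset order, where each record is keyed by an instance id and a null value acts as a tombstone that removes the entry. The class name and record layout below are made up for illustration; in the real setup the records would come from a compacted Kafka topic via a consumer:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RegistryReplay {

    // Replays (instanceId, hostPort) records in offset order.
    // A null hostPort is a tombstone and removes the instance,
    // mirroring what log compaction eventually does on the broker.
    public static Map<String, String> replay(String[][] records) {
        Map<String, String> live = new LinkedHashMap<>();
        for (String[] record : records) {
            String instanceId = record[0];
            String hostPort = record[1];
            if (hostPort == null) {
                live.remove(instanceId);   // tombstone: instance shut down cleanly
            } else {
                live.put(instanceId, hostPort);
            }
        }
        return live;
    }
}
```

Note that this replay only sees tombstones that were actually written, which is exactly the crash problem described above: an instance that dies before writing its tombstone stays in the map.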

On the other hand, you could combine this with the first approach (i.e., write host/bootstrap-port instead of IQ host/port to the topic). It's sufficient to get one valid host/bootstrap-port -- if you use an invalid one, your request will time out, and you can write a tombstone from your query app to clean it up.

End Update

If this does not work either, you could "hack" around this limitation. After a successful rebalance, all metadata is stored in the topic __consumer_offsets, and you could theoretically read the information from there and extract all host/port pairs. However, you would be relying on internal implementation details, and thus your code might break on an upgrade.