8
votes

I'm trying to design a streaming architecture for streaming analytics. Requirements:

  • RT and NRT streaming data input
  • Stream processors implementing some financial analysis
  • RT and NRT analysis output stream
  • Reference data requests during stream processing

I'm exploring Kafka and Kafka Streams for stream processing and RT/NRT messaging. My question is: I need to query external systems (info providers, MongoDB, etc.) during stream processing. These queries could be either sync or async request-response, depending on the characteristics of the external system.

I've read this post explaining how to join a KStream and a KTable during processing, and it's very interesting, but in that scenario the KTable is not dependent on input parameters coming from the KStream; it's just a streaming representation of a table.

I need to query the external system for each KStream message, passing some message fields as query parameters, enrich the message with the query result, and then publish the enriched message to an output topic. Is there an established pattern for designing this kind of stream processing? Is there a specific technology I should use? Remember that queries can be sync or async.

I'd also like to design wrappers for these external systems, implementing a sort of distributed RPC that is callable from Kafka Streams processing. Could you suggest a technology or framework? I was considering Akka actors for distributing the query responders, but I can't tell whether Akka fits the request-response paradigm well.

Thanks


1 Answer

14
votes

For querying external systems, you have several options:

  1. Recommended: Use Kafka Connect to import your data from external systems into Kafka, and read those topics as KTables to do the KStream-KTable lookup join.
  2. You can implement your own custom lookup join within your UDF code. Depending on the details, you can use KStream methods #mapValues(), #map(), or lower level methods like #transform() or #process(). Thus, you manually open a connection to your external system and issue a lookup query for each record you process.
    • sync lookups: if you do sync calls to external systems there is nothing else you need to consider (you can use #mapValues() for example to implement this)
    • async lookups: async calls to external systems are trickier to get right, and you should be quite careful: it's not a recommended pattern, because there is no library support at the moment. First, you need to remember all async calls you issue in a reliable way (i.e., you need to attach a state store and write each request into it before you actually fire it). Second, on each callback, you need to buffer the result somehow and process it later, when the operator that issued the request is called again (it's not possible to produce a downstream result in an async callback handler, only within UDF code). After emitting downstream, you can remove the request from the state. Third, when recovering after a failure, you need to check your state for unfinished requests and issue those requests again. Also keep in mind that this kind of async processing breaks some internal Streams assumptions, like guaranteed processing order with regard to record topic offsets.
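
The three async steps above (record the request before firing it, buffer callback results and emit them only from the processing thread, re-issue unfinished requests on recovery) can be sketched in plain Java. This is only a rough model under stated assumptions, not Kafka Streams code: the class `AsyncLookupSketch`, its methods, and the `client` function are hypothetical names, and the in-memory `pending` map merely stands in for a fault-tolerant state store attached to a `#transform()` or `#process()` operator.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

// Plain-Java model of the async bookkeeping; no Kafka classes are used (assumption).
class AsyncLookupSketch {
    // Stand-in for a fault-tolerant state store: request id -> original record value.
    private final Map<Long, String> pending = new LinkedHashMap<>();
    // Results handed over by callback threads; drained only on the processing thread.
    private final ConcurrentLinkedQueue<Map.Entry<Long, String>> completed =
            new ConcurrentLinkedQueue<>();
    // Hypothetical async client for the external system.
    private final Function<String, CompletableFuture<String>> client;
    private long nextId = 0;

    AsyncLookupSketch(Function<String, CompletableFuture<String>> client) {
        this.client = client;
    }

    // Called once per input record; returns the enriched records that are ready to emit.
    List<String> process(String value) {
        long id = nextId++;
        pending.put(id, value); // step 1: remember the request before firing it
        fire(id, value);
        return drain();
    }

    // Step 2: the callback only buffers the result; it never emits downstream itself.
    private void fire(long id, String value) {
        client.apply(value)
              .thenAccept(result -> completed.add(new AbstractMap.SimpleEntry<>(id, result)));
    }

    // Emit buffered results, then remove the corresponding requests from the state.
    List<String> drain() {
        List<String> out = new ArrayList<>();
        Map.Entry<Long, String> entry;
        while ((entry = completed.poll()) != null) {
            String original = pending.remove(entry.getKey());
            if (original != null) { // skip duplicates caused by re-issued requests
                out.add(original + "|" + entry.getValue());
            }
        }
        return out;
    }

    // Step 3: after a restart, re-issue every request still present in the state.
    void recover() {
        pending.forEach(this::fire);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        // A client that answers immediately, so the result is ready on the same call.
        AsyncLookupSketch op = new AsyncLookupSketch(
                v -> CompletableFuture.completedFuture(v.toUpperCase()));
        System.out.println(op.process("trade-1"));
    }
}
```

Note that results are emitted in completion order rather than input order, which illustrates the ordering caveat above. In a real topology a punctuation would probably also need to call something like `drain()`, since otherwise a finished lookup waits until the next record arrives.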

Compare this question about failure handling in streams with regard to offset commits: How to handle error and don't commit when use Kafka Streams DSL