I am implementing a streaming application in which one of the stateful operators captures an “owner has items” relationship. The state, keyed per owner, consists of details about the owner and each of the owner's items. Ownership of an item can change, and I would like to be able to associate each item with its correct owner. Since operator state for different owners can live in different subtasks, and these subtasks are intended to operate independently, I want to know the best way to share state. One solution I came up with is to create a keyed datastream from the side output of a subtask, send it to the correct owner, and clear the state from the original owner. Essentially:

  1. Subtask1 with state about OldOwner that has Item1, Item2, … , ItemN
  2. Subtask1 writes a message to a side output (OldOwner, NewOwner, List[ItemsToTransfer])
  3. (Optional) Clear state about List[ItemsToTransfer] from state about OldOwner.
  4. Create a datastream from the side output and send it back to the same operator, but potentially different subtask that has state about NewOwner.
  5. Update the state of NewOwner by adding the new set of items
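
The steps above can be sketched as a plain Java simulation. This is not Flink code and uses no Flink classes: `OwnershipTransferSketch`, `Subtask`, `Transfer`, and every method name are hypothetical, and the in-memory queue only stands in for the stream that would be built from the side output. It assumes owners are routed to subtasks by key hash, and that step 3 is applied eagerly.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Plain-Java simulation of the transfer steps; all names here are hypothetical. */
class OwnershipTransferSketch {

    /** The message from step 2: (OldOwner, NewOwner, List[ItemsToTransfer]). */
    static final class Transfer {
        final String oldOwner;
        final String newOwner;
        final List<String> items;

        Transfer(String oldOwner, String newOwner, List<String> items) {
            this.oldOwner = oldOwner;
            this.newOwner = newOwner;
            this.items = new ArrayList<>(items);
        }
    }

    /** One parallel instance of the operator; it only sees state for its own keys. */
    static final class Subtask {
        final Map<String, Set<String>> itemsByOwner = new HashMap<>();

        /** Steps 2 and 3: remove the items from OldOwner and build the message. */
        Transfer release(String oldOwner, String newOwner, List<String> items) {
            Set<String> owned = itemsByOwner.getOrDefault(oldOwner, new LinkedHashSet<>());
            List<String> moved = new ArrayList<>();
            for (String item : items) {
                if (owned.remove(item)) { // only move what OldOwner actually holds
                    moved.add(item);
                }
            }
            return new Transfer(oldOwner, newOwner, moved);
        }

        /** Step 5: add the transferred items to NewOwner's state. */
        void accept(Transfer t) {
            itemsByOwner.computeIfAbsent(t.newOwner, k -> new LinkedHashSet<>()).addAll(t.items);
        }
    }

    final Subtask[] subtasks;
    // Stands in for the datastream created from the side output.
    final Queue<Transfer> feedback = new ArrayDeque<>();

    OwnershipTransferSketch(int parallelism) {
        subtasks = new Subtask[parallelism];
        for (int i = 0; i < parallelism; i++) {
            subtasks[i] = new Subtask();
        }
    }

    /** Key-based routing: the same owner always maps to the same subtask. */
    Subtask subtaskFor(String owner) {
        return subtasks[Math.floorMod(owner.hashCode(), subtasks.length)];
    }

    void addItem(String owner, String item) {
        subtaskFor(owner).itemsByOwner.computeIfAbsent(owner, k -> new LinkedHashSet<>()).add(item);
    }

    Set<String> itemsOf(String owner) {
        return subtaskFor(owner).itemsByOwner.getOrDefault(owner, Collections.emptySet());
    }

    /** Steps 1 to 3 run on the subtask that holds OldOwner. */
    void requestTransfer(String oldOwner, String newOwner, List<String> items) {
        feedback.add(subtaskFor(oldOwner).release(oldOwner, newOwner, items));
    }

    /** Steps 4 and 5: route each message by NewOwner and apply it there. */
    void drainFeedback() {
        Transfer t;
        while ((t = feedback.poll()) != null) {
            subtaskFor(t.newOwner).accept(t);
        }
    }

    public static void main(String[] args) {
        OwnershipTransferSketch job = new OwnershipTransferSketch(4);
        job.addItem("OldOwner", "Item1");
        job.addItem("OldOwner", "Item2");
        job.requestTransfer("OldOwner", "NewOwner", List.of("Item2"));
        job.drainFeedback();
        System.out.println("OldOwner: " + job.itemsOf("OldOwner"));
        System.out.println("NewOwner: " + job.itemsOf("NewOwner"));
    }
}
```

In this sketch, between `requestTransfer` and `drainFeedback` the transferred items belong to neither owner. That window is where the fault tolerance question matters: if the message is lost in flight, so are the items.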

Since side outputs are intended for a very different purpose (logging, etc.), I want to know whether this will work. Do the same fault tolerance guarantees apply to side outputs as to regular data streams? Is there a limit to how many side output messages can be buffered in a subtask?

An alternative approach might be to take the output of the first subtask and feed it back into the same operator. Both of these approaches, in theory, violate the property that a Flink job is a DAG, although in my use case the data itself would never travel in a cycle.

1 Answer

Your proposal will create a cycle in the topology, because the events from the side output have to be sent back to the same operator (whether they land on the same subtask or a different one).

What you effectively want is support for multi-key/multi-state transactions in Flink. Flink does not support this out of the box, but it provides all the tools necessary to build such a feature on top of it. In fact, the Streaming Ledger library does exactly this: it lets you run multi-state transactions with exactly-once processing guarantees on top of Flink.