
To ask my question, I first have to show you my data and my proposed solution to the dual-key problem:

Each message carries one or both of two keys, x and y. Sometimes x is present, sometimes y. One type of event has both.

  • Type 1: keys x and y
  • Type 2: key x
  • Type 3: key y

To have the full session at the end of the pipeline, we need all data under one combined key: x+y.

To achieve this, I duplicate the messages that have both keys (type 1) and key one copy by x and the other by y. Then, in the following processor, I enrich the messages that have only x or only y (types 2 and 3).

Each message looks like this: [Flink key, potentialX, potentialY, rest of msg...]
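To make the duplication step concrete, here is a minimal framework-free Java model of the split described above. It is not Flink code: `Msg`, `Split` and `Splitter.split` are hypothetical names, and the two plain lists stand in for the two keyed streams (by x and by y) that the Flink job would produce. It assumes a missing key is represented as `null`.

```java
import java.util.ArrayList;
import java.util.List;

/** Message shape from the question: [Flink key, potentialX, potentialY, rest of msg]. */
record Msg(String flinkKey, String x, String y, String rest) {}

/** Result of the split: one list per downstream keyed stream. */
record Split(List<Msg> toX, List<Msg> toY) {}

final class Splitter {
    /** Duplicates type 1 messages and routes types 2 and 3 by their only key. */
    static Split split(List<Msg> input) {
        List<Msg> toX = new ArrayList<>();
        List<Msg> toY = new ArrayList<>();
        for (Msg m : input) {
            if (m.x() != null) {
                // Types 1 and 2: a copy keyed by x goes to the X processor.
                toX.add(new Msg(m.x(), m.x(), m.y(), m.rest()));
            }
            if (m.y() != null) {
                // Types 1 and 3: a copy keyed by y goes to the Y processor.
                toY.add(new Msg(m.y(), m.x(), m.y(), m.rest()));
            }
        }
        return new Split(toX, toY);
    }
}
```

Setting the first field to the routing key mirrors what a `keyBy` on that field would do in the real job.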

[Image: Pipeline]

Here is my scenario: I have a close-session message, which is type 2. It is routed to the key-x processor, where it is enriched, and from there we can shut down the appropriate processors in the rest of the pipeline. However, the state for key y is never evicted, because the Y processor never receives the close-session message.

[Image: Close msg flow]

Now for the question: how can I close the state in the Y processor?

Initially I thought I could duplicate the type 2 message in the enricher, emit the copy as a side output, pick that side output up before the keyBy, and thereby route it to the correct processor. This is not possible, because a side output can only be consumed downstream of the operator that created it. Then I found some Jira tickets about side inputs, but that does not seem to be an actual feature yet.

Lastly, I thought I might write the side output mentioned above to a sink and read it back with a source feeding the keyBy. This seems a bit hacky, though.

I really hope someone can help!

Edit:

I'm adding a new diagram to clarify the original flow. In the original drawings, I tried to make the flow of data easier to understand by drawing two boxes for the enrichment processor. This new drawing shows the flow more accurately:

[Image: Improved drawing]


1 Answer


That's a bit complicated to follow, but I've seen this pattern before when trying to unify logged-out sessions with logged-in sessions from web logs. If I've understood the details well enough, I think you could take a side output from the X processor and feed it into the Y processor, like this:

                             +------------+                          +-------+
                             |            +-------------------------->       |
+--------+     +-------+  X  |   X proc   |                          |       |
|        |     |       +----->            | sideout +-----------+    | X + Y |
|        |     |       |     |            +--------->           |    |       |
| source +-----> split |     +------------+         |           +---->       |
|        |     |       |                            |  Y proc   |    +-------+
|        |     |       +---------------------------->           |
+--------+     +-------+             Y              +-----------+
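A rough, framework-free Java model of that topology follows. It assumes the close message is recognised by the payload `"close"` and that the only keyed state is the x-to-y pairing learned from type 1 messages. `XProc`, `YProc` and their maps are hypothetical stand-ins for keyed process functions and their keyed state. In Flink, the side output would roughly correspond to an `OutputTag` written with `ctx.output(tag, value)` in the X processor, retrieved with `getSideOutput(tag)`, then combined with the Y stream via `union` and keyed by y before the Y processor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Same shape as in the question: [Flink key, potentialX, potentialY, rest of msg]. */
record Msg(String flinkKey, String x, String y, String rest) {
    /** Assumption: a close-session message is recognised by its payload. */
    boolean isClose() {
        return "close".equals(rest);
    }
}

/** Model of the processor keyed by x; the map plays the role of keyed state. */
final class XProc {
    final Map<String, String> yByX = new HashMap<>();
    final List<Msg> main = new ArrayList<>();    // towards the X + Y operator
    final List<Msg> sideOut = new ArrayList<>(); // side output towards Y proc

    void process(Msg m) {
        String key = m.flinkKey();
        if (m.y() != null) {
            yByX.put(key, m.y()); // type 1 copy: remember which y belongs to this x
        }
        // Enrich with the known y (still null if no type 1 message was seen).
        Msg enriched = new Msg(key, key, yByX.get(key), m.rest());
        main.add(enriched);
        if (m.isClose()) {
            yByX.remove(key); // evict the X-side state
            if (enriched.y() != null) {
                // Re-key the close message by y so it reaches the matching Y key.
                sideOut.add(new Msg(enriched.y(), key, enriched.y(), m.rest()));
            }
        }
    }
}

/** Model of the processor keyed by y; its input is the Y stream plus X's side output. */
final class YProc {
    final Map<String, String> xByY = new HashMap<>();
    final List<Msg> main = new ArrayList<>();

    void process(Msg m) {
        String key = m.flinkKey();
        if (m.isClose()) {
            // Evict the Y-side state; X proc already forwarded the close downstream.
            xByY.remove(key);
            return;
        }
        if (m.x() != null) {
            xByY.put(key, m.x()); // type 1 copy: remember which x belongs to this y
        }
        main.add(new Msg(key, xByY.get(key), key, m.rest()));
    }
}
```

Feeding a type 1 message to both processors, a type 2 close to `XProc`, and then every element of `XProc.sideOut` to `YProc` leaves both state maps empty. One caveat for the real job: the side output and the regular Y input are separate streams, so Flink does not guarantee their relative order, and the Y processor may still see a late y message after the close; a timer-based cleanup is one way to cover that case.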