
I have a datastream in which the order of the events is important. The time characteristic is set to EventTime as the incoming records have a timestamp within them.

In order to guarantee the ordering, I set the parallelism for the program to 1. Could that become a problem, performance-wise, as my program gets more complex?

If I understand correctly, I need to assign watermarks to my events if I want to keep the stream ordered by timestamp. This is quite simple. But I'm reading that even that doesn't guarantee order? Later on, I want to do stateful computations over that stream. For that I use a FlatMap function, which needs the stream to be keyed. But if I key the stream, the order is lost again. AFAIK this happens because the stream is split into partitions, which is a consequence of parallelism.

I have two questions:

  • Do I need parallelism? What factors do I need to consider here?
  • How would I achieve "ordered parallelism" with what I described above?
Explaining all of the possibilities is a bit much, so if you provide more context, it will be easier to respond. From where are you ingesting this data? To what extent is it ordered at the source? What do you want to do with it, and why does the stream need to be ordered (by timestamp, I assume)? Does there need to be a global ordering, or is it enough that it be ordered for each key? – David Anderson
@DavidAnderson I'm getting the data from NiFi in JSON format, which I parse into a POJO via a map function. The data is real time data, so, using NiFi's queueing, it's ordered by timestamp. I want to do computations over the data, e.g. maxima, minima, averages. I also want to do stateful computations, as in calculating how long the machine has been running/not running. For that I feed the data's timestamps into two arrays (one for production time and one for non production time) and respectively subtract the earliest timestamp from the latest. (Non-)production is detected by machine speed. – Necrophades

1 Answer


Several points to consider:

Setting the parallelism to 1 for the entire job will prevent scaling your application, which will affect performance. Whether this actually matters depends on your application requirements, but it would certainly be a limitation, and could become a problem.

If the aggregates you've mentioned are meant to be computed globally, across all of the event records, then operating in parallel will require doing some pre-aggregation in parallel. In that case you will then have to reduce the parallelism to 1 in the later stages of your job graph in order to produce the ultimate (global) results.
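To make the two-stage shape concrete, here is a minimal plain-Java sketch (not Flink code) of the idea: each parallel subtask computes a partial maximum over its own partition, and a single downstream step combines the partials into the global result. The partition contents are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class PreAggregation {
    // Stage 1: each parallel instance computes a partial maximum over its partition.
    static int partialMax(List<Integer> partition) {
        return partition.stream().mapToInt(Integer::intValue).max().orElse(Integer.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Hypothetical partitions, as they might be spread across parallel subtasks.
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(3, 7, 1),
                Arrays.asList(9, 2),
                Arrays.asList(4, 8));

        // Stage 2: a single (parallelism-1) step combines the partial results.
        int globalMax = partitions.stream()
                .mapToInt(PreAggregation::partialMax)
                .max()
                .orElse(Integer.MIN_VALUE);

        System.out.println(globalMax); // 9
    }
}
```

In Flink terms, stage 1 would be a keyed or windowed aggregation running in parallel, and stage 2 a non-parallel operator (e.g. a windowAll) that merges the partial results.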

If on the other hand these aggregates are to be computed independently for each value of some key, then it makes sense to consider keying the stream and to use that partitioning as the basis for operating in parallel.

All of the operations you mention require some state, whether computing max, min, averages, or uptime and downtime. For example, you can't compute the maximum without remembering the maximum encountered so far.
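As a small illustration of why state is unavoidable, here is a plain-Java sketch of a per-key running maximum. The map plays the role that Flink's keyed ValueState would play in a real job; the class and method names are mine, not Flink's.

```java
import java.util.HashMap;
import java.util.Map;

public class RunningMax {
    // Per-key state: the maximum seen so far (analogous to Flink's keyed ValueState).
    private final Map<String, Integer> maxSoFar = new HashMap<>();

    // Process one event and return the current maximum for its key.
    public int update(String key, int value) {
        int newMax = Math.max(maxSoFar.getOrDefault(key, Integer.MIN_VALUE), value);
        maxSoFar.put(key, newMax);
        return newMax;
    }
}
```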

If I understand correctly how Flink's NiFi source connector works, then if the source is operating in parallel, keying the stream will result in out-of-order events.

However, none of the operations you've mentioned require that the data be delivered in-order. Computing uptime (and downtime) on an out-of-order stream will require some buffering -- these operations will need to wait for out-of-order data to arrive before they can produce results -- but that's certainly doable. That's exactly what watermarks are for; they define how long to wait for out-of-order data. You can use an event-time timer in a ProcessFunction to arrange for an onTimer callback to be called when all earlier events have been processed.
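Here is a minimal plain-Java model of that buffering pattern (again, not actual Flink code): out-of-order events sit in a priority queue ordered by timestamp, and whenever the watermark advances, every event at or below the watermark is released in timestamp order. A ProcessFunction with event-time timers effectively does this, using keyed state instead of a local queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkBuffer {
    public static class Event {
        public final long timestamp;
        public final String payload;
        public Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Events buffered until the watermark passes their timestamp.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    public void add(Event e) {
        buffer.add(e);
    }

    // When the watermark advances, emit all events with timestamp <= watermark,
    // in timestamp order -- the out-of-order data has "arrived in time".
    public List<Event> onWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }
}
```

Events with timestamps behind the watermark when they arrive are late by definition; how to treat them (drop, side output, etc.) is a separate policy decision.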

You could always sort the keyed stream. Here's an example.

The uptime/downtime calculation should be easy to do with Flink's CEP library (which sorts its input, btw).
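Whether you use CEP or a ProcessFunction, the uptime/downtime bookkeeping itself is simple state. Here is a plain-Java sketch under a couple of assumptions of mine: a speed threshold separates production from non-production, and the interval between two consecutive readings is attributed based on the earlier reading's state. This keeps running totals instead of collecting timestamps into arrays, which avoids unbounded state growth.

```java
public class UptimeTracker {
    private final double speedThreshold; // assumed cutoff between production and idle
    private long lastTimestamp = -1;
    private boolean lastRunning;
    private long uptimeMs = 0;
    private long downtimeMs = 0;

    public UptimeTracker(double speedThreshold) {
        this.speedThreshold = speedThreshold;
    }

    // Process one (timestamp, speed) reading, in timestamp order. The time since
    // the previous reading is credited to uptime or downtime according to the
    // state the machine was in at the previous reading.
    public void update(long timestampMs, double speed) {
        if (lastTimestamp >= 0) {
            long delta = timestampMs - lastTimestamp;
            if (lastRunning) {
                uptimeMs += delta;
            } else {
                downtimeMs += delta;
            }
        }
        lastTimestamp = timestampMs;
        lastRunning = speed > speedThreshold;
    }

    public long uptimeMs() { return uptimeMs; }
    public long downtimeMs() { return downtimeMs; }
}
```

Note that this logic assumes in-order input, which is exactly why the buffering (or CEP's internal sorting) discussed above matters.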

UPDATE:

It is true that after applying a ProcessFunction to a keyed stream the stream is no longer keyed. But in this case you could safely use reinterpretAsKeyedStream to inform Flink that the stream is still keyed.

As for CEP, this library uses state on your behalf, making it easier to develop applications that need to react to patterns.