
Suppose my Flink application has three components: Source, Map, and Sink. For some reason (e.g. the API it calls has very high latency), the sink needs very high parallelism (e.g. 20). Also assume that Source and Map use very little CPU/IO. We know that the number of available slots must be at least as large as the maximum parallelism of the application, which in this case is 20. There are two ways to deploy this application:

  1. If I already have a Flink cluster, deploying this application will take up 20 slots. However, my Source and Map don't need many resources, so these 20 slots will be idle most of the time (waiting, because the sink has high latency when calling the API). In this case I am wasting resources.
  2. I can set up a per-job cluster for this application and set the number of slots per TaskManager very high, to reduce the resources per slot. In this case, I will also need to set the parallelism of my Map to a high value in order to get enough CPU capacity. However, because Map is CPU-bound, high parallelism will lead to performance degradation (thread context switching).

So my question is, what's the best practice in this case?

Previously I used Apache Storm. For a Storm application, I need to specify the number of workers (slots) and the parallelism of each operator. However, the number of available slots doesn't need to be at least as large as the maximum parallelism of the application. For this application, I can set 2 workers, a parallelism of 2 for Source and Map, and a parallelism of 20 for Sink, and it will end up taking only 2 slots, with each slot holding 1 Source, 1 Map, and 10 Sink bolts. I think this not only satisfies the need for a highly parallel sink, but also makes good use of resources (only 2 Maps). Why is Flink's parallelism designed this way? Or is my understanding wrong?
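The two layouts compared above come down to a little arithmetic. The following is only a sketch of that arithmetic in plain Java, not Flink or Storm code: the class and method names are made up for illustration, the Flink rule is the one stated in the question (slots needed equals the maximum operator parallelism), and the Storm split assumes each parallelism divides evenly by the number of workers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SlotMath {

    /** Slots a Flink job needs under the rule stated in the question: the maximum operator parallelism. */
    static int flinkSlotsNeeded(Map<String, Integer> parallelism) {
        return parallelism.values().stream().mapToInt(Integer::intValue).max().orElse(0);
    }

    /** Instances of each operator per worker in the Storm layout, assuming an even split. */
    static Map<String, Integer> stormExecutorsPerWorker(Map<String, Integer> parallelism, int workers) {
        Map<String, Integer> perWorker = new LinkedHashMap<>();
        parallelism.forEach((operator, p) -> perWorker.put(operator, p / workers));
        return perWorker;
    }

    public static void main(String[] args) {
        // Parallelism values taken from the question.
        Map<String, Integer> parallelism = new LinkedHashMap<>();
        parallelism.put("Source", 2);
        parallelism.put("Map", 2);
        parallelism.put("Sink", 20);

        System.out.println("Flink slots needed: " + flinkSlotsNeeded(parallelism));
        // prints "Flink slots needed: 20"
        System.out.println("Storm, 2 workers: " + stormExecutorsPerWorker(parallelism, 2));
        // prints "Storm, 2 workers: {Source=1, Map=1, Sink=10}"
    }
}
```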

In my answer I've assumed this is a streaming application. If it's batch, then the conversation will be different. - David Anderson
Yes, it is a streaming application. - Zimou Zhang
What David suggested below with option #2 has worked well for us in similar situations. You still need to end with a DiscardingSink so Flink is satisfied you have a complete workflow, but all of the interesting tuning/parallelism happens via an AsyncIO function prior to that sink. - kkrugler

1 Answer


A couple of options, and the reasoning behind them:

  1. Use a parallelism of 20 across the whole job: source, map, sink. By doing this you can leverage operator chaining, and avoid the serialization/deserialization and network communication that you will otherwise have between the map and sink (where the parallelism would change from 2 to 20). You'd have to benchmark it to be sure, but normally the savings from avoiding ser/de and the network stack will be significant.

  2. Use a parallelism of 2 across the whole job (including the sink), and use an asynchronous client to talk to the external API so that each sink instance can handle a bunch of concurrent requests. You can probably use Flink's async i/o for this, though if you do, you'll have to tack on a dummy sink (since Flink insists that every job have a sink).
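Option 2 depends on each instance keeping many requests in flight instead of blocking on one at a time. Here is a minimal plain-Java sketch of that idea. It is not Flink code: `SlowApiClient` is a hypothetical stand-in for a non-blocking client of the external API, and the 100 ms latency and the limit of 10 in-flight requests are arbitrary values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class AsyncSinkSketch {

    /** Hypothetical non-blocking client that simulates a slow external API. */
    static final class SlowApiClient implements AutoCloseable {
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        private final long latencyMillis;

        SlowApiClient(long latencyMillis) {
            this.latencyMillis = latencyMillis;
        }

        /** Returns immediately; the future completes after the simulated latency. */
        CompletableFuture<String> send(String record) {
            CompletableFuture<String> response = new CompletableFuture<>();
            timer.schedule(() -> response.complete("ok:" + record), latencyMillis, TimeUnit.MILLISECONDS);
            return response;
        }

        @Override
        public void close() {
            timer.shutdownNow();
        }
    }

    /** Sends all records from the calling thread, keeping at most maxInFlight requests open. */
    static List<String> sendAll(List<String> records, SlowApiClient client, int maxInFlight)
            throws InterruptedException {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String record : records) {
            permits.acquire(); // wait only when the in-flight limit has been reached
            pending.add(client.send(record).whenComplete((result, error) -> permits.release()));
        }
        List<String> responses = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            responses.add(future.join()); // collect responses in submission order
        }
        return responses;
    }

    public static void main(String[] args) throws Exception {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            records.add("r" + i);
        }
        try (SlowApiClient client = new SlowApiClient(100)) {
            long start = System.nanoTime();
            List<String> responses = sendAll(records, client, 10);
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            // Sending the 40 requests one at a time would take about 40 * 100 ms.
            System.out.println(responses.size() + " responses in about " + elapsedMillis + " ms");
        }
    }
}
```

With 10 requests in flight, the 40 simulated calls finish in roughly four round trips rather than forty. In Flink, the async I/O operator would take the place of `sendAll`, and its capacity setting limits in-flight requests in a similar way.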

One problem with any of these schemes, other than #2 above, is that you'll be tying up a bunch of resources that are spending most of their time idling while waiting for the slow external API to respond. Also, it's problematic to do blocking/synchronous i/o in a Flink user function, since Flink's operators are single-threaded, and you can block checkpointing, etc., by doing this.