3 votes

I have a few questions regarding the parallelism of flink. This is my setup:

I have 1 master node and 2 slaves. In Flink I have created 3 Kafka consumers, each of which consumes from a different topic.
Since the order of the elements is important to me, each topic has only one partition, and I have set up Flink to use event time.

Then I run the following pipeline (in pseudo code) on each of the data streams:

source
.map(deserialize)
.window
.apply
.map(serialize)
.writeTo(sink)
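
For reference, one of these pipelines looks roughly like the following sketch in the Scala API (the topic name, schemas, window size, window function and sink are placeholders for my actual code; I am using the universal Kafka connector here, older Flink versions use the versioned FlinkKafkaConsumer0x classes instead):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object SingleTopicPipeline {

  def deserialize(raw: String): String = raw   // placeholder for the real deserializer
  def serialize(event: String): String = event // placeholder for the real serializer

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "example-group")

    // One consumer per topic; each topic has exactly one partition.
    val source = env.addSource(
      new FlinkKafkaConsumer[String]("topic-a", new SimpleStringSchema(), props))

    source
      .map(deserialize _)
      // timestamp/watermark assignment omitted for brevity
      .timeWindowAll(Time.seconds(10))  // stands in for .window in the pseudo code
      .reduce((a, b) => a + " " + b)    // stands in for .apply in the pseudo code
      .map(serialize _)
      .print()                          // placeholder sink

    env.execute("single-topic pipeline")
  }
}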

Up until now I have started my Flink program with the argument -p 2, assuming that this would allow me to use both of my nodes. The result is not what I was hoping for, since the order of my output is sometimes messed up.

After reading through the Flink documentation and trying to understand it better, could someone please confirm the following "learnings" of mine?

1.) Passing -p 2 configures the task parallelism only, i.e. the maximum number of parallel instances a task (such as map(deserialize)) will be split into. If I want to keep the order through the whole pipeline I have to use -p 1 (a small sketch of how the parallelism can also be set in code follows after point 2).

2.) This seems contradictory/confusing to me: even if the parallelism is set to 1, different tasks can still run in parallel (at the same time). Therefore my 3 pipelines will also run in parallel if I pass -p 1.
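
For completeness, this is how I understand point 1 in code; the job-wide setting is what -p controls, while an individual operator may override it (the values here are arbitrary):

import org.apache.flink.streaming.api.scala._

object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Job-wide default parallelism, equivalent to passing -p 1 on the CLI.
    env.setParallelism(1)

    env.fromElements("a", "b", "c")
      .map(_.toUpperCase)
      .setParallelism(2) // a single operator can still override the job default
      .print()

    env.execute("parallelism sketch")
  }
}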

And as a follow-up question: Is there any way to figure out which tasks were mapped to which task slot so that I could confirm the parallel execution myself?

I would appreciate any input!

Update

Here is Flink's execution plan for -p 2.


3 Answers

5 votes

After asking the question on the Apache Flink user mailing list, here is the answer:

1.) The -p option defines the task parallelism per job. If the parallelism is chosen higher than 1 and data gets redistributed (e.g. via rebalance() or keyBy()), the order is not guaranteed.

2.) With -p set to 1, only 1 task slot, i.e. 1 CPU core, is used. Therefore there might be multiple threads running on one core concurrently, but not in parallel.

As for my requirements: in order to run multiple pipelines in parallel and still keep the order, I can simply run multiple Flink jobs instead of running all pipelines within the same Flink job.
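
In practice that means submitting each pipeline as a separate job with a parallelism of 1, for example (the jar and class names below are just placeholders):

./bin/flink run -p 1 -c com.example.TopicAJob pipelines.jar
./bin/flink run -p 1 -c com.example.TopicBJob pipelines.jar
./bin/flink run -p 1 -c com.example.TopicCJob pipelines.jar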

0 votes

I'll try to answer with what I know.

1) Yes, with the CLI client the parallelism parameter can be specified with -p. You are right to say this is the maximum number of parallel instances. However, I don't see the link between the parallelism and the order. As far as I know, the order is managed by Flink with the timestamp provided in the event or with its own ingestion timestamp. If you want to maintain order across different data sources, it seems complicated to me; you might merge these different data sources into one.

2) Your 3 pipelines can run in parallel if you have the parallelism set to 3. I think parallelism here means running on different slots.

Follow-up question) You can check which tasks are mapped to which task slot on the JobManager's web frontend at http://localhost:8081.

0 votes

Please find below an example of scaling locally using side outputs and slot sharing groups.

package org.example

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
  * This example shows an implementation of WordCount with data from a text socket.
  * To run the example make sure that the service providing the text data is already up and running.
  *
  * To start an example socket text stream on your local machine run netcat from a command line,
  * where the parameter specifies the port number:
  *
  * {{{
  *   nc -lk 9999
  * }}}
  *
  * Usage:
  * {{{
  *   SocketTextStreamWordCount <hostname> <port>
  * }}}
  *
  * This example shows how to:
  *
  *   - use StreamExecutionEnvironment.socketTextStream
  *   - write a simple Flink Streaming program in scala.
  *   - write and use user-defined functions.
  */
object SocketTextStreamWordCount {

  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt
    val outputTag1 = OutputTag[String]("side-1")
    val outputTag2 = OutputTag[String]("side-2")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.enableObjectReuse()

    // Read the text stream from the socket; the source is placed in its own slot sharing group
    val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement")
    // Split each line into non-empty lowercase words
    val counts = text.flatMap {
      _.toLowerCase.split("\\W+") filter {
        _.nonEmpty
      }
    }
      // Route every word to one of the two side outputs; the main output stays empty
      .process(new ProcessFunction[String, String] {
        override def processElement(
                                     value: String,
                                     ctx: ProcessFunction[String, String]#Context,
                                     out: Collector[String]): Unit = {
          if (value.head <= 'm') ctx.output(outputTag1, value)
          else ctx.output(outputTag2, value)
        }
      })

    val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)
    val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)

    // Count words from each side output; separate slot sharing groups allow the
    // two branches to run in different task slots
    val output1 = sideOutputStream1.map {
      (_, 1)
    }.slotSharingGroup("map1")
      .keyBy(0)
      .sum(1)

    val output2 = sideOutputStream2.map {
      (_, 1)
    }.slotSharingGroup("map2")
      .keyBy(0)
      .sum(1)

    output1.print()
    output2.print()

    env.execute("Scala SocketTextStreamWordCount Example")
  }

}
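
As far as I understand slot sharing, operators in different slot sharing groups cannot share a task slot, so the three groups above ("processElement", "map1" and "map2") spread the job over at least three slots; that is what lets the two counting branches run in parallel even when executing locally.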