I am new to Apache Flink and am trying to understand some best practices for scaling Flink streaming jobs alongside Kafka. Some questions I have not been able to find suitable answers for include:

  1. How many streaming jobs can/should you run? Is there a scalability issue with running too many streams? How many is too many?
  2. If we do run, say, 2,000 streams to meet a business need, what is the best way to manage these streams?
  3. What is the preferred way to read data from one stream into another? Can we join streams, perform continuous queries, etc.?

Thanks in advance for any support, and apologies if these questions seem somewhat basic; I'm trying to get a better handle on this technology. I've read through much of the documentation, but admittedly I might not be putting some concepts together due to my lack of experience in this area. Thanks for any help!

1 Answer

  • There is no hard limit on the number of streams; Flink scales with the memory and CPU of the Job Manager and Task Managers, the parallelism being used, and the number of task slots. I use YARN for managing the resources. If the number of streams being connected is high, you need to be a bit careful that the bulk of the processing does not end up on a few Task Managers, as that will slow the job down. Lag can arise in the Kafka topics themselves, or internally because some Task Managers are heavily loaded, so monitoring and preventive checks need to be put in place. A small sketch of setting per-job and per-operator parallelism is at the end of this answer.

  • Continuous query support has been added in recent Flink versions as part of the Table API / SQL; you can check the Flink documentation for it. A small continuous-query sketch is at the end of this answer.

  • If by reading data from one stream into another you mean connecting two streams (in Flink terminology), then we can connect them on a common key and maintain a value state. Note that the value state is kept on the Task Manager that owns the key and is not shared across Task Managers; a connect + value-state sketch is at the end of this answer. If instead you mean a union of two or more streams, we can write the map/flatMap functions so that the data from those streams arrives in a standard format.

    Example of union:

val stream1: DataStream[UserBookingEvent] = BookingClosure.getSource(runmode).getSource(env)
      .map(new ClosureMapFunction)

val stream2: DataStream[UserBookingEvent] = BookingCancel.getSource(runmode).getSource(env)
      .map(new CancelMapFunction)

val unionStream: DataStream[UserBookingEvent] = stream1.union(stream2)

---

import org.apache.flink.api.common.functions.MapFunction
import org.json4s._
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _}
import org.json4s.native.JsonMethods._
import org.slf4j.{Logger, LoggerFactory}

class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] {
  override def map(in: String): Option[UserBookingEvent] = {
    val LOG: Logger = LoggerFactory.getLogger(classOf[CancelMapFunction])
    try {
      implicit lazy val formats = org.json4s.DefaultFormats

      val json = parse(in)
      // One way to turn the parsed JSON into the common event type:
      Some(json.extract[UserBookingEvent])
    } catch {
      case e: Exception =>
        LOG.error("Could not parse Cancel Event=" + in + " : " + e.getMessage)
        None
    }
  }
}
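
---

Example of controlling parallelism (a minimal sketch; the topic name, group id, broker address and parallelism numbers are placeholders, and the exact Kafka consumer class and SimpleStringSchema import path depend on your Flink and connector versions):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Default parallelism for every operator in this job. The job is only
    // scheduled if enough task slots are available across the Task Managers.
    env.setParallelism(8)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092") // placeholder broker
    props.setProperty("group.id", "booking-consumer")    // placeholder group id

    val bookings: DataStream[String] = env
      .addSource(new FlinkKafkaConsumer[String]("bookings", new SimpleStringSchema(), props))
      // Source parallelism is usually kept <= the number of Kafka partitions,
      // otherwise some source subtasks sit idle.
      .setParallelism(4)

    bookings
      .map(_.toUpperCase)
      // A heavier operator can get a higher parallelism than the source so the
      // work spreads over more slots / Task Managers instead of piling up on a few.
      .setParallelism(16)
      .print()

    env.execute("parallelism-sketch")
  }
}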
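
---

Example of a continuous query with the Table API / SQL (a minimal sketch; the entry points differ between Flink versions, this assumes a version with the Scala bridge API, i.e. StreamTableEnvironment.create and createTemporaryView, and the Click type is just a placeholder):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Placeholder event type; its field names become the table's column names.
case class Click(userId: String, url: String)

object ContinuousQuerySketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // In a real job this stream would come from a Kafka source.
    val clicks: DataStream[Click] = env.fromElements(
      Click("alice", "/home"), Click("bob", "/cart"), Click("alice", "/checkout"))

    // Register the stream as a dynamic table and run a continuous query over it.
    tableEnv.createTemporaryView("clicks", clicks)
    val counts = tableEnv.sqlQuery(
      "SELECT userId, COUNT(url) AS cnt FROM clicks GROUP BY userId")

    // A grouped aggregation is an updating result, so it is converted to a
    // retract stream; the Boolean flag distinguishes inserts from retractions.
    tableEnv.toRetractStream[Row](counts).print()

    env.execute("continuous-query-sketch")
  }
}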
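
---

Example of connecting two streams on a common key and keeping a value state (a minimal sketch; BookingEvent, CancelEvent and the joining logic are placeholders):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Placeholder event types used only for this sketch.
case class BookingEvent(userId: String, bookingId: String)
case class CancelEvent(userId: String, bookingId: String)

// Keyed state is scoped to the current key (userId) and lives on the Task Manager
// that owns that key; it is not shared across Task Managers.
class BookingCancelJoiner
    extends RichCoFlatMapFunction[BookingEvent, CancelEvent, String] {

  @transient private var lastBooking: ValueState[BookingEvent] = _

  override def open(parameters: Configuration): Unit = {
    lastBooking = getRuntimeContext.getState(
      new ValueStateDescriptor[BookingEvent]("last-booking", classOf[BookingEvent]))
  }

  override def flatMap1(booking: BookingEvent, out: Collector[String]): Unit = {
    // Remember the latest booking seen for this user.
    lastBooking.update(booking)
  }

  override def flatMap2(cancel: CancelEvent, out: Collector[String]): Unit = {
    // Emit something only if a matching booking for the same user was seen before.
    val booking = lastBooking.value()
    if (booking != null && booking.bookingId == cancel.bookingId) {
      out.collect(s"Booking ${booking.bookingId} of user ${cancel.userId} was cancelled")
      lastBooking.clear()
    }
  }
}

object ConnectSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // In a real job both streams would come from Kafka sources.
    val bookings: DataStream[BookingEvent] =
      env.fromElements(BookingEvent("alice", "b-1"), BookingEvent("bob", "b-2"))
    val cancels: DataStream[CancelEvent] =
      env.fromElements(CancelEvent("alice", "b-1"))

    bookings
      .connect(cancels)
      .keyBy(_.userId, _.userId) // connect the two streams on a common key
      .flatMap(new BookingCancelJoiner)
      .print()

    env.execute("connect-sketch")
  }
}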