
I am using a Google Cloud Dataproc Spark cluster to run a Spark Streaming job that reads data from multiple Pub/Sub subscriptions and writes it into BigQuery. The Pub/Sub subscription has 5 million elements; with a sliding window of 2 minutes and a batch/window interval of 30 seconds, I am getting only approximately 200,000 elements per batch. I would like to get all 5 million in the first batch. Each element is approximately 140 bytes and is an Avro message.

I have achieved a throughput of 1 million elements per second with Dataflow, but I want to do the same with Dataproc. I have tried Dataproc's autoscaling option and also tried the same Beam pipeline code that worked on Dataflow. Increasing the number of subscriptions might give me more throughput, but is it possible to get a throughput of 1 million elements per second from a single subscription?

Here is my Scala code:

// Read from multiple Pub/Sub subscriptions, converting each Avro message into a Row.
import org.apache.spark.sql.Row
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}

for (a <- 0 to Integer.parseInt(subs)) {
  logger.info("SKCHECK : Creating stream : " + subscription + a)
  val everysub = PubsubUtils.createStream(
      ssc, projectId, None, subscription + a,
      SparkGCPCredentials.builder.jsonServiceAccount(jsonPath).build(),
      StorageLevel.MEMORY_ONLY_SER)
    .map(message => {
      // Convert the Avro-encoded payload into a Row.
      val row: Row = avroMsgToRow(message.getData())
      row
    })
}
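
For context, here is a minimal sketch of how these per-subscription streams get combined and windowed with the intervals mentioned above. The `StreamingContext` setup, the union, and the BigQuery write shown here are simplified placeholders rather than my exact production code; `subs`, `projectId`, `subscription`, `jsonPath`, and `avroMsgToRow` are the same as in the snippet above.

// Illustrative sketch (not the exact production pipeline): a StreamingContext
// with a 30-second batch interval, the per-subscription streams unioned,
// and a 2-minute sliding window applied before writing out.
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}

val conf = new SparkConf().setAppName("pubsub-to-bigquery")
val ssc  = new StreamingContext(conf, Seconds(30))   // 30-second batches

// One DStream[Row] per subscription, as in the loop above.
val streams: Seq[DStream[Row]] = (0 until Integer.parseInt(subs)).map { a =>
  PubsubUtils.createStream(
      ssc, projectId, None, subscription + a,
      SparkGCPCredentials.builder.jsonServiceAccount(jsonPath).build(),
      StorageLevel.MEMORY_ONLY_SER)
    .map(message => avroMsgToRow(message.getData()))
}

// Union all subscription streams and apply the 2-minute sliding window
// that slides every 30-second batch.
val allRows: DStream[Row] = ssc.union(streams).window(Minutes(2), Seconds(30))

allRows.foreachRDD { rdd =>
  // writeToBigQuery(rdd)  // placeholder for the actual BigQuery sink
}

ssc.start()
ssc.awaitTermination()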

My build.sbt looks like:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-sql" % sparkVersion,
     // "org.apache.spark" %% "spark-mllib" % sparkVersion,
      "org.apache.spark" %% "spark-streaming" % sparkVersion,
     // "org.apache.spark" %% "spark-hive" % sparkVersion,
      "com.google.cloud" % "google-cloud-bigquery" % bigQueryVersion,
      "com.google.apis" % "google-api-services-bigquery" % googleApiBigQueryVersion,
      "com.google.cloud" % "google-cloud-nio" % gcsNioVersion,
      "com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion
    )

    // https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/bigquery-connector
    libraryDependencies += "com.google.cloud.bigdataoss" % "bigquery-connector" % "0.10.0-hadoop2"

    // https://mvnrepository.com/artifact/com.spotify/spark-bigquery
    libraryDependencies += "com.spotify" %% "spark-bigquery" % "0.2.2"

    libraryDependencies += "com.google.apis" % "google-api-services-pubsub" % "v1-rev425-1.25.0"

    // https://mvnrepository.com/artifact/org.apache.bahir/spark-streaming-pubsub
    libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.3.0"


    // https://mvnrepository.com/artifact/org.scala-lang/scala-library
    libraryDependencies += "org.scala-lang" % "scala-library" % "2.10.0-M3"

    // https://mvnrepository.com/artifact/org.apache.spark/spark-avro
    libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.0"

Let me know if you need any more information.

I expect to achieve a data ingestion rate of 1 million elements per second with a single Pub/Sub subscription.

Comment: Facing the same issue with the Bahir library. Were you able to identify the issue? – vinisha

1 Answer


I think you first need to identify the bottleneck in your Spark Streaming job. Is it CPU bound, memory bound, or I/O bound, or are some Spark parameters preventing it from fully utilizing the available resources? I'd suggest starting by checking resource utilization and then trying different machine types.
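
For example, if the receivers turn out to be the limiting factor, these are the kinds of Spark Streaming settings worth experimenting with. This is a minimal sketch; the values are placeholders to experiment with, not tuned recommendations.

// Illustrative tuning knobs for a receiver-based Spark Streaming job.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("pubsub-to-bigquery")
  // Let Spark adapt the ingestion rate to what the job can actually process.
  .set("spark.streaming.backpressure.enabled", "true")
  // Upper bound on records per second per receiver (unset = unlimited).
  .set("spark.streaming.receiver.maxRate", "100000")
  // Smaller block interval => more blocks per batch => more tasks per batch.
  .set("spark.streaming.blockInterval", "100ms")

Also keep in mind that each receiver-based stream occupies an executor core, so the number of subscription streams created in your loop directly reduces the cores left for processing.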