1
votes

I am trying to write the Parquet files to Pubsub through Spark on a Dataproc cluster.

I have used below pseudo code

dataFrame
      .as[MyCaseClass]
      .foreachPartition(partition => {
          val topicName = "projects/myproject/topics/mytopic"
          val publisher = Publisher.newBuilder(topicName).build()
          partition.foreach(users => {
            try {
              val jsonUser = users.asJson.noSpaces //using circe scala lib
              val data = ByteString.copyFromUtf8(jsonUser)
              val pubsubMessage = PubsubMessage.newBuilder().setData(data).build()
              val message = publisher.publish(pubsubMessage)
            }
            catch {
              case e: Exception => System.out.println("Exception in processing the event " + e.printStackTrace())
            }
          })
          publisher.shutdown()
        }
        catch {
          case e: Exception => System.out.println("Exception in processing the partition = " + e.printStackTrace())
        }
      }
      )

Whenever I am submitting this on the cluster I am getting the spark prelaunch errors with exit code 134.

I have shaded the guava and protobuf in my pom. If I run this example through a local test case, it works but if submitted on dataproc I get the errors. I did not find any relative information about writing the data frame to pub-sub. Any pointers?

Update: System Details: Single Node Cluster with N1-Standard-32 (32 Cores,120GB Memory) Executor Cores: Dynamic enabled

Attaching the stack trace:

20/12/22 17:51:43 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 1 for reason Container from a bad node: container_1608332157194_0026_01_000002 on host: dataproc-cluster.internal. Exit status: 134. Diagnostics: [2020-12-22 17:51:43.556]Exception from container-launch.
Container id: container_1608332157194_0026_01_000002
Exit code: 134

[2020-12-22 17:51:43.557]Container exited with a non-zero exit code 134. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
/bin/bash: line 1: 19017 Aborted                 /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/bin/java -server -Xmx5586m -Djava.io.tmpdir=/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1608332157194_0026/container_1608332157194_0026_01_000002/tmp '-Dspark.driver.port=43691' '-Dspark.rpc.message.maxSize=512' -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/userlogs/application_1608332157194_0026/container_1608332157194_0026_01_000002 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:43691 --executor-id 1 --hostname dataproc-cluster.internal --cores 2 --app-id application_1608332157194_0026 --user-class-path file:/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1608332157194_0026/container_1608332157194_0026_01_000002/__app__.jar --user-class-path file:/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1608332157194_0026/container_1608332157194_0026_01_000002/mySparkJar-1.0.0-0-SNAPSHOT.jar --user-class-path file:/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1608332157194_0026/container_1608332157194_0026_01_000002/org.apache.spark_spark-avro_2.11-2.4.2.jar --user-class-path file:/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1608332157194_0026/container_1608332157194_0026_01_000002/org.spark-project.spark_unused-1.0.0.jar > /var/log/hadoop-yarn/userlogs/application_1608332157194_0026/container_1608332157194_0026_01_000002/stdout 2> /var/log/hadoop-yarn/userlogs/application_1608332157194_0026/container_1608332157194_0026_01_000002/stderr
Last 4096 bytes of stderr :
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/12/22 17:51:36 INFO org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader initialized will read a total of 11320100 records.
20/12/22 17:51:36 INFO org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading next block
20/12/22 17:51:38 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.snappy]
20/12/22 17:51:38 INFO org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory in 2301 ms. row count = 11320100
20/12/22 17:51:39 INFO org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader initialized will read a total of 11320100 records.
20/12/22 17:51:39 INFO org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading next block
20/12/22 17:51:40 INFO org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory in 1411 ms. row count = 11320100
1
May you attach stacktrace for one of the failures? - Igor Dvorzhak

1 Answers

0
votes

If job failed early it could be the case that there not enough memory for Spark Driver to start: https://discuss.xgboost.ai/t/container-exited-with-a-non-zero-exit-code-134/133

To solve this issue you need to provision Dataproc cluster with master node that has more RAM or allocate more memory/heap for Spark driver and/or Spark executors.