1 vote

We have a Spark Streaming application that reads data from Pub/Sub, applies some transformations, converts the JavaDStream to a Dataset, and then writes the results into normalized BigQuery tables.

Below is the sample code. All the normalized tables are partitioned on the CurrentTimestamp column. Is there any parameter we can set to improve write performance?

        pubSubMessageDStream
                .foreachRDD(new VoidFunction2<JavaRDD<PubSubMessageSchema>, Time>() {
                    @Override
                    public void call(JavaRDD<PubSubMessageSchema> rdd, Time time) throws Exception {
                        Dataset<PubSubMessageSchema> pubSubDataSet =
                                spark.createDataset(rdd.rdd(), Encoders.bean(PubSubMessageSchema.class));
                        // ... transformations producing a Dataset<Row> named dataSet ...
                        for (Row payloadName : payloadNameList) {
                            Dataset<Row> normalizedDS = null;
                            if (payloadNameAList.contains(payloadName)) {
                                normalizedDS = dataSet.filter(col("colA").equalTo(<Value>));
                            } else if (payloadNameBList.contains(payloadName)) {
                                normalizedDS = dataSet.filter(col("colA").equalTo(<Value>));
                            }
                            normalizedDS.selectExpr(columnsBigQuery)
                                    .write().format("bigquery")
                                    .option("temporaryGcsBucket", gcsBucketName)
                                    .option("table", tableName)
                                    .option("project", projectId)
                                    .option("parentProject", parentProjectId)
                                    .mode(SaveMode.Append)
                                    .save();
                        }
                    }
                });
Which Spark version are you using? - David Rabinowitz
Spark 2.4.7 and Scala 2.11.12 - arunkindra

1 Answer

0 votes

Writing to BigQuery with the connector requires first writing the data to GCS and then triggering a BigQuery load job. You can try changing the intermediateFormat to Avro and see whether it affects the performance - in our tests, which format is better depends on the schema and data size.
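For example, reusing the write from the question, that would be a one-option change (a sketch: intermediateFormat is a connector write option whose default is "parquet"; writing Avro from Spark 2.4 also requires the org.apache.spark:spark-avro package on the classpath):

        normalizedDS.selectExpr(columnsBigQuery)
                .write().format("bigquery")
                .option("temporaryGcsBucket", gcsBucketName)
                .option("table", tableName)
                .option("project", projectId)
                .option("parentProject", parentProjectId)
                .option("intermediateFormat", "avro") // default is "parquet"
                .mode(SaveMode.Append)
                .save();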

In addition, the upcoming connector version 0.19.0 includes a write implementation based on the DataSource v2 API for Spark 2.4, which should improve the performance by 10-15%.
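Once it is released, picking it up should just be a matter of bumping the dependency version, e.g. (assuming the artifact keeps its current Scala 2.11 coordinates):

        spark-submit --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.19.0 ...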