
In the example from Using the BigQuery Connector with Spark:

// Perform word count.
val wordCounts = (tableData
    .map(entry => convertToTuple(entry._2))
    .reduceByKey(_ + _))

// Write data back into a new BigQuery table.
// IndirectBigQueryOutputFormat discards keys, so set key to null.
(wordCounts
     .map(pair => (null, convertToJson(pair)))
     .saveAsNewAPIHadoopDataset(conf))

If I remove the .reduceByKey(_ + _) step, I get the following error:

org.apache.spark.SparkException: Job aborted.
  at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:107)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
  ... 46 elided
Caused by: java.io.IOException: Schema has no fields. Table: test_output_40b400dc_1bfe_454a_9aa8_bf9562d54c3f_source
  at com.google.cloud.hadoop.io.bigquery.BigQueryUtils.waitForJobCompletion(BigQueryUtils.java:95)
  at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.importFromGcs(BigQueryHelper.java:164)
  at com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputCommitter.commitJob(IndirectBigQueryOutputCommitter.java:57)
  at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
  at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
  ... 53 more

In some cases I don't use reduceByKey and still want to save my RDD to BigQuery.
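For reference, the failing variant without the reduce step looks like this (a sketch reusing convertToTuple and convertToJson from the example above):

// Same pipeline, but writing the raw per-record tuples without aggregation.
// IndirectBigQueryOutputFormat discards keys, so set key to null.
(tableData
    .map(entry => convertToTuple(entry._2))
    .map(pair => (null, convertToJson(pair)))
    .saveAsNewAPIHadoopDataset(conf))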

Can you add the full error? Is that error caused by the code change you made? – mrsrinivas
Yes, the error happens only after my code change. – bignano

2 Answers


Try working with a schema:

import com.google.gson.JsonObject

case class Schema(word: String, count: Int)

object Schema {
  // Build a Schema row from a JSON record with "word" and "count" fields.
  def apply(record: JsonObject): Schema = Schema(
    word = record.get("word").getAsString,
    count = record.get("count").getAsInt
  )
}

and pass your rows through this schema like this:

wordCounts.map { case (w, c) => Schema(w, c) }
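If you then need to write these typed rows back out, you still have to turn them into JSON objects first; a minimal sketch, assuming a hypothetical toJson helper (not part of the connector):

import com.google.gson.JsonObject

// Hypothetical helper: convert the case class back into a JsonObject so the
// existing IndirectBigQueryOutputFormat pipeline can write it.
def toJson(s: Schema): JsonObject = {
  val obj = new JsonObject
  obj.addProperty("word", s.word)
  obj.addProperty("count", s.count)
  obj
}

(wordCounts
    .map { case (w, c) => (null, toJson(Schema(w, c))) }
    .saveAsNewAPIHadoopDataset(conf))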

Hope it helps.


java.io.IOException: Schema has no fields is the key error: it means BigQuery was unable to auto-detect the table schema. If you specify a schema explicitly, like this:

import java.util.ArrayList
import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema}
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration

val fields = new ArrayList[TableFieldSchema]()
fields.add(new TableFieldSchema().setName("word").setType("STRING"))
fields.add(new TableFieldSchema().setName("word_count").setType("INTEGER"))
BigQueryOutputConfiguration.configure(conf, ..., new TableSchema().setFields(fields), ...)

You should no longer hit this issue.
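For context, here is a sketch of how the full configure call might look, modeled on the connector's word-count example; the table id and GCS path below are placeholders, and the exact overload may vary by connector version:

import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Placeholder values: substitute your own project, dataset, and bucket.
val outputTableId = "my-project:wordcount_dataset.wordcount_output"
val outputGcsPath = "gs://my-bucket/hadoop/tmp/bigquery/wordcount"

BigQueryOutputConfiguration.configure(
    conf,
    outputTableId,
    new TableSchema().setFields(fields),
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_, _]])

// Route writes through the indirect (GCS-staged) output format.
conf.set("mapreduce.job.outputformat.class",
    classOf[IndirectBigQueryOutputFormat[_, _]].getName)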

I think the reason .reduceByKey(_ + _) hides this issue is that:

  • Schema auto-detection selects one file at random and scans up to 100 rows of data.
  • The tableData RDD is initially partitioned into many small shards, each with too few rows for BigQuery to infer the schema automatically.
  • .reduceByKey(_ + _) shuffles the RDD into fewer, bigger shards.

My hunch is that if you replace .reduceByKey(_ + _) with .repartition(2), the job should also work without an explicit schema.
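A sketch of that variant, reusing the helpers from the question:

// Repartition into a couple of larger shards so each output file contains
// enough rows for BigQuery's schema auto-detection to sample.
(tableData
    .map(entry => convertToTuple(entry._2))
    .repartition(2)
    .map(pair => (null, convertToJson(pair)))
    .saveAsNewAPIHadoopDataset(conf))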