
I'm trying to read data from a CSV file in GCS and save it to a BigQuery table.

This is my CSV file:

1,Marc,B12,2017-03-24
2,Marc,B12,2018-01-31
3,Marc,B21,2017-03-17
4,Jeam,B12,2017-12-30
5,Jeam,B12,2017-09-02
6,Jeam,B11,2018-06-30
7,Jeam,B21,2018-03-02
8,Olivier,B20,2017-12-30

And this is my code:

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
import com.google.gson.Gson
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

  val spark = SparkSession
    .builder()
    .appName("Hyp-session-bq")
    .config("spark.master", "local")
    .getOrCreate()
  val sc: SparkContext = spark.sparkContext


  val conf=sc.hadoopConfiguration

  //Input Parameters
  val projectId = conf.get("fs.gs.project.id")
  val bucket = conf.get("fs.gs.system.bucket")
  val inputTable = s"$projectId:rpc.testBig"

  //Input Configuration
  conf.set(BigQueryConfiguration.PROJECT_ID_KEY,projectId)
  conf.set(BigQueryConfiguration.GCS_BUCKET_KEY,bucket)
  BigQueryConfiguration.configureBigQueryInput(conf,inputTable)

  //Output Parameters
  val outPutTable = s"$projectId:rpc.outTestBig"

  // Temp output bucket that is deleted upon completion of job
  val outPutGcsPath = ("gs://"+bucket+"/hadoop/tmp/outTestBig")

  BigQueryOutputConfiguration.configure(conf,
    outPutTable,
    null,
    outPutGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    classOf[TextOutputFormat[_,_]])

  conf.set("mapreduce.job.outputformat.class", classOf[IndirectBigQueryOutputFormat[_,_]].getName)

  // Truncate the table before writing output to allow multiple runs.
  conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,"WRITE_TRUNCATE")

  val text_file = sc.textFile("gs://test_files/csvfiles/test.csv")
  val lignes = text_file.flatMap(x=>x.split(" "))
  case class schemaFile(id: Int, name: String, symbole: String, date: String)

  // Parse one CSV line into the case class (split once, then index the fields).
  def parseStringWithCaseClass(str: String): schemaFile = {
    val fields = str.split(",")
    schemaFile(
      id = fields(0).toInt,
      name = fields(1),
      symbole = fields(2),
      date = fields(3)
    )
  }

  val result1 = lignes.map(x => parseStringWithCaseClass(x))
  val x = result1.map(elem => (null, new Gson().toJsonTree(elem)))
  x.saveAsNewAPIHadoopDataset(conf)

When I run the code I get this error:

ERROR org.apache.spark.internal.io.SparkHadoopMapReduceWriter: Aborting job job_20180226083501_0008.
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "Load configuration must specify at least one source URI",
    "reason" : "invalid"
  } ],
  "message" : "Load configuration must specify at least one source URI"
}
        at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
        at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.insertJobOrFetchDuplicate(BigQueryHelper.java:306)
        at com.google.cloud.hadoop.io.bigquery.BigQueryHelper.importFromGcs(BigQueryHelper.java:160)
        at com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputCommitter.commitJob(IndirectBigQueryOutputCommitter.java:57)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:128)
        at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
        at jeam.BigQueryIO$.main(BigQueryIO.scala:115)
        at jeam.BigQueryIO.main(BigQueryIO.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)  

I think the problem is with the case class and parseStringWithCaseClass, but I don't know how to fix it. I don't think the problem is in the configuration, because I get the expected result when I run the wordcount example: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example

Comments:

Why do you need Spark to load a CSV file into BigQuery? – Graham Polley
Because I have a data lake in GCS, and I want to load CSV files into BigQuery for SQL users and visualisation. – G.Saleh
But you don't need Spark to do this. You can load directly from GCS into BigQuery. – Graham Polley
How do I do that, automatically, and with some control over the result, for example splitting my CSV and saving only a specific column? – G.Saleh
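
As Graham Polley suggests, a plain BigQuery load job can pull the CSV from GCS into a table without involving Spark at all. Below is a minimal sketch of that approach, assuming the google-cloud-bigquery Java client is on the classpath; the table name (rpc.testBig) and the GCS path are the ones from the question, and schema autodetection is used purely for illustration:

import com.google.cloud.bigquery.{BigQueryOptions, FormatOptions, JobInfo, LoadJobConfiguration, TableId}

object DirectCsvLoad {
  def main(args: Array[String]): Unit = {
    // The client picks up the project and credentials from the environment.
    val bigquery = BigQueryOptions.getDefaultInstance.getService

    // Destination table and source object (names taken from the question).
    val destination = TableId.of("rpc", "testBig")
    val sourceUri   = "gs://test_files/csvfiles/test.csv"

    // A load job that reads the CSV straight from GCS, no Spark involved.
    val loadConfig = LoadJobConfiguration.newBuilder(destination, sourceUri)
      .setFormatOptions(FormatOptions.csv())
      .setAutodetect(true) // infer the schema from the file, or supply one explicitly
      .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE)
      .build()

    // Submit the job and block until it finishes.
    val job = bigquery.create(JobInfo.of(loadConfig)).waitFor()
    if (job.getStatus.getError != null)
      sys.error(s"Load failed: ${job.getStatus.getError}")
    else
      println(s"Loaded $sourceUri into ${destination.getDataset}.${destination.getTable}")
  }
}

Keeping only a specific column, as asked in the last comment, could then be done with a query over the loaded table rather than in Spark.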

2 Answers


Try working with a Tuple4 instead of the case class:

  def parseStringWithTuple(str: String): Tuple4[Int, String, String, String] = {
    val id = str.split(",")(0).toInt
    val name = str.split(",")(1)
    val symbole = str.split(",")(2)
    val date = str.split(",")(3)
    (id, name, symbole, date)
  }

  val result1 = lignes.map(x => parseStringWithTuple(x))

That said, I tested your code and it works fine.


I have been performing some tests running your code with my own BigQuery tables and CSV files, and it has worked for me without needing any additional modification.

I see that when you changed the case class to Tuple4, as suggested by @jean-marc, your code started working, which is strange behavior, all the more so because for both him and me your code works without any further modification. The error Load configuration must specify at least one source URI usually appears when the BigQuery load job is not properly configured and does not receive a correct Cloud Storage source URI. However, if the exact same code works after only switching to Tuple4, and the CSV file is the same and has not changed (i.e. the URL is valid), it may have been a transient issue, possibly on the Cloud Storage or BigQuery side rather than in the Dataproc job itself.
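
If it reproduces, one way to narrow it down is to look at the temporary GCS path before it is cleaned up: with the indirect output format, the connector first writes newline-delimited JSON files there and then submits a load job that uses those files as its source URIs, so an empty directory matches the "no source URI" error. A rough sketch using the Hadoop FileSystem API, reusing the conf and bucket values from the question (illustrative only):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Same temporary path the job was configured with.
val tmpOutput = new Path("gs://" + bucket + "/hadoop/tmp/outTestBig")
val fs = FileSystem.get(new URI(tmpOutput.toString), conf)

// If nothing was written here before the commit, the BigQuery load job
// has no source URIs, which is exactly the reported error.
if (fs.exists(tmpOutput))
  fs.listStatus(tmpOutput).foreach(status => println(status.getPath))
else
  println(s"No intermediate files under $tmpOutput")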

Finally, given that this issue appears to be specific to your environment (the same code has worked for at least two other users), once you have checked that there is no issue with the Cloud Storage object itself (permissions, wrong location, etc.), you may want to file a bug in the Public Issue Tracker.
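
For that Cloud Storage check, a quick way to confirm the input object is visible from the environment running the job is to ask the filesystem for its status (again just a sketch, with the path and SparkContext from the question):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

val input = new Path("gs://test_files/csvfiles/test.csv")
val fs = FileSystem.get(new URI(input.toString), sc.hadoopConfiguration)

// Throws if the object is missing or not readable, which surfaces
// wrong-location and permission problems immediately.
val status = fs.getFileStatus(input)
println(s"Found ${status.getPath}, ${status.getLen} bytes")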