
I have a flow that executes Spark jobs on Dataproc clusters in parallel for different zones. For each zone it creates a cluster, executes the Spark job, and deletes the cluster after it finishes.

The Spark job uses the org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset method, passing the BigQuery configuration, to save data to a BigQuery table. The job saves data to more than one table, calling saveAsNewAPIHadoopDataset more than once per job.
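
For reference, this is roughly how that write is wired up, shown as a minimal sketch rather than my actual code: the qualified table name and schema JSON are placeholders, and rows stands in for the RDD the real job builds. It uses the direct BigQueryOutputFormat, whose BigQueryOutputCommitter.setupJob creates the temporary dataset that shows up in the error below.

import java.io.IOException;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only: placeholder table id and schema JSON; "rows" stands in for the
// (key, JsonObject) pair RDD built by the real job.
void writeDirect(JavaSparkContext jsc, JavaPairRDD<Object, JsonObject> rows, String schemaJson) throws IOException {
    Configuration conf = new Configuration(jsc.hadoopConfiguration());
    // Point the connector at the destination table. During setupJob the
    // BigQueryOutputCommitter stages the data through a temporary dataset
    // named <MY-DATASET>_hadoop_temporary_job_<jobId>.
    BigQueryConfiguration.configureBigQueryOutput(
        conf, "my-gcp-project:MY_DATASET.MY_TABLE", schemaJson);
    conf.set("mapreduce.job.outputformat.class", BigQueryOutputFormat.class.getName());
    rows.saveAsNewAPIHadoopDataset(conf);
}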

The problem is that sometimes I get an error caused by a conflict in the Hadoop temporary BigQuery dataset that the connector internally creates to run the jobs:

Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 409 Conflict
{
 "code" : 409,
 "errors" : [ {
   "domain" : "global",
   "message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013",
   "reason" : "duplicate"
 } ],
 "message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013"
}
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.io.bigquery.BigQueryOutputCommitter.setupJob(BigQueryOutputCommitter.java:107)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1150)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1078)
    at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:819)
    ...

The timestamp 201802250620_0013 in the exception above has the suffix _0013, and I'm unsure whether it represents time.

My thought is that sometimes the jobs run at the same time and try to create a dataset with the same timestamp in its name, either in a parallel job or within the same job on another saveAsNewAPIHadoopDataset call.

How can we avoid this error without adding a delay to the job execution?

The dependency that I'm using is:

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>0.10.2-hadoop2</version>
    <scope>provided</scope>
</dependency>

The Dataproc image version is 1.1.

Edit 1:

I tried using IndirectBigQueryOutputFormat, but now I'm getting an error saying that the GCS output path already exists, even though I pass a different one on each saveAsNewAPIHadoopDataset call.

Here is my code:

SparkConf sc = new SparkConf().setAppName("MyApp");

try (JavaSparkContext jsc = new JavaSparkContext(sc)) {
    JavaPairRDD<String, String> filesJson = jsc.wholeTextFiles(jsonFolder, parts);
    JavaPairRDD<String, String> jsons = filesJson.flatMapToPair(new FileSplitter()).repartition(parts);
    JavaPairRDD<Object, JsonObject> objsJson = jsons.flatMapToPair(new JsonParser()).filter(t -> t._2() != null).cache();

    objsJson
    .filter(new FilterType(MSG_TYPE1))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE1", "gs://my-bucket/tmp1"));

    objsJson
    .filter(new FilterType(MSG_TYPE2))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE2", "gs://my-bucket/tmp2"));

    objsJson
    .filter(new FilterType(MSG_TYPE3))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE3", "gs://my-bucket/tmp3"));

    // here goes another ingestion process. same code as above but different params, parsers, etc.
}

Configuration createConf(String table, String outGCS) {
  Configuration conf = new Configuration();
  BigQueryOutputConfiguration.configure(conf, table, null, outGCS, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class);
  conf.set("mapreduce.job.outputformat.class", IndirectBigQueryOutputFormat.class.getName());
  return conf;
}
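
One thing I'm considering (an untested sketch; the UUID suffix is just an illustration) is giving each call its own temporary GCS directory so the path can never already exist:

import java.io.IOException;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;

// Untested variation of createConf: append a random suffix so each call writes
// to a fresh temporary GCS directory instead of reusing gs://my-bucket/tmpN.
Configuration createConf(String table, String outGCS) throws IOException {
  Configuration conf = new Configuration();
  String uniqueOut = outGCS + "/" + UUID.randomUUID();
  BigQueryOutputConfiguration.configure(
      conf, table, null, uniqueOut, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class);
  conf.set("mapreduce.job.outputformat.class", IndirectBigQueryOutputFormat.class.getName());
  return conf;
}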

1 Answer


I believe what may be happening is that each mapper tries to create its own dataset. This is rather inefficient (and burns your daily quota in proportion to the number of mappers).

An alternative is to use IndirectBigQueryOutputFormat as the output class:

IndirectBigQueryOutputFormat works by first buffering all the data into a Cloud Storage temporary table, and then, on commitJob, copies all data from Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs since it only requires one BigQuery "load" job per Hadoop/Spark job, as compared to BigQueryOutputFormat, which performs one BigQuery job for each Hadoop/Spark task.

See the example here: https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
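
For the "output path already exists" error from Edit 1, one option is to delete the temporary Cloud Storage directory once a write has committed (or before reusing the same path). A minimal sketch with a hypothetical helper name and placeholder path; the Hadoop Configuration would come from the Spark context:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Sketch (hypothetical helper): recursively delete the connector's temporary
// GCS output directory so the next saveAsNewAPIHadoopDataset call, or a rerun,
// does not find the path already present.
void cleanupTempGcsPath(Configuration hadoopConf, String gcsTempDir) throws IOException {
  Path tmp = new Path(gcsTempDir);                  // e.g. "gs://my-bucket/tmp1"
  tmp.getFileSystem(hadoopConf).delete(tmp, true);  // recursive delete
}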