I'm writing a Spark job in Scala on Google DataProc that executes daily and processes records each marked with a transaction time. The records are grouped by year-month combo and each group is written to a separate monthly parquet file in GCS (e.g. 2018-07-file.parquet, 2018-08-file.parquet, etc.). Note that these files go back about 5 years and form a very large dataset (~1 TB).
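For context, the monthly split described above boils down to deriving a "yyyy-MM" key from each record's transaction time; a rough sketch (the helper name and column layout are my own, not the actual job):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper: derive the "yyyy-MM" grouping key from a record's
// transaction time. In the Spark job this key would drive one parquet file
// per month, e.g. via df.write.partitionBy("month_year").parquet("gs://...").
def monthKey(transTime: LocalDateTime): String =
  transTime.format(DateTimeFormatter.ofPattern("yyyy-MM"))
```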
I want to write these files into BigQuery and have the job update only the monthly records that changed in the current run. For simplicity, I'd like to delete the existing records for any month that has updated ones and then load the data from that month's parquet file.
I am trying to use the BigQuery Connector for DataProc, but it only seems to support overwriting an entire table, not a batch of records filtered by, for example, a date field.
What is the best way to do this? I tried including the full BigQuery client library JAR in my project and using a DML query to delete the existing monthly records, as shown below:
def writeDataset(sparkContext: SparkContext, monthYear: String, ds: Dataset[TargetOrder]) = {
  val dtMonthYear = FeedWriter.parquetDateFormat.parse(monthYear)
  val bigquery: BigQuery = BigQueryOptions.getDefaultInstance.getService

  // Format the next-month bound with the same pattern as monthYear;
  // concatenating the raw Date would emit Date.toString into the SQL.
  val nextMonth = FeedWriter.parquetDateFormat.format(DateUtils.addMonths(dtMonthYear, 1))

  // Delete this month's existing records before re-loading them.
  // A half-open window (>= start, < next month) avoids BETWEEN's
  // inclusive upper bound deleting the first instant of the next month.
  val queryConfig: QueryJobConfiguration =
    QueryJobConfiguration.newBuilder(
      "DELETE FROM `" + getBQTableName(monthYear) + "` " +
      "WHERE header.trans_time >= PARSE_DATETIME('" + FeedWriter.parquetDateFormat.toPattern + "', '" + monthYear + "') " +
      "AND header.trans_time < PARSE_DATETIME('" + FeedWriter.parquetDateFormat.toPattern + "', '" + nextMonth + "')")
      .setUseLegacySql(false)
      .build()

  val jobId: JobId = JobId.of(UUID.randomUUID().toString)
  val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()).waitFor()
}
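As an aside, the month window for that DELETE can be computed without SimpleDateFormat/DateUtils at all; a minimal java.time sketch (the helper name is mine, assuming a half-open [month start, next month start) window):

```scala
import java.time.YearMonth
import java.time.format.DateTimeFormatter

// Hypothetical helper: given a "yyyy-MM" month key, return the inclusive
// start and exclusive end of that month as datetime strings suitable for
// a DELETE ... WHERE trans_time >= start AND trans_time < end query.
def monthBounds(monthYear: String): (String, String) = {
  val fmt   = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val start = YearMonth.parse(monthYear).atDay(1).atStartOfDay()
  val end   = start.plusMonths(1) // correctly rolls over year boundaries
  (start.format(fmt), end.format(fmt))
}
```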
but I get the following error (I assume including the full BQ client JAR in a DataProc job is not allowed or perhaps it just doesn't play nice with the BQ connector):
java.lang.NoSuchMethodError: com.google.api.services.bigquery.model.JobReference.setLocation(Ljava/lang/String;)Lcom/google/api/services/bigquery/model/JobReference;
at com.google.cloud.bigquery.JobId.toPb(JobId.java:114)
at com.google.cloud.bigquery.JobInfo.toPb(JobInfo.java:370)
at com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:198)
at com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:187)
at ca.mycompany.myproject.output.BigQueryWriter$.writeDataset(BigQueryWriter.scala:39)
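Regarding the NoSuchMethodError itself: it typically indicates that an older copy of google-api-services-bigquery is already on the DataProc classpath and wins over the one in my JAR. One workaround I've seen suggested (untested against this exact setup) is to shade the conflicting packages when building the fat JAR with sbt-assembly:

```scala
// build.sbt -- sketch, assuming sbt-assembly; the package list may need tuning.
// Relocate the Google API classes bundled in the fat JAR so they cannot
// collide with the older copies DataProc already has on the classpath.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.api.**" -> "repackaged.com.google.api.@1").inAll
)
```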