2 votes

I can't get the Google example to work:

https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example (PySpark)

There are a few mistakes in the code, I think, like:

# Output Parameters
'mapred.bq.project.id': '',

Should be: 'mapred.bq.output.project.id': '',
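
so I assume the output block should then look something like this (the dataset/table key names below are just my guess by analogy with the input parameters, I have not verified them):

# Output Parameters (my guess, not verified)
'mapred.bq.output.project.id': '<project>',
'mapred.bq.output.dataset.id': '<dataset>',
'mapred.bq.output.table.id': '<table>',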

and

# Write data back into new BigQuery table.
# BigQueryOutputFormat discards keys, so set key to None.
(word_counts
.map(lambda pair: None, json.dumps(pair))
.saveAsNewAPIHadoopDataset(conf))

will give an error message. If I change it to:
(word_counts
.map(lambda pair: (None, json.dumps(pair)))
.saveAsNewAPIHadoopDataset(conf))

I get the error message:
org.apache.hadoop.io.Text cannot be cast to com.google.gson.JsonObject

Whatever I try, I cannot make this work.
A dataset is created in BigQuery with the name I gave it in the 'conf', with '_hadoop_temporary_job_201512081419_0008' appended, and a table is created with '_attempt_201512081419_0008_r_000000_0' on the end. But they are always empty.

Can anybody help me with this?
Thanks


1 Answer

2 votes

We are working to update the documentation because, as you noted, it is incorrect in this case. Sorry about that! While the docs are being fixed, I wanted to get you a reply ASAP.

Casting problem

The most important problem you mention is the casting issue. Unfortunately, PySpark cannot use the BigQueryOutputFormat to create Java GSON objects. The workaround is to save the output data to Google Cloud Storage (GCS) and then load it into BigQuery manually with the bq command.

Code example

Here is a code sample which exports the data to GCS and prints the bq command you can run to load it into BigQuery. You could also use subprocess from Python to execute the bq command programmatically (see the sketch after the main example).

#!/usr/bin/python
"""BigQuery I/O PySpark example."""
import json
import pprint
import pyspark

sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    .map(lambda pair: json.loads(pair[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline delimited json in Google Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-{:05}'.format(i) for i in partitions]

(word_counts
 .map(lambda pair: json.dumps({'word': pair[0], 'word_count': pair[1]}))
 .saveAsTextFile(output_directory))

# Manually clean up the input_directory, otherwise there will be BigQuery export
# files left over indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)

print """
###########################################################################
# Finish uploading data to BigQuery using a client e.g.
bq load --source_format NEWLINE_DELIMITED_JSON \
    --schema 'word:STRING,word_count:INTEGER' \
    wordcount_dataset.wordcount_table {files}
# Clean up the output
gsutil -m rm -r {output_directory}
###########################################################################
""".format(
    files=','.join(output_files),
    output_directory=output_directory)
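
If you would rather run the load step from the same script instead of typing the printed command, here is a minimal sketch using subprocess. It assumes the bq and gsutil command-line tools are installed and authenticated on the machine running the driver, and it reuses output_files and output_directory from the script above; the dataset and table names are placeholders you should replace with your own.

import subprocess

# Load the staged newline-delimited JSON files into BigQuery.
subprocess.check_call([
    'bq', 'load',
    '--source_format', 'NEWLINE_DELIMITED_JSON',
    '--schema', 'word:STRING,word_count:INTEGER',
    'wordcount_dataset.wordcount_table',
    ','.join(output_files),
])

# Clean up the staged output files afterwards.
subprocess.check_call(['gsutil', '-m', 'rm', '-r', output_directory])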