I'm using BigQuery and Dataproc in Google Cloud. Both are in the same project, let's call it "project-123". I use Composer (Airflow) to run my code.

I have a simple Python script, test_script.py, that uses PySpark to read data from a table in the BigQuery public dataset:

import logging
import warnings

from pyspark.sql import SparkSession

log = logging.getLogger(__name__)

if __name__ == "__main__":
    # Create (or reuse) the Spark session
    try:
        spark = SparkSession.builder.appName("test_script").getOrCreate()
        log.info("Created a SparkSession")
    except ValueError:
        warnings.warn("SparkSession already exists in this scope")

    # Read the public table through the BigQuery connector
    df = (
        spark.read.format("bigquery")
        .option("project", "project-123")
        .option("dataset", "bigquery-public-data")
        .option("table", "crypto_bitcoin.outputs")
        .load()
    )

I run the script using the DataProcPySparkOperator in Airflow:

    # This task corresponds to the ""
    test_script_task = DataProcPySparkOperator(
        task_id="test_script",
        main="./test_script.py",
        cluster_name="test_script_cluster",
        arguments=[],

        # Since we are using BigQuery, we need to explicitly add the connector jar
        dataproc_pyspark_jars="gs://spark-lib/bigquery/spark-bigquery-latest.jar",
    )

However, every time I try I get the following error:

Invalid project ID '/tmp/test_script_20200304_407da59b/test_script.py'. Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some project IDs also include domain name separated by a colon. IDs must start with a letter and may not end with a dash.
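
The naming rule quoted in that message can be checked locally, which makes it easy to see that the rejected value is a file path and not anything resembling a project ID. This is a minimal sketch: the pattern is only a reading of the error text, not the validator BigQuery itself uses, and the name looks_like_project_id is made up for illustration.

```python
import re

# Assumption: this pattern is a reading of the rule quoted in the error
# message, not the validator BigQuery actually applies.
PROJECT_ID_PATTERN = re.compile(
    r"(?:[a-z0-9.\-]+:)?"              # optional domain prefix ending in a colon
    r"[a-z][a-z0-9\-]{4,61}[a-z0-9]"   # 6-63 chars, starts with a letter, no trailing dash
)


def looks_like_project_id(value: str) -> bool:
    """Return True if value satisfies the rule quoted in the error message."""
    return PROJECT_ID_PATTERN.fullmatch(value) is not None


if __name__ == "__main__":
    candidates = (
        "project-123",
        "/tmp/test_script_20200304_407da59b/test_script.py",
    )
    for candidate in candidates:
        print(candidate, looks_like_project_id(candidate))
```

Under this reading, "project-123" passes and the /tmp path fails, so some value other than the one passed to .option("project", ...) is being treated as the project ID.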

Where is this project ID coming from? It's obviously not being overridden by my .option("project", "project-123"). My guess is that Composer is storing my Spark job script at the location /tmp/test_script_20200304_407da59b/test_script.py. If that's the case, how can I override the project ID?

Any help is much appreciated.

1 Answer

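I'm afraid you are mixing up the parameters. The project option is the project the table belongs to, and bigquery-public-data is a project, not a dataset. Your own project, which is billed for the read, goes in parentProject, and the dataset is given as part of the table name. Please try the following call: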

df = (
    spark.read.format("bigquery")
    .option("parentProject", "project-123")
    .option("project", "bigquery-public-data")
    .option("table", "crypto_bitcoin.outputs")
    .load()
)
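
To keep the billing project and the table's project from being swapped again, the options can be built in one place with explicitly named arguments. This is only a sketch: bigquery_read_options and read_bigquery_table are hypothetical helpers, not part of the connector, and the option keys are the ones used in the call above.

```python
from typing import Dict


def bigquery_read_options(billing_project: str, table_project: str,
                          dataset: str, table: str) -> Dict[str, str]:
    """Build reader options (hypothetical helper, not part of the connector)."""
    return {
        "parentProject": billing_project,  # project billed for the read
        "project": table_project,          # project that owns the table
        "table": f"{dataset}.{table}",     # dataset-qualified table name
    }


def read_bigquery_table(spark, options: Dict[str, str]):
    """Apply the options to an existing SparkSession and load the table."""
    return spark.read.format("bigquery").options(**options).load()


options = bigquery_read_options(
    billing_project="project-123",
    table_project="bigquery-public-data",
    dataset="crypto_bitcoin",
    table="outputs",
)
```

Inside the job, read_bigquery_table(spark, options) would then return the same DataFrame as the call above, assuming the connector jar is on the classpath.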