
I have created a newline-delimited JSON file in GCS, built an external table on top of it, and I am able to read the data from the BigQuery UI.

I want to access the external table's data using PySpark, create a DataFrame, and then run the job from Dataproc. Below is the code snippet I've written:

#!/usr/bin/python
import sys
import json
from pyspark.sql.functions import udf, lit, when, date_sub
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType, BooleanType, DateType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from datetime import datetime

TargetTableUri = sys.argv[1]

spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-bigquery-demo') \
  .getOrCreate()


bucket1 = "gs://first-bucket-arpan/output1"
spark.conf.set('temporaryGcsBucket', bucket1)

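# Read the BigQuery external table through the spark-bigquery connector;
# the load() call below is where the job fails.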
src_tbl = spark.read.format('bigquery') \
  .option('table', 'turing-thought-277215:first_dataset.ext_employee_details') \
  .load()
src_tbl.createOrReplaceTempView('src_tbl')

src_tbl_df = spark.sql('SELECT EMPID, EMPNAME, STREETADRESS, REGION, STATE, COUNTRY FROM src_tbl')
src_tbl_df.show()
src_tbl_df.printSchema()

While running the job from the Dataproc cluster, I get the error below: ": java.lang.UnsupportedOperationException: The type of table turing-thought-277215.first_dataset.ext_employee_details is currently not supported: EXTERNAL"

Are BigQuery external tables not supported by the Spark BigQuery connector? Below is the complete log.

20/08/13 16:44:25 INFO org.spark_project.jetty.util.log: Logging initialized @4863ms
20/08/13 16:44:25 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
20/08/13 16:44:25 INFO org.spark_project.jetty.server.Server: Started @5045ms
20/08/13 16:44:25 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@5cf22b28{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
20/08/13 16:44:25 WARN org.apache.spark.scheduler.FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.
20/08/13 16:44:27 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at my-dataproc-cluster-m/10.148.0.40:8032
20/08/13 16:44:27 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at my-dataproc-cluster-m/10.148.0.40:10200
20/08/13 16:44:31 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1596957621647_0022
Traceback (most recent call last):
  File "/tmp/job-scd2curation-6/scdtype2curation.py", line 25, in <module>
    .option('table', 'turing-thought-277215:first_dataset.ext_employee_details') \
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o59.load.
: java.lang.UnsupportedOperationException: The type of table turing-thought-277215.first_dataset.ext_employee_details is currently not supported: EXTERNAL
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelationInternal(BigQueryRelationProvider.scala:83)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:40)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
@Lamanus is correct. The Spark BigQuery connector relies on the BigQuery Storage API to read data efficiently, and this API does not support external tables yet. Adding support for external tables is on our roadmap (with or without the API), but it is not ready yet. In any case, you can read the data directly using spark.read.json(...). – David Rabinowitz
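
Following up on that comment, here is a minimal sketch of reading the newline-delimited JSON straight from GCS instead of going through the external table. The GCS path is a hypothetical placeholder (substitute the actual file backing the external table), and it assumes the JSON records carry the same fields the external table exposes:

#!/usr/bin/python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master('yarn') \
  .appName('spark-read-json-demo') \
  .getOrCreate()

# Hypothetical path; point this at the NDJSON file the external table is defined on.
json_path = 'gs://first-bucket-arpan/employee_details.json'

# spark.read.json handles newline-delimited JSON natively, one record per line.
src_tbl_df = spark.read.json(json_path)
src_tbl_df.createOrReplaceTempView('src_tbl')

result_df = spark.sql('SELECT EMPID, EMPNAME, STREETADRESS, REGION, STATE, COUNTRY FROM src_tbl')
result_df.show()
result_df.printSchema()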

1 Answer


The connector only supports tables and views, not external tables. See the source:

    val table = Option(bigquery.getTable(opts.tableId))
      .getOrElse(sys.error(s"Table $tableName not found"))
    table.getDefinition[TableDefinition].getType match {
      case TABLE => new DirectBigQueryRelation(opts, table)(sqlContext)
      case VIEW | MATERIALIZED_VIEW => if (opts.viewsEnabled) {
        new DirectBigQueryRelation(opts, table)(sqlContext)
      } else {
        sys.error(
          s"""Views were not enabled. You can enable views by setting
             |'${SparkBigQueryOptions.ViewsEnabledOption}' to true.
             |Notice additional cost may occur."""
            .stripMargin.replace('\n', ' '))
      }
      case unsupported => throw new UnsupportedOperationException(
        s"The type of table $tableName is currently not supported: $unsupported")
    }
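
Given the source above, one possible workaround (besides reading the JSON directly from GCS, as suggested in the comments) is to define a standard BigQuery view over the external table and read that view with viewsEnabled turned on. This is only a sketch under a few assumptions: the view name is hypothetical, and reading a view through the connector materializes it into a dataset (the materializationDataset option) and may incur additional cost, as the source code warns:

# Hypothetical view, created in BigQuery beforehand:
#   CREATE VIEW first_dataset.v_employee_details AS
#   SELECT * FROM first_dataset.ext_employee_details;

src_tbl = spark.read.format('bigquery') \
  .option('table', 'turing-thought-277215:first_dataset.v_employee_details') \
  .option('viewsEnabled', 'true') \
  .option('materializationDataset', 'first_dataset') \
  .load()
src_tbl.show()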