1
votes

I'm trying to load a CSV file with schema auto-detection, but I am unable to load the file into BigQuery. Can anyone help me with this?

Please find my code below:

import time

from google.cloud import bigquery


def load_data_from_file(dataset_name, table_name, source_file_name):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset_name)
    table = dataset.table(table_name)
    table.reload()
    with open(source_file_name, 'rb') as source_file:
        job = table.upload_from_file(
            source_file, source_format='text/csv')
    wait_for_job(job)
    print('Loaded {} rows into {}:{}.'.format(
        job.output_rows, dataset_name, table_name))


def wait_for_job(job):
    while True:
        job.reload()
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        # Pause between polls so the loop does not hammer the API.
        time.sleep(1)
4
Are you getting an error? If so, what is it? - Elliott Brossard

4 Answers

2
votes

Based on the Google BigQuery Python API documentation, you should set source_format to 'CSV' instead of 'text/csv':

source_format='CSV'

Code Sample:

with open(csv_file.name, 'rb') as readable:
    table.upload_from_file(
        readable, source_format='CSV', skip_leading_rows=1)

Source: https://googlecloudplatform.github.io/google-cloud-python/stable/bigquery-usage.html#datasets
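
To illustrate the mix-up between a MIME type and a BigQuery format name, here is a minimal sketch of a guard you could put in front of the upload call. The helper name to_source_format and its mapping table are hypothetical and not part of the client library; the only thing taken from BigQuery is the pair of format names 'CSV' and 'NEWLINE_DELIMITED_JSON'.

```python
# Hypothetical helper for illustration; not part of google-cloud-bigquery.
_MIME_TO_SOURCE_FORMAT = {
    'text/csv': 'CSV',  # assumption: treat this MIME type as plain CSV
}
_KNOWN_FORMATS = {'CSV', 'NEWLINE_DELIMITED_JSON'}


def to_source_format(value):
    """Return a BigQuery source format name for a MIME type or format name."""
    normalized = value.strip()
    # Accept an already valid format name, ignoring case.
    if normalized.upper() in _KNOWN_FORMATS:
        return normalized.upper()
    # Otherwise try to translate a MIME type.
    try:
        return _MIME_TO_SOURCE_FORMAT[normalized.lower()]
    except KeyError:
        raise ValueError('Unsupported source format: {!r}'.format(value))
```

With this in place, to_source_format('text/csv') yields 'CSV', and an unrecognised value fails locally with a ValueError instead of surfacing later as a load job error.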

If this does not solve your problem, please provide more specifics about the errors you are observing.

2
votes

You can use the code snippet below to create a table and load CSV data from Cloud Storage into BigQuery with schema auto-detection:

from google.cloud import bigquery

bigqueryClient = bigquery.Client()
jobConfig = bigquery.LoadJobConfig()
jobConfig.skip_leading_rows = 1
jobConfig.source_format = bigquery.SourceFormat.CSV
jobConfig.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
jobConfig.autodetect = True

datasetName = "dataset_name"  # dataset IDs may contain only letters, digits, and underscores
targetTable = "table_name"
uri = "gs://bucket_name/file.csv"
tableRef = bigqueryClient.dataset(datasetName).table(targetTable)
bigqueryJob = bigqueryClient.load_table_from_uri(uri, tableRef, job_config=jobConfig)
bigqueryJob.result()
0
votes

Currently, the Python client has no support for loading data from a file with a schema auto-detection flag. (I plan to open a pull request to add this, but I'd first like to hear what the maintainers think of the implementation.)

There are still ways to work around this. I haven't found a very elegant solution so far, but the code below lets you enable schema detection as an input flag:

from google.cloud.bigquery import Client
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'path/your/json.key'
import google.cloud.bigquery.table as mtable

def _configure_job_metadata(metadata,
                             allow_jagged_rows,
                             allow_quoted_newlines,
                             create_disposition,
                             encoding,
                             field_delimiter,
                             ignore_unknown_values,
                             max_bad_records,
                             quote_character,
                             skip_leading_rows,
                             write_disposition):
     load_config = metadata['configuration']['load']

     if allow_jagged_rows is not None:
         load_config['allowJaggedRows'] = allow_jagged_rows

     if allow_quoted_newlines is not None:
         load_config['allowQuotedNewlines'] = allow_quoted_newlines

     if create_disposition is not None:
         load_config['createDisposition'] = create_disposition

     if encoding is not None:
         load_config['encoding'] = encoding

     if field_delimiter is not None:
         load_config['fieldDelimiter'] = field_delimiter

     if ignore_unknown_values is not None:
         load_config['ignoreUnknownValues'] = ignore_unknown_values

     if max_bad_records is not None:
         load_config['maxBadRecords'] = max_bad_records

     if quote_character is not None:
         load_config['quote'] = quote_character

     if skip_leading_rows is not None:
         load_config['skipLeadingRows'] = skip_leading_rows

     if write_disposition is not None:
         load_config['writeDisposition'] = write_disposition
     load_config['autodetect'] = True # --> Here you can add the option for schema auto-detection

mtable._configure_job_metadata = _configure_job_metadata

bq_client = Client()
ds = bq_client.dataset('dataset_name')
ds.table = lambda: mtable.Table('table_name', ds)
table = ds.table()

source_file_name = 'path/to/file.csv'  # placeholder: path to your local CSV file
with open(source_file_name, 'rb') as source_file:
    job = table.upload_from_file(
        source_file, source_format='CSV')
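
A somewhat shorter variant of the same workaround is to wrap the existing function instead of copying its whole body. The sketch below shows only the wrapping technique: force_autodetect is a hypothetical name, and the _configure_job_metadata defined here is a small stand-in so the example runs without the library installed. It is not the library's real implementation.

```python
import functools


def force_autodetect(configure):
    """Wrap a metadata-configuring function so it also enables autodetect."""
    @functools.wraps(configure)
    def wrapper(metadata, *args, **kwargs):
        result = configure(metadata, *args, **kwargs)
        # Same REST field that the copied function above sets by hand.
        metadata['configuration']['load']['autodetect'] = True
        return result
    return wrapper


# Stand-in for the library's private helper, for illustration only.
def _configure_job_metadata(metadata, skip_leading_rows=None):
    load_config = metadata['configuration']['load']
    if skip_leading_rows is not None:
        load_config['skipLeadingRows'] = skip_leading_rows


patched = force_autodetect(_configure_job_metadata)
metadata = {'configuration': {'load': {'sourceFormat': 'CSV'}}}
patched(metadata, skip_leading_rows=1)
print(metadata['configuration']['load'])
```

Against the real module this would presumably become mtable._configure_job_metadata = force_autodetect(mtable._configure_job_metadata), assuming the installed release still has that private function. Because it is private, it can change or disappear between versions.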
-1
votes

Just wanted to show how I've used the Python client.

Below is my function to create a table and load it with a CSV file.

Also, self.client is my bigquery.Client().

def insertTable(self, datasetName, tableName, csvFilePath, schema=None):
    """
    This function creates a table in given dataset in our default project
    and inserts the data given via a csv file.

    :param datasetName: The name of the dataset in which the table will be created
    :param tableName: The name of the table to be created
    :param csvFilePath: The path of the file to be inserted
    :param schema: The schema of the table to be created
    :return: returns nothing
    """
    csv_file = open(csvFilePath, 'rb')

    dataset_ref = self.client.dataset(datasetName)
    # <import>: from google.cloud.bigquery import Dataset
    dataset = Dataset(dataset_ref)

    table_ref = dataset.table(tableName)
    if schema is not None:
        table = bigquery.Table(table_ref, schema=schema)
    else:
        table = bigquery.Table(table_ref)

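    # <import>: from google.api_core.exceptions import NotFound
    try:
        self.client.delete_table(table)
    except NotFound:
        # The table did not exist yet, so there is nothing to delete.
        pass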

    table = self.client.create_table(table)

    # <import>: from google.cloud.bigquery import LoadJobConfig
    job_config = LoadJobConfig()
    table_ref = dataset.table(tableName)
    job_config.source_format = 'CSV'
    job_config.skip_leading_rows = 1
    job_config.autodetect = True
    job = self.client.load_table_from_file(
        csv_file, table_ref, job_config=job_config)
    job.result()  # wait for the load job to finish
    csv_file.close()

Let me know if this solves your problem.