9 votes

I am converting data from CSV to Parquet using Python (pandas) so I can later load it into Google BigQuery. Some of my integer columns contain missing values, and since pandas 0.24.0 I can store them as the nullable Int64 dtype.

Is there a way to use the Int64 dtype in a Parquet file as well? I can't find a clean solution for integers with missing values (so that they stay INTEGER in BigQuery).

I have tried importing the CSV directly into BigQuery and got the same error as when converting to Parquet with pandas (shown below).

Importing a CSV with an int column that includes a missing value:

    import pandas as pd

    df = pd.read_csv("docs/test_file.csv")
    df.info()

    id    8 non-null float64

The column is imported as float64. I change its type to Int64:

df["id"] = df["id"].astype('Int64')
print(df["id"].info())

id 8 non-null Int64

Then I try to save it to Parquet:

df.to_parquet("output/test.parquet")

The error:

    pyarrow.lib.ArrowTypeError: ('Did not pass numpy.dtype object', 'Conversion failed for column id with type Int64')

Comment from joris (1 upvote): Specifically for BigQuery, you got an answer below. But in general, for writing to Parquet, pyarrow first needs to add support for the nullable Int64 type, which is an open issue: issues.apache.org/jira/browse/ARROW-5379

1 Answer

3 votes

There's currently an open issue for supporting the new Int64 dtype in google-cloud-bigquery: https://github.com/googleapis/google-cloud-python/issues/7702.

In the meantime, I suggest using the object dtype (a minimal sketch of that workaround is at the end of this answer). As of version 1.13.0 of google-cloud-bigquery, you can specify the desired BigQuery schema, and the library will use the desired types in the Parquet file it generates:

    # Requires google-cloud-bigquery >= 1.13.0 and pandas.
    from google.cloud import bigquery
    import pandas

    # Explicit BigQuery schema for the column that only contains nulls.
    table_schema = (
        bigquery.SchemaField("int_col", "INTEGER"),
    )

    num_rows = 100
    nulls = [None] * num_rows
    dataframe = pandas.DataFrame(
        {
            "int_col": nulls,
        }
    )

    # Config.CLIENT is a bigquery.Client() and dataset_id the target dataset;
    # both come from the library's test setup.
    table_id = "{}.{}.load_table_from_dataframe_w_nulls".format(
        Config.CLIENT.project, dataset_id
    )

    job_config = bigquery.LoadJobConfig(schema=table_schema)
    load_job = Config.CLIENT.load_table_from_dataframe(
        dataframe, table_id, job_config=job_config
    )
    load_job.result()
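
For the object-dtype workaround mentioned at the top of this answer, here is a minimal sketch, assuming the pandas/pyarrow versions from the question (where a nullable Int64 column can't be written to Parquet directly) and reusing the question's "id" column and file paths. The intermediate Int64 cast keeps the values as whole integers rather than floats, and replacing the missing entries with None should let pyarrow infer a nullable INT64 Parquet column:

    import pandas as pd

    df = pd.read_csv("docs/test_file.csv")
    # Cast to nullable Int64 first so the values stay integers, then to object,
    # and use plain None for the missing entries.
    df["id"] = df["id"].astype("Int64").astype(object)
    df["id"] = df["id"].where(df["id"].notna(), None)
    df.to_parquet("output/test.parquet")

Either way, the column should then load into BigQuery as a nullable INTEGER rather than a FLOAT.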