13 votes

I'm currently writing a Java utility to import a few CSV files from GCS into BigQuery. I could easily achieve this with bq load, but I wanted to do it with a Dataflow job. So I'm using Dataflow's Pipeline and a ParDo transform (returning TableRow objects to feed into BigQueryIO), and I have created a StringToRowConverter() for the transformation. Here the actual problem starts: I am forced to specify the schema for the destination table, although I don't want to create a new table if it doesn't exist; I only want to load data. So I do not want to set the column names on the TableRow manually, as I have about 600 columns.

public class StringToRowConverter extends DoFn<String, TableRow> {

    private static final Logger logger = LoggerFactory.getLogger(StringToRowConverter.class);

    @Override
    public void processElement(ProcessContext c) {
        TableRow row = new TableRow();
        row.set("DO NOT KNOW THE COLUMN NAME", c.element());
        c.output(row);
    }
}

Moreover, it is assumed that the table already exists in the BigQuery dataset and I don't need to create it, and also that the CSV file contains the columns in the correct order.

If there's no workaround for this scenario and the column names are needed for the data load, then I can include them in the first row of the CSV file.

Any help will be appreciated.


1 Answer

7 votes

To avoid creating the table, use BigQueryIO.Write.CreateDisposition.CREATE_NEVER on the BigQueryIO.Write transform when configuring the pipeline. Source: https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write

You don't need to know the BigQuery table schema upfront; you can discover it dynamically. For instance, you can use the BigQuery API (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get) to query the table schema and pass it as a parameter to StringToRowConverter. Another option, assuming the first row is a header, is to skip that row and use it to map the rest of the file.
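For the first option, a minimal sketch using the BigQuery Java API client could look like the following. The project, dataset, and table names are placeholders, and the SchemaFetcher class and fetchColumnNames() method are just illustrative names; the resulting array would then be handed to StringToRowConverter, e.g. through its constructor, instead of relying on a header row.

import java.util.List;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;

public class SchemaFetcher {

    // Returns the column names of an existing table, in schema order.
    // "my-project", "my_dataset" and "myTable" are placeholders.
    public static String[] fetchColumnNames() throws Exception {
        HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
        GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory)
                .createScoped(BigqueryScopes.all());

        Bigquery bigquery = new Bigquery.Builder(transport, jsonFactory, credential)
                .setApplicationName("csv-loader")
                .build();

        // tables.get returns the table resource, including its schema.
        Table table = bigquery.tables().get("my-project", "my_dataset", "myTable").execute();
        List<TableFieldSchema> fields = table.getSchema().getFields();

        String[] columnNames = new String[fields.size()];
        for (int i = 0; i < fields.size(); i++) {
            columnNames[i] = fields.get(i).getName();
        }
        return columnNames;
    }
}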

The code below implements the second approach and also configures the output to append to an existing BigQuery table.

import java.util.Arrays;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

public class DFJob {

    public static class StringToRowConverter extends DoFn<String, TableRow> {

        // Column names taken from the header line of the CSV file.
        private String[] columnNames;

        private boolean isFirstRow = true;

        @Override
        public void processElement(ProcessContext c) {
            TableRow row = new TableRow();

            String[] parts = c.element().split(",");

            if (isFirstRow) {
                // The first line is the header: remember the column names
                // and do not emit a row for it.
                columnNames = Arrays.copyOf(parts, parts.length);
                isFirstRow = false;
            } else {
                // Map each value to the column name at the same position.
                for (int i = 0; i < parts.length; i++) {
                    row.set(columnNames[i], parts[i]);
                }
                c.output(row);
            }
        }
    }

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.create()
                .as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);

        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.Read.from("gs://dataflow-samples/myfile.csv"))
                .apply(ParDo.of(new StringToRowConverter()))
                // Append to the existing table; never create it.
                .apply(BigQueryIO.Write.to("myTable")
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        PipelineResult result = p.run();
    }
}