1
votes

I have one source CSV file (without a header) as well as a header config CSV file (containing only the column names) in GCS. I also have a static table in BigQuery. I want to load the source file into the static table using the column header mapping from the config file.

I tried a different approach earlier: I maintained a source file that contained the header and the data in the same file, then tried to split the header off the source file and insert the data into BigQuery using the header column mapping. I noticed this approach is NOT possible because Dataflow shuffles the data across multiple worker nodes, so I dropped it.

The code below uses hard-coded column names. I am looking for an approach to read the column names from an external config file (I want to make my code dynamic).

package com.coe.cog;

import java.io.BufferedReader;
import java.util.*;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;

public class SampleTest {
            private static final Logger LOG = LoggerFactory.getLogger(SampleTest.class);

            public static TableReference getGCDSTableReference() {
                        TableReference ref = new TableReference();
                        ref.setProjectId("myownproject");
                        ref.setDatasetId("DS_Employee");
                        ref.setTableId("tLoad14");
                        return ref;
            }

            static class TransformToTable extends DoFn<String, TableRow> {
                        @ProcessElement
                        public void processElement(ProcessContext c) {

                            String csvSplitBy = ",";

                            String lineHeader = "ID,NAME,AGE,SEX"; // Hard-coded column names; I want to read this header from a GCS file instead.

                            String[] columnsHeader = lineHeader.split(csvSplitBy); // header columns only

                            String[] split = c.element().split(csvSplitBy); // data fields

                                TableRow row = new TableRow();

                                for (int i = 0; i < split.length; i++) {
                                    row.set(columnsHeader[i], split[i]);
                                }

                                c.output(row);
                            }

            }

            public interface MyOptions extends PipelineOptions {

                        /*
                        * Param
                        *
                        */

            }

            public static void main(String[] args) {

                        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

                        options.setTempLocation("gs://demo-bucket-data/temp");

                        Pipeline p = Pipeline.create(options);

                        PCollection<String> lines = p.apply("Read From Storage", TextIO.read().from("gs://demo-bucket-data/Demo/Test/SourceFile_WithOutHeader.csv"));

                        PCollection<TableRow> rows = lines.apply("Transform To Table",ParDo.of(new TransformToTable()));

                        rows.apply("Write To Table",BigQueryIO.writeTableRows().to(getGCDSTableReference())                                              
                                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

                        p.run();
            }
}

Source File:

1,John,25,M 
2,Smith,30,M
3,Josephine,20,F

Config File (Headers only):

ID,NAME,AGE,SEX

2 Answers

4
votes

You have a couple of options:

  1. Use a Dataflow/Beam side input to read the config/header file into some sort of collection, e.g. an ArrayList. It will be available to all workers in the cluster. You can then use the side input to dynamically assign the schema to the BigQuery table using DynamicDestinations (a sketch of this approach follows the list).
  2. Before dropping into your Dataflow pipeline, call the GCS API directly to grab your config/header file, parse it, and then use the results to set up your pipeline.
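
A minimal sketch of option 1, assuming the header config file lives at gs://demo-bucket-data/Demo/Test/HeaderConfig.csv (a hypothetical path) and contains a single comma-separated line. The header is read with TextIO, turned into a singleton view, and consumed inside the DoFn in place of the hard-coded header string:

    // Additional imports needed:
    // import org.apache.beam.sdk.transforms.View;
    // import org.apache.beam.sdk.values.PCollectionView;

    // Read the one-line header file and expose it to all workers as a side input.
    PCollectionView<String> headerView = p
            .apply("Read Header", TextIO.read().from("gs://demo-bucket-data/Demo/Test/HeaderConfig.csv"))
            .apply("As Singleton", View.asSingleton());

    PCollection<TableRow> rows = lines.apply("Transform To Table",
            ParDo.of(new DoFn<String, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    // Column names come from the side input instead of a hard-coded string.
                    String[] header = c.sideInput(headerView).split(",");
                    String[] values = c.element().split(",");
                    TableRow row = new TableRow();
                    for (int i = 0; i < header.length && i < values.length; i++) {
                        row.set(header[i], values[i]);
                    }
                    c.output(row);
                }
            }).withSideInputs(headerView));

View.asSingleton() expects the header PCollection to contain exactly one element, which a single-line config file satisfies; for multi-line config files View.asList() or View.asMap() are a better fit.
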
0
votes

Another approach is to use Beam's FileSystems API to read config files from GCS.

Advantages:

  • No additional dependencies are needed; the FileSystems API is part of the Beam SDK.
  • It avoids the dependency version conflicts that pulling in GCP's client libraries directly can cause.
  • Beam's FileSystems API can be used inside any transform.

Here is a snippet for reading a file:

    // Required imports:
    // import java.io.IOException;
    // import java.nio.channels.Channels;
    // import java.nio.charset.StandardCharsets;
    // import org.apache.beam.sdk.io.FileSystems;
    // import org.apache.beam.sdk.io.fs.MatchResult;
    // import com.google.common.io.CharStreams; // Guava

    // filePath format: gs://bucket/file
    public static String loadSchema(String filePath) {
        MatchResult.Metadata metadata;
        try {
            // Resolve the GCS path to a single file.
            metadata = FileSystems.matchSingleFileSpec(filePath);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        String schema;
        try {
            // Open the file and read its full contents as a UTF-8 string.
            schema = CharStreams.toString(
                    Channels.newReader(
                            FileSystems.open(metadata.resourceId()),
                            StandardCharsets.UTF_8.name()));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        // Return the content as a string so the caller can parse it.
        return schema;
    }
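
For example (a sketch, not part of the original answer), the returned header string could be read once at pipeline-construction time and handed to the DoFn through its constructor, replacing the hard-coded header in the question; the config path below is hypothetical:

    // The DoFn is serialized to the workers, so a String[] field carries the header along.
    static class TransformToTable extends DoFn<String, TableRow> {
        private final String[] header;

        TransformToTable(String headerLine) {
            this.header = headerLine.trim().split(",");
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] values = c.element().split(",");
            TableRow row = new TableRow();
            for (int i = 0; i < header.length && i < values.length; i++) {
                row.set(header[i], values[i]);
            }
            c.output(row);
        }
    }

    // In main(), after Pipeline.create(options):
    // String headerLine = loadSchema("gs://demo-bucket-data/Demo/Test/HeaderConfig.csv"); // hypothetical path
    // PCollection<TableRow> rows = lines.apply("Transform To Table",
    //         ParDo.of(new TransformToTable(headerLine)));

If the gs:// scheme is not yet registered when loadSchema runs at construction time, calling FileSystems.setDefaultPipelineOptions(options) beforehand should register the GCS filesystem.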

Disadvantages of a side input:

  • The file's line order is not preserved, since the file is read as an unordered collection of elements.
  • It is hard to parse multi-line files such as JSON.

A side input works well for single-line static values.