0
votes

I have written code to inject CSV file from GCS to BigQuery with hardcoded ProjectID, Dataset, Table name, GCS Temp & Staging location.

I am looking code that should read

  • ProjectID
  • Dataset
  • Table name
  • GCS Temp & Staging location parameters

from BigQuery table(Dynamic parameters).

Code:-

public class DemoPipeline {

public static TableReference getGCDSTableReference() {
    TableReference ref = new TableReference();
    ref.setProjectId("myprojectbq");
    ref.setDatasetId("DS_Emp");
    ref.setTableId("emp");
    return ref;
}
static class TransformToTable extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {

        String input = c.element();

        String[] s = input.split(",");
        TableRow row = new TableRow();

        row.set("id", s[0]);
        row.set("name", s[1]);
        c.output(row);

    }
}
public interface MyOptions extends PipelineOptions {

    /*
     * Param
     * 
     */

}

public static void main(String[] args) {

    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
    options.setTempLocation("gs://demo-xxxxxx/temp");
    Pipeline p = Pipeline.create(options);

    PCollection<String> lines = p.apply("Read From Storage", TextIO.read().from("gs://demo-xxxxxx/student.csv"));

    PCollection<TableRow> rows = lines.apply("Transform To Table",ParDo.of(new TransformToTable()));

    rows.apply("Write To Table",BigQueryIO.writeTableRows().to(getGCDSTableReference())
            //.withSchema(BQTableSemantics.getGCDSTableSchema())
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

    p.run();
}
}
1
I don't quite understand the question. Do you want to use BigQuery as a source, and load from a specific table and/or dataset based on an element you have process from another source? Or to use it as a sink and to write to a specific table and/or dataset based on elements you have processed from another source?Alex Amato
Thanks Alex for your response. My requirement is to load CSV file from GCS to BigQuery without hardcoding Project ID / dataset / tables names in java code. I want to read those parameters from external storage or dynamic parameters(Template). Kindly advise.Kannan
@Kannan just use a config fileHaris Nadeem
@Haris Nadeem , it would be grateful if you provide some example and how to read config file from GCS. My requirement is to read source CSV file from GCS and compare with config CSV file(I will maintain column names) from GCS then load it into Bigquery. Thanks in advance.Kannan
You can find an example for a config file here: mkyong.com/java/java-properties-file-examples and then you would just package the config file with your jobHaris Nadeem

1 Answers

0
votes

Even to read from an initial table (Project ID / dataset / tables names) where other data is contained, you need to hardcode such information in somewhere. Properties files as Haris recommended is a good approach, look at the following suggestions:

  1. Java Properties file. Used when parameters have to be changed or tuned. In general, changes that don't require new compilation. It's a file that has to live or attached to your java classes. Reading this file from GCS is feasible but a weird option.

  2. Pipeline Execution Parameters. Custom parameters can be a workaround for your question, please check Creating Custom Options to understand how can be accomplished, here is a small example.