I'm fairly new to the Google Cloud Platform and I'm trying Google Dataflow for the first time, for a project in my postgraduate programme. I want to write an automated load job that reads files from a certain bucket in my Cloud Storage and inserts their data into a BigQuery table.
I get the data as a PCollection<String>, but to insert it into BigQuery I apparently need to transform it into a PCollection<TableRow>. So far I haven't found a solid answer on how to do this.
Here's my code:
public static void main(String[] args) {
    // Defining the schema of the BigQuery table
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("Datetime").setType("TIMESTAMP"));
    fields.add(new TableFieldSchema().setName("Consumption").setType("FLOAT"));
    fields.add(new TableFieldSchema().setName("MeterID").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    // Creating the pipeline
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // Getting the data from Cloud Storage
    PCollection<String> lines = p.apply(TextIO.Read
        .named("ReadCSVFromCloudStorage")
        .from("gs://mybucket/myfolder/certainCSVfile.csv"));

    // Probably need to do some transform here ...

    // Inserting data into BigQuery (table spec format is project:dataset.table)
    lines.apply(BigQueryIO.Write
        .named("WriteToBigQuery")
        .to("projectID:datasetID.tableID")
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
}
I'm probably just forgetting something basic, so I hope you guys can help me with this ...
PCollection<String> using ParDo. – Andy Turner
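To make the ParDo suggestion a bit more concrete, here is a minimal sketch of the per-line parsing step only, written in plain Java so it runs without the Dataflow SDK. It assumes each CSV line has exactly three comma-separated columns in the order Datetime, Consumption, MeterID, with no header row and no quoted fields; the question doesn't show the file, so that layout is a guess. The class name CsvLineParser and the method parse are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns one CSV line into field name -> value pairs
// matching the schema in the question (Datetime, Consumption, MeterID).
final class CsvLineParser {
    private CsvLineParser() {}

    static Map<String, Object> parse(String line) {
        // Assumption: plain comma-separated values, no quoting or escaping.
        // The -1 limit keeps trailing empty columns instead of dropping them.
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                "Expected 3 columns but got " + parts.length + ": " + line);
        }
        Map<String, Object> row = new LinkedHashMap<>();
        // The timestamp is passed through as text; it is not validated here.
        row.put("Datetime", parts[0].trim());
        // Throws NumberFormatException if the value is not numeric.
        row.put("Consumption", Double.parseDouble(parts[1].trim()));
        row.put("MeterID", parts[2].trim());
        return row;
    }
}
```

In the pipeline itself, this logic would presumably live inside a DoFn<String, TableRow> applied with ParDo.of(...) between the read and the write, with each entry set on a new TableRow via set(name, value) and the row emitted through the DoFn's output call. That yields the PCollection<TableRow> that BigQueryIO.Write expects. How malformed lines should be handled (skipped, logged, or failing the job) is a design choice this sketch leaves open.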