I tried to join two tables and write into a bigquery table with nested column. Using CoGroupByKey
joined two tables, but after that, I could not convert the values into nested table row. I got a type conversion error.
How can I convert the tuple collection into table row for writing bigquery.
Here is the entire code that I tried:
PipelineOptionsFactory.register(BQOptions.class);
BQOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BQOptions.class);
Pipeline p = Pipeline.create(options);
WithKeys<String, TableRow> headerKey = WithKeys.of(
(TableRow row) ->
String.format("%s",
row.get("empNo")))
.withKeyType(TypeDescriptors.strings());
PCollection<KV<String,TableRow>> empRow = p.apply("ReadTable1",BigQueryIO.readTableRows().from(options.getInputData1())).apply("WithKeys", headerKey); PCollection<KV<String,TableRow>> detailRow = p.apply("ReadTable2",BigQueryIO.readTableRows().from(options.getInputData2())).apply("WithKeys", headerKey);
final TupleTag<TableRow> table1Tag = new TupleTag<>(); final TupleTag<TableRow> table2Tag = new TupleTag<>();
PCollection<KV<String,TableRow>> empInfo = empRow.apply(ParDo.of(new fnGetEmp())); PCollection<KV<String,TableRow>> detailInfo = detailRow.apply(ParDo.of(new fnGetDetail()));
PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
.of(table1Tag, empInfo)
.and(table2Tag, detailInfo)
.apply(CoGroupByKey.<String>create());
@SuppressWarnings("serial") PCollection<TableRow> finalResultCollection =
kvpCollection.apply("Process", ParDo.of(
new DoFn<KV<String, CoGbkResult>, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
TableRow targetRow = new TableRow();
targetRow.set("empNo", (String) e.getKey());
targetRow.set("empName", e.getValue().getOnly(table1Tag).get("empName"));
for (TableRow eventInfo : c.element().getValue().getAll(table2Tag)) {
targetRow.set("email", eventInfo.get("email"));
targetRow.set("phone", (Integer)eventInfo.get("phone"));
}
c.output(targetRow);
}
}));
TableSchema tableSchema =
new TableSchema().setFields(ImmutableList.of(
new TableFieldSchema().setName("empNo").setType("STRING"),
new TableFieldSchema().setName("empName").setType("STRING"),
new TableFieldSchema().setName("details").setMode("REPEATED").setType("RECORD")
.setFields(ImmutableList.of(
new TableFieldSchema().setName("email").setType("STRING"),
new TableFieldSchema().setName("phone").setType("INTEGER")))));
finalResultCollection.apply(BigQueryIO.writeTableRows()
.to(options.getBigQueryTablename())
.withSchema(tableSchema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); p.run().waitUntilFinish();