
I tried to join two tables and write the result into a BigQuery table with a nested column. I joined the two tables with CoGroupByKey, but after that I could not convert the joined values into a nested TableRow. I got a type conversion error.

How can I convert the tuple collection into TableRows for writing to BigQuery?

Here is the entire code that I tried:

    PipelineOptionsFactory.register(BQOptions.class);
    BQOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BQOptions.class);
    Pipeline p = Pipeline.create(options);

    WithKeys<String, TableRow> headerKey = WithKeys.of(
            (TableRow row) -> String.format("%s", row.get("empNo")))
        .withKeyType(TypeDescriptors.strings());

    PCollection<KV<String, TableRow>> empRow = p
        .apply("ReadTable1", BigQueryIO.readTableRows().from(options.getInputData1()))
        .apply("WithKeys", headerKey);
    PCollection<KV<String, TableRow>> detailRow = p
        .apply("ReadTable2", BigQueryIO.readTableRows().from(options.getInputData2()))
        .apply("WithKeys", headerKey);

    final TupleTag<TableRow> table1Tag = new TupleTag<>();
    final TupleTag<TableRow> table2Tag = new TupleTag<>();

    PCollection<KV<String, TableRow>> empInfo = empRow.apply(ParDo.of(new fnGetEmp()));
    PCollection<KV<String, TableRow>> detailInfo = detailRow.apply(ParDo.of(new fnGetDetail()));

    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
        .of(table1Tag, empInfo)
        .and(table2Tag, detailInfo)
        .apply(CoGroupByKey.<String>create());

    @SuppressWarnings("serial")
    PCollection<TableRow> finalResultCollection =
        kvpCollection.apply("Process", ParDo.of(
            new DoFn<KV<String, CoGbkResult>, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<String, CoGbkResult> e = c.element();

                    TableRow targetRow = new TableRow();
                    targetRow.set("empNo", (String) e.getKey());
                    targetRow.set("empName", e.getValue().getOnly(table1Tag).get("empName"));
                    for (TableRow eventInfo : c.element().getValue().getAll(table2Tag)) {
                        targetRow.set("email", eventInfo.get("email"));
                        targetRow.set("phone", (Integer) eventInfo.get("phone"));
                    }

                    c.output(targetRow);
                }
            }));

    TableSchema tableSchema =
        new TableSchema().setFields(ImmutableList.of(
            new TableFieldSchema().setName("empNo").setType("STRING"),
            new TableFieldSchema().setName("empName").setType("STRING"),
            new TableFieldSchema().setName("details").setMode("REPEATED").setType("RECORD")
                .setFields(ImmutableList.of(
                    new TableFieldSchema().setName("email").setType("STRING"),
                    new TableFieldSchema().setName("phone").setType("INTEGER")))));

    finalResultCollection.apply(BigQueryIO.writeTableRows()
        .to(options.getBigQueryTablename())
        .withSchema(tableSchema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run().waitUntilFinish();

1 Answer


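The code snippet below builds the nested structure for the BigQuery table: create one sub-row per detail record, collect the sub-rows in a list, and set that list on the repeated RECORD field of the target row.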

    PCollection<TableRow> finalResultCollection =
        kvpCollection.apply("Process", ParDo.of(
            new DoFn<KV<String, CoGbkResult>, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<String, CoGbkResult> e = c.element();

                    // Top-level (non-nested) fields.
                    TableRow targetRow = new TableRow();
                    targetRow.set("empNo", (String) e.getKey());
                    targetRow.set("empName", e.getValue().getOnly(table1Tag).get("empName"));

                    // One sub-row per matching row from the second table.
                    List<TableRow> nested = new ArrayList<>();
                    for (TableRow eventInfo : e.getValue().getAll(table2Tag)) {
                        TableRow subRow = new TableRow();
                        subRow.set("email", eventInfo.get("email"));
                        subRow.set("phone", (Integer) eventInfo.get("phone"));
                        nested.add(subRow);
                    }

                    // The key must match the REPEATED RECORD field name in the schema
                    // ("details" in the question's TableSchema).
                    targetRow.set("details", nested);

                    c.output(targetRow);
                }
            }));
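
Two details in the snippet above are worth checking. First, the key used for the nested list should match the REPEATED RECORD field declared in the schema, which is "details" in the question. Second, the `(Integer)` cast is a plausible source of the type conversion error: as far as I know, `BigQueryIO.readTableRows()` returns INTEGER columns as `String` values, so the cast would throw a `ClassCastException`, but please verify this against your Beam version. Below is a minimal sketch of the row shape and a defensive conversion. It uses plain `Map` and `List` as stand-ins for `TableRow` so it runs without Beam; the class name `NestedRowSketch` and the helpers `toLong` and `buildRow` are hypothetical names for illustration, not part of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NestedRowSketch {

    // Converts a cell value to Long whether it arrives as a String or as a Number.
    // Assumption: INTEGER cells may come back as String from the read side.
    public static Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.parseLong(value.toString().trim());
    }

    // Builds one output row: scalar fields plus a list of sub-rows under "details".
    // Map<String, Object> stands in for TableRow here.
    public static Map<String, Object> buildRow(
            String empNo, Object empName, List<Map<String, Object>> detailRows) {
        Map<String, Object> target = new LinkedHashMap<>();
        target.put("empNo", empNo);
        target.put("empName", empName);

        List<Map<String, Object>> nested = new ArrayList<>();
        for (Map<String, Object> detail : detailRows) {
            Map<String, Object> sub = new LinkedHashMap<>();
            sub.put("email", detail.get("email"));
            sub.put("phone", toLong(detail.get("phone")));
            nested.add(sub);
        }

        // Same name as the REPEATED RECORD field in the schema.
        target.put("details", nested);
        return target;
    }

    public static void main(String[] args) {
        Map<String, Object> d1 = new LinkedHashMap<>();
        d1.put("email", "a@example.com");
        d1.put("phone", "5551234");   // value arriving as a String

        Map<String, Object> d2 = new LinkedHashMap<>();
        d2.put("email", "b@example.com");
        d2.put("phone", 5559876);     // value arriving as a Number

        System.out.println(buildRow("E1", "Ada", Arrays.asList(d1, d2)));
    }
}
```

In the actual DoFn the same idea would replace the cast with something like `subRow.set("phone", toLong(eventInfo.get("phone")))`, keeping everything else as in the answer.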