0
votes

I tried to move the data from one table to another table. Used SideInput for filtering the records while transform the data. SideInput also type of KV collection and its loaded the data from another table.

When run my pipeline got "java.lang.IllegalArgumentException: calling sideInput() with unknown view" error.

Here is the entire code that I tried:

{
PipelineOptionsFactory.register(OptionPipeline.class);

OptionPipeline options = PipelineOptionsFactory.fromArgs(args).withValidation().as(OptionPipeline.class);

Pipeline p = Pipeline.create(options);

PCollection<TableRow> sideInputData = p.apply("ReadSideInput",BigQueryIO.readTableRows().from(options.getOrgRegionMapping()));
PCollection<KV<String,String>> sideInputMap = sideInputData.apply(ParDo.of(new getSideInputDataFn()));
final PCollectionView<Map<String,String>> sideInputView = sideInputMap.apply(View.<String,String>asMap());



PCollection<TableRow> orgMaster = p.apply("ReadOrganization",BigQueryIO.readTableRows().from(options.getOrgCodeMaster()));
PCollection<TableRow> orgCode = orgMaster.apply(ParDo.of(new gnGetOrgMaster()));


@SuppressWarnings("serial")
PCollection<TableRow> finalResultCollection =  orgCode.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>() 
{
      @ProcessElement
      public void processElement(ProcessContext c) {

          TableRow outputRow = new TableRow();

          TableRow orgCodeRow = c.element();
          String orgCodefromMaster = (String) orgCodeRow.get("orgCode");

          String region = c.sideInput(sideInputView).get(orgCodefromMaster);

          outputRow.set("orgCode", orgCodefromMaster);
          outputRow.set("orgName", orgCodeRow.get("orgName"));
          outputRow.set("orgName", region);
          DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
          Date dateobj = new Date();
          outputRow.set("updatedDate",df.format(dateobj));

          c.output(outputRow);
      }
}));


finalResultCollection.apply(BigQueryIO.writeTableRows()
                     .withSchema(schema)
                     .to(options.getOrgCodeTable())
                     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

p.run().waitUntilFinish();
}
@SuppressWarnings("serial")
static class getSideInputDataFn extends DoFn<TableRow,KV<String, String>>
{
    @ProcessElement
    public void processElement(ProcessContext c)
    {
        TableRow row = c.element();
        c.output(KV.of((String) row.get("orgcode"), (String) row.get("region")));
    }
}
1

1 Answers

0
votes

It looks like the runner is complaining because you never told it about the side input when defining the graph. In this case you call .withSideInputs after the ParDo.of call passing in the reference to the PCollectionView<T> you defined earlier.

@SuppressWarnings("serial")
PCollection<TableRow> finalResultCollection =  orgCode.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>()
{
    @ProcessElement
    public void processElement(ProcessContext c) {

        TableRow outputRow = new TableRow();

        TableRow orgCodeRow = c.element();
        String orgCodefromMaster = (String) orgCodeRow.get("orgCode");

        String region = c.sideInput(sideInputView).get(orgCodefromMaster);

        outputRow.set("orgCode", orgCodefromMaster);
        outputRow.set("orgName", orgCodeRow.get("orgName"));
        outputRow.set("orgName", region);
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
        Date dateobj = new Date();
        outputRow.set("updatedDate",df.format(dateobj));

        c.output(outputRow);
    }
}).withSideInputs(sideInputView));

I didn't test this code but that's what stands out when I look at it.