I have a PCollection<String>, say "X", that I need to write to a BigQuery table. The destination table and its schema are in a PCollection<TableRow>, say "Y". What is the simplest way to accomplish this?

I tried extracting the table name and schema from "Y" and saving them in static global variables (tableName and schema, respectively). Oddly, BigQueryIO.writeTableRows() always sees tableName as null, although it does get the schema. When I log both variables, I can see that both have values.

Here is my pipeline code:

static String tableName;
static TableSchema schema;

PCollection<String> read = p.apply("Read from input file",
  TextIO.read().from(options.getInputFile()));

PCollection<TableRow> tableRows = p.apply(
  BigQueryIO.read().fromQuery(NestedValueProvider.of(
    options.getfilename(),
    new SerializableFunction<String, String>() {
         @Override
         public String apply(String filename) {
           return "SELECT table,schema FROM `BigqueryTest.configuration` WHERE file='" + filename +"'";
         }
    })).usingStandardSql().withoutValidation());

final PCollectionView<List<String>> dataView = read.apply(View.asList());

tableRows.apply("Convert data read from file to TableRow",
  ParDo.of(new DoFn<TableRow,TableRow>(){
    @ProcessElement
    public void processElement(ProcessContext c) {
      tableName = c.element().get("table").toString();
      String[] schemas = c.element().get("schema").toString().split(",");
      List<TableFieldSchema> fields = new ArrayList<>();
      for(int i=0;i<schemas.length;i++) {
        fields.add(new TableFieldSchema()
          .setName(schemas[i].split(":")[0]).setType(schemas[i].split(":")[1]));
      }
      schema = new TableSchema().setFields(fields);

      //My code to convert data to TableRow format.
    }}).withSideInputs(dataView)); 


tableRows.apply("write to BigQuery",
  BigQueryIO.writeTableRows()
    .withSchema(schema)
    .to("ProjectID:DatasetID."+tableName)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Everything else works fine. Only the BigQueryIO.write operation fails, with the error "TableId is null".

I also tried using a SerializableFunction and returning the value from there, but I still get null.

Here is the code that I tried for it:

tableRows.apply("write to BigQuery",
  BigQueryIO.writeTableRows()
    .withSchema(schema)
    .to(new GetTable(tableName))
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

public static class GetTable implements SerializableFunction<String,String> {
  String table;

  // Constructor takes the table name, matching the call new GetTable(tableName) above.
  public GetTable(String table) {
    this.table = table;
  }

  @Override
  public String apply(String arg0) {
    return "ProjectId:DatasetId."+table;
  }
}

I also tried using DynamicDestinations but I get an error saying schema is not provided. Honestly I'm new to the concept of DynamicDestinations and I'm not sure that I'm doing it correctly.

Here is the code that I tried for it:

tableRows2.apply(BigQueryIO.writeTableRows()
  .to(new DynamicDestinations<TableRow, TableRow>() {
    private static final long serialVersionUID = 1L;
    @Override
    public TableDestination getTable(TableRow dest) {
      List<TableRow> list = sideInput(bqDataView); //bqDataView contains table and schema
      String table = list.get(0).get("table").toString();
      String tableSpec = "ProjectId:DatasetId."+table;
      String tableDescription = "";
      return new TableDestination(tableSpec, tableDescription);
    }

    public String getSideInputs(PCollectionView<List<TableRow>> bqDataView) {
      return null;
    }

    @Override
    public TableSchema getSchema(TableRow destination) {
      return schema;   //schema is getting added from the global variable
    }
    @Override
    public TableRow getDestination(ValueInSingleWindow<TableRow> element) {
      return null;
    }
}.getSideInputs(bqDataView)));

Please let me know what I'm doing wrong and which path I should take.

Thank You.

1 Answer

Part of the reason you're having trouble is that a pipeline runs in two stages. First, the pipeline is constructed on your machine. This is when all of the applications of PTransforms occur. In your first example, this is when the following lines are executed:

BigQueryIO.writeTableRows()
  .withSchema(schema)
  .to("ProjectID:DatasetID."+tableName)

The code within a ParDo, however, runs when your pipeline executes, and it does so on many machines. So the following code runs much later than the pipeline construction:

@ProcessElement
public void processElement(ProcessContext c) {
  tableName = c.element().get("table").toString();
  ...
  schema = new TableSchema().setFields(fields);
  ...
}

This means that neither the tableName nor the schema field will have been set when the BigQueryIO sink is created.
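The ordering problem can be reproduced without Beam at all. The following is only a plain-Java sketch of the timing: the class name, the list of deferred tasks, and "my_table" are hypothetical stand-ins, with the Runnable playing the role of the ParDo body. It ignores the additional fact that on a real runner the ParDo may execute in a different JVM entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; no Beam classes are involved and all names are hypothetical.
class ConstructionVsExecution {
    // Starts out null, like the static global in the question.
    static String tableName;

    // "Construction time": the table spec string is built immediately from the current value.
    static String buildSinkSpec() {
        return "ProjectID:DatasetID." + tableName;
    }

    static String demo() {
        tableName = null;
        List<Runnable> deferred = new ArrayList<>();

        // Stands in for the ParDo body: registered now, but not run yet.
        deferred.add(() -> tableName = "my_table");

        // Stands in for BigQueryIO...to(...): evaluated before any deferred code has run.
        String sinkSpec = buildSinkSpec();

        // "Execution time": the deferred code finally runs, too late to affect sinkSpec.
        deferred.forEach(Runnable::run);
        return sinkSpec;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints ProjectID:DatasetID.null
    }
}
```

The variable does get its value eventually, which is why logging it inside the processing code shows the expected name, but the string handed to the sink was already built from null.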

Your idea to use DynamicDestinations is correct, but you need to move the code that actually generates the schema and the destination into that class, rather than relying on global variables that aren't available on all of the machines.
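As a rough sketch of what "moving the code into the class" could look like, the logic can be written as pure functions of the configuration row instead of writes to globals. The helper class below is hypothetical and deliberately dependency-free: it uses a plain Map in place of TableRow and returns name-to-type pairs in place of a TableSchema. In a DynamicDestinations subclass, getTable and getSchema would call helpers like these on the row obtained through sideInput(...), and wrap the results in TableDestination and TableSchema respectively. The "name:TYPE,name:TYPE" schema format is assumed from the parsing code in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; a plain Map stands in for the TableRow read from the configuration table.
class ConfigRowParsing {
    // Derives the table spec from the row's "table" value, as getTable would.
    static String tableSpec(Map<String, String> configRow) {
        return "ProjectId:DatasetId." + configRow.get("table");
    }

    // Parses a spec such as "name:STRING,age:INTEGER" into ordered field name -> type pairs,
    // as getSchema would before building TableFieldSchema objects.
    static Map<String, String> parseSchema(String schemaSpec) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String field : schemaSpec.split(",")) {
            String[] parts = field.trim().split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected name:type but got: " + field);
            }
            fields.put(parts[0].trim(), parts[1].trim());
        }
        return fields;
    }
}
```

Because both helpers depend only on their arguments, they give the same answer on whichever machine they run, which is the property the static variables lacked. Note also that in Beam's DynamicDestinations, getSideInputs() takes no arguments and returns the list of views the class reads, and getDestination should return a non-null key for each element.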