4 votes

I am currently trying to develop a Dataflow pipeline to replace some partitions of a partitioned table. The table is partitioned on a custom date field, and the input of my pipeline is a file that may contain rows for several different dates.

I developed this pipeline:

    PipelineOptionsFactory.register(BigQueryOptions.class);
    BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

    Pipeline p = Pipeline.create(options);

    PCollection<TableRow> rows =  p.apply("ReadLines", TextIO.read().from(options.getFileLocation()))
            .apply("Convert To BQ Row", ParDo.of(new StringToRowConverter(options)));



    ValueProvider<String>  projectId = options.getProjectId();
    ValueProvider<String> datasetId = options.getDatasetId();
    ValueProvider<String> tableId = options.getTableId();
    ValueProvider<String> partitionField = options.getPartitionField();
    ValueProvider<String> columnNames = options.getColumnNames();
    ValueProvider<String> types = options.getTypes();

    rows.apply("Write to BQ", BigQueryIO.writeTableRows()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withCustomGcsTempLocation(options.getGCSTempLocation())
            .to(new DynamicDestinations<TableRow, String>() {

                @Override
                public String getDestination(ValueInSingleWindow<TableRow> element) {

                    TableRow date = element.getValue();

                    String partitionDestination = (String) date.get(partitionField.get());

                    // Convert the row's date from yyyy-MM-dd to the yyyyMMdd format used by partition decorators
                    SimpleDateFormat from = new SimpleDateFormat("yyyy-MM-dd");
                    SimpleDateFormat to = new SimpleDateFormat("yyyyMMdd");

                    try {

                        partitionDestination = to.format(from.parse(partitionDestination));
                        LOG.info("Table destination "+partitionDestination);
                        // Target a single partition via the table decorator: project:dataset.table$yyyyMMdd
                        return projectId.get()+":"+datasetId.get()+"."+tableId.get()+"$"+partitionDestination;

                    } catch(ParseException e){
                        e.printStackTrace();
                        return projectId.get()+":"+datasetId.get()+"."+tableId.get()+"_rowsWithErrors";
                    }
                }

                @Override
                public TableDestination getTable(String destination) {

                    // Day-partition the destination table on the custom date field
                    TimePartitioning timePartitioning = new TimePartitioning();
                    timePartitioning.setField(partitionField.get());
                    timePartitioning.setType("DAY");
                    timePartitioning.setRequirePartitionFilter(true);

                    TableDestination tableDestination  = new TableDestination(destination, null, timePartitioning);

                    LOG.info(tableDestination.toString());

                    return tableDestination;

                }

                @Override
                public TableSchema getSchema(String destination) {

                    return new TableSchema().setFields(buildTableSchemaFromOptions(columnNames, types));
                }
            })
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

    p.run();
}
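
For context, the custom BigQueryOptions interface is not shown above; here is a minimal sketch of what it would look like, assuming ValueProvider-based template parameters (only the getter names come from the code above; the base interface, imports and setters are assumptions):

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.ValueProvider;

    // Sketch of the custom options interface used by the pipeline above.
    // Only the getter names are taken from the pipeline code; everything else
    // is an assumption.
    public interface BigQueryOptions extends DataflowPipelineOptions {
        ValueProvider<String> getFileLocation();
        void setFileLocation(ValueProvider<String> value);

        ValueProvider<String> getProjectId();
        void setProjectId(ValueProvider<String> value);

        ValueProvider<String> getDatasetId();
        void setDatasetId(ValueProvider<String> value);

        ValueProvider<String> getTableId();
        void setTableId(ValueProvider<String> value);

        ValueProvider<String> getPartitionField();
        void setPartitionField(ValueProvider<String> value);

        ValueProvider<String> getColumnNames();
        void setColumnNames(ValueProvider<String> value);

        ValueProvider<String> getTypes();
        void setTypes(ValueProvider<String> value);

        ValueProvider<String> getGCSTempLocation();
        void setGCSTempLocation(ValueProvider<String> value);
    }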

When I trigger the pipeline locally, it successfully replaces only the partitions whose dates appear in the input file. However, when I deploy it on Google Cloud Dataflow and run the template with the exact same parameters, it truncates all the data, and at the end the table contains only the contents of the file I wanted to upload.

Do you know why there is such a difference?

Thank you!

There should be no difference between running locally and in the cloud. Are you sure what you've described is actually happening? – Graham Polley
Hello Graham, thank you for your answer. Yes, I am sure: I generated a template from my Beam code, and when I run it with the exact same parameters it overwrites all my partitions. – John David
@GrahamPolley, I have also tried launching the pipeline with the Dataflow runner (instead of generating a template), and it still overwrites all the partitions. – John David
Could it be because you are using .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) instead of .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)? – Haris Nadeem
Hello @HarisNadeem, the use case is this: I have a partitioned table with, say, 3 partitions at time t for the dates 2018-05-01, 2018-05-02 and 2018-05-03. At t+1, I receive an input file containing data for 2018-05-02 and 2018-05-03. I want to replace the current partitions for these two dates and leave the 2018-05-01 partition unchanged. When I execute the pipeline with the direct runner, everything works fine. But when I trigger it with the DataflowRunner, it overwrites all the partitions, and I end up with only 2 partitions. – John David

1 Answer

2 votes

You set BigQueryIO.Write.CreateDisposition to CREATE_IF_NEEDED and paired it with BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE, so even if the table exists it may be recreated. This is why you see your table getting replaced.
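
As a rough sketch of the write step in question (not the full pipeline; "dynamicDestinations" stands for the DynamicDestinations instance from your code, and WRITE_APPEND is the alternative already suggested in the comments, depending on how you want existing partition data handled):

    // Sketch only: CREATE_IF_NEEDED paired with WRITE_TRUNCATE is what leads
    // to the table being replaced on each run.
    rows.apply("Write to BQ", BigQueryIO.writeTableRows()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withCustomGcsTempLocation(options.getGCSTempLocation())
            .to(dynamicDestinations)
            // Alternative from the comments: append new rows instead of truncating.
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));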

See this document [1] for details.

[1] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write.CreateDisposition#CREATE_IF_NEEDED