I am developing a Dataflow pipeline that replaces some partitions of a partitioned BigQuery table. The table is partitioned on a custom date field, and the pipeline's input is a file whose rows may carry different dates.
I developed the following pipeline:
PipelineOptionsFactory.register(BigQueryOptions.class);
BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> rows = p.apply("ReadLines", TextIO.read().from(options.getFileLocation()))
        .apply("Convert To BQ Row", ParDo.of(new StringToRowConverter(options)));
ValueProvider<String> projectId = options.getProjectId();
ValueProvider<String> datasetId = options.getDatasetId();
ValueProvider<String> tableId = options.getTableId();
ValueProvider<String> partitionField = options.getPartitionField();
ValueProvider<String> columnNames = options.getColumnNames();
ValueProvider<String> types = options.getTypes();
rows.apply("Write to BQ", BigQueryIO.writeTableRows()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withCustomGcsTempLocation(options.getGCSTempLocation())
        .to(new DynamicDestinations<TableRow, String>() {
            @Override
            public String getDestination(ValueInSingleWindow<TableRow> element) {
                TableRow date = element.getValue();
                String partitionDestination = (String) date.get(partitionField.get());
                SimpleDateFormat from = new SimpleDateFormat("yyyy-MM-dd");
                SimpleDateFormat to = new SimpleDateFormat("yyyyMMdd");
                try {
                    partitionDestination = to.format(from.parse(partitionDestination));
                    LOG.info("Table destination " + partitionDestination);
                    return projectId.get() + ":" + datasetId.get() + "." + tableId.get() + "$" + partitionDestination;
                } catch (ParseException e) {
                    e.printStackTrace();
                    return projectId.get() + ":" + datasetId.get() + "." + tableId.get() + "_rowsWithErrors";
                }
            }

            @Override
            public TableDestination getTable(String destination) {
                TimePartitioning timePartitioning = new TimePartitioning();
                timePartitioning.setField(partitionField.get());
                timePartitioning.setType("DAY");
                timePartitioning.setRequirePartitionFilter(true);
                TableDestination tableDestination = new TableDestination(destination, null, timePartitioning);
                LOG.info(tableDestination.toString());
                return tableDestination;
            }

            @Override
            public TableSchema getSchema(String destination) {
                return new TableSchema().setFields(buildTableSchemaFromOptions(columnNames, types));
            }
        })
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
p.run();
}
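To make the destination logic easier to check in isolation, here is a minimal standalone sketch of the string that getDestination builds: a table spec of the form project:dataset.table$yyyyMMdd, with a fallback to the _rowsWithErrors table when the date cannot be parsed. The class name PartitionDecorator, the method name toTableSpec, and the sample project, dataset and table names are hypothetical and not part of the pipeline. It also swaps SimpleDateFormat for java.time, which parses strictly, so a value such as 2019-13-45 goes to the error table here whereas the lenient SimpleDateFormat above would accept it. The explicit null check is an addition as well.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class PartitionDecorator {

    // BASIC_ISO_DATE formats a LocalDate as yyyyMMdd, the form used after '$'.
    private static final DateTimeFormatter PARTITION_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

    // Hypothetical helper: builds "project:dataset.table$yyyyMMdd" from a yyyy-MM-dd date.
    public static String toTableSpec(String project, String dataset, String table, String isoDate) {
        String base = project + ":" + dataset + "." + table;
        if (isoDate == null) {
            // Assumption: rows without a date go to the error table.
            return base + "_rowsWithErrors";
        }
        try {
            // LocalDate.parse expects yyyy-MM-dd and rejects invalid dates.
            LocalDate date = LocalDate.parse(isoDate);
            return base + "$" + date.format(PARTITION_FORMAT);
        } catch (DateTimeParseException e) {
            // Same fallback as the pipeline: unparseable dates go to the error table.
            return base + "_rowsWithErrors";
        }
    }

    public static void main(String[] args) {
        // Sample names are placeholders for illustration only.
        System.out.println(toTableSpec("my-project", "my_dataset", "my_table", "2019-01-15"));
        System.out.println(toTableSpec("my-project", "my_dataset", "my_table", "15/01/2019"));
    }
}
```

With the placeholder names above, the first call yields my-project:my_dataset.my_table$20190115 and the second falls back to my-project:my_dataset.my_table_rowsWithErrors.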
When I run the pipeline locally, it correctly replaces only the partitions whose dates appear in the input file. However, when I deploy it to Google Cloud Dataflow and run the template with exactly the same parameters, it truncates all the data, and at the end the table contains only the rows from the file I wanted to upload.
Do you know why there is such a difference?
Thank you!
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
instead of .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
– Haris Nadeem