I am using the following code to load data into Bigtable:
CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
        .withConfiguration("clusterId", options.getBigTableClusterId())
        .withProjectId(options.getProject())
        .withInstanceId(options.getBigTableInstanceId())
        .withTableId(options.getOutputBTTable())
        .build();
Pipeline p = Pipeline.create(options);
CloudBigtableIO.initializeForWrite(p);
/**
 * Read data from BigQuery, convert each TableRow to a Mutation,
 * and write the results to Bigtable.
 */
p.apply(BigQueryIO.Read.fromQuery(getQuery(options.getDate())))
        .apply(ParDo.of(new DoFn<TableRow, Mutation>() {
            @Override
            public void processElement(ProcessContext c) {
                Mutation output = convertDataToRow(c.element());
                if (output != null) {
                    c.output(output);
                }
            }
        }))
        .apply(CloudBigtableIO.writeToTable(config));
p.run();
private static Mutation convertDataToRow(TableRow element) {
    LOG.info("element: " + element);
    if (element.get("BASM_AID") != null) {
        // Note: BAS_category is assumed to be non-null whenever BASM_AID is set;
        // otherwise the line below throws a NullPointerException.
        Put obj = new Put(getRowKey(element).getBytes())
                .addColumn(SEGMENT_FAMILY, SEGMENT_COLUMN_NAME, ((String) element.get("BAS_category")).getBytes());
        obj.addColumn(USER_FAMILY, AID, ((String) element.get("BASM_AID")).getBytes());
        if (element.get("BASM_segment_id") != null) {
            obj.addColumn(SEGMENT_FAMILY, SEGMENT_ID, ((String) element.get("BASM_segment_id")).getBytes());
        }
        if (element.get("BAS_sub_category") != null) {
            obj.addColumn(SEGMENT_FAMILY, SUB_CATEGORY, ((String) element.get("BAS_sub_category")).getBytes());
        }
        if (element.get("BAS_name") != null) {
            obj.addColumn(SEGMENT_FAMILY, NAME, ((String) element.get("BAS_name")).getBytes());
        }
        if (element.get("BAS_description") != null) {
            obj.addColumn(SEGMENT_FAMILY, DESCRIPTION, ((String) element.get("BAS_description")).getBytes());
        }
        if (element.get("BASM_krux_user_id") != null) {
            obj.addColumn(USER_FAMILY, KRUX_USER_ID, ((String) element.get("BASM_krux_user_id")).getBytes());
        }
        if (element.get("BAS_last_compute_day") != null) {
            obj.addColumn(SEGMENT_FAMILY, LAST_COMPUTE_DAY, ((String) element.get("BAS_last_compute_day")).getBytes());
        }
        if (element.get("BAS_type") != null) {
            obj.addColumn(SEGMENT_FAMILY, TYPE, ((String) element.get("BAS_type")).getBytes());
        }
        if (element.get("BASM_REGID") != null) {
            obj.addColumn(USER_FAMILY, REGID, ((String) element.get("BASM_REGID")).getBytes());
        }
        return obj;
    } else {
        return null;
    }
}
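One thing I have been wondering about is the per-element overhead inside the DoFn: at 10 billion elements, even the LOG.info call on every row adds up to a lot of string formatting and log I/O. Below is a trimmed, table-driven sketch of the same conversion with the per-row logging removed (the FIELDS map and the convertDataToRowCompact name are just illustrative names I made up; the family/qualifier constants are the same ones used above, and it needs java.util.Map / java.util.LinkedHashMap imports):

// Sketch only: same behavior as convertDataToRow, minus the per-row logging.
// FIELDS maps each BigQuery field name to its {family, qualifier} pair,
// reusing the byte[] constants (SEGMENT_FAMILY, USER_FAMILY, AID, ...) from above.
private static final Map<String, byte[][]> FIELDS = new LinkedHashMap<>();
static {
    FIELDS.put("BASM_AID", new byte[][] { USER_FAMILY, AID });
    FIELDS.put("BASM_segment_id", new byte[][] { SEGMENT_FAMILY, SEGMENT_ID });
    FIELDS.put("BAS_sub_category", new byte[][] { SEGMENT_FAMILY, SUB_CATEGORY });
    FIELDS.put("BAS_name", new byte[][] { SEGMENT_FAMILY, NAME });
    FIELDS.put("BAS_description", new byte[][] { SEGMENT_FAMILY, DESCRIPTION });
    FIELDS.put("BASM_krux_user_id", new byte[][] { USER_FAMILY, KRUX_USER_ID });
    FIELDS.put("BAS_last_compute_day", new byte[][] { SEGMENT_FAMILY, LAST_COMPUTE_DAY });
    FIELDS.put("BAS_type", new byte[][] { SEGMENT_FAMILY, TYPE });
    FIELDS.put("BASM_REGID", new byte[][] { USER_FAMILY, REGID });
}

private static Mutation convertDataToRowCompact(TableRow element) {
    if (element.get("BASM_AID") == null) {
        return null; // skip rows without an AID, as before
    }
    Put put = new Put(getRowKey(element).getBytes())
            .addColumn(SEGMENT_FAMILY, SEGMENT_COLUMN_NAME,
                    ((String) element.get("BAS_category")).getBytes());
    for (Map.Entry<String, byte[][]> field : FIELDS.entrySet()) {
        Object value = element.get(field.getKey());
        if (value != null) {
            put.addColumn(field.getValue()[0], field.getValue()[1], ((String) value).getBytes());
        }
    }
    return put;
}

I realize this is mostly cleanup; removing the per-row logging is the part I would expect to matter at this scale.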
We have 30 Bigtable nodes, and my Dataflow job runs with 100 workers. The whole process has to handle around 10 billion rows of data, and with the above configuration the job takes more than a day to complete, which is not ideal.
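To put that in perspective: 10 billion rows in 24 hours works out to roughly 115,000 writes per second, or about 3,800 writes per second per node across 30 nodes. If I understand the published figures correctly, that is well below the roughly 10,000 writes per second per node that Bigtable is quoted as sustaining, so the cluster itself does not seem to be the bottleneck.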
Are there any suggestions at the code level that would make the job run faster? I know that increasing the number of Bigtable nodes is one option, but for now I am looking for alternatives that don't require adding nodes.