2 votes

I am using the following code to populate data into Bigtable:

CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
        .withConfiguration("clusterId", options.getBigTableClusterId())
        .withProjectId(options.getProject())
        .withInstanceId(options.getBigTableInstanceId())
        .withTableId(options.getOutputBTTable())
        .build();

Pipeline p = Pipeline.create(options);

/**
 * Read data from BigQuery and write it to Bigtable.
 */
CloudBigtableIO.initializeForWrite(p);
p.apply(BigQueryIO.Read.fromQuery(getQuery(options.getDate())))
    .apply(ParDo.of(new DoFn<TableRow, Mutation>() {
        @Override
        public void processElement(ProcessContext c) {
            Mutation output = convertDataToRow(c.element());
            if (output != null) {
                c.output(output);
            }
        }
    }))
    .apply(CloudBigtableIO.writeToTable(config));
p.run();

private static Mutation convertDataToRow(TableRow element) {
    LOG.info("element: " + element);
    if (element.get("BASM_AID") != null) {
        Put obj = new Put(getRowKey(element).getBytes())
                .addColumn(SEGMENT_FAMILY, SEGMENT_COLUMN_NAME, ((String) element.get("BAS_category")).getBytes());
        obj.addColumn(USER_FAMILY, AID, ((String) element.get("BASM_AID")).getBytes());
        if (element.get("BASM_segment_id") != null) {
            obj.addColumn(SEGMENT_FAMILY, SEGMENT_ID, ((String) element.get("BASM_segment_id")).getBytes());
        }
        if (element.get("BAS_sub_category") != null) {
            obj.addColumn(SEGMENT_FAMILY, SUB_CATEGORY, ((String) element.get("BAS_sub_category")).getBytes());
        }
        if (element.get("BAS_name") != null) {
            obj.addColumn(SEGMENT_FAMILY, NAME, ((String) element.get("BAS_name")).getBytes());
        }
        if (element.get("BAS_description") != null) {
            obj.addColumn(SEGMENT_FAMILY, DESCRIPTION, ((String) element.get("BAS_description")).getBytes());
        }
        if (element.get("BASM_krux_user_id") != null) {
            obj.addColumn(USER_FAMILY, KRUX_USER_ID, ((String) element.get("BASM_krux_user_id")).getBytes());
        }
        if (element.get("BAS_last_compute_day") != null) {
            obj.addColumn(SEGMENT_FAMILY, LAST_COMPUTE_DAY, ((String) element.get("BAS_last_compute_day")).getBytes());
        }
        if (element.get("BAS_type") != null) {
            obj.addColumn(SEGMENT_FAMILY, TYPE, ((String) element.get("BAS_type")).getBytes());
        }
        if (element.get("BASM_REGID") != null) {
            obj.addColumn(USER_FAMILY, REGID, ((String) element.get("BASM_REGID")).getBytes());
        }
        return obj;
    } else {
        return null;
    }
}

We have 30 Bigtable nodes, and my Dataflow job runs with 100 workers. The whole process has to handle around 10 billion rows of data. With the above configuration the job takes more than a day to complete, which is not ideal.

Are there any code-level changes that would make the job run faster? I know that increasing the number of Bigtable nodes is one option, but for now I am looking for alternatives that don't require adding nodes.


2 Answers

1 vote

You might wish to look at this question. Based on that, you'll want to monitor your write bandwidth: if you are above 80%, either cut down the number of your Dataflow workers, or ask for a quota increase and increase the size of your cluster.
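If you go the route of throttling the writers rather than growing the cluster, the simplest knob is the Dataflow worker count. A minimal sketch, assuming the Dataflow SDK 1.x options classes used elsewhere in this pipeline; the worker counts are illustrative values, not tuned recommendations:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class ThrottledPipeline {
      public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);

        // Cap the worker pool so the combined write rate stays below the
        // cluster's write-bandwidth ceiling (50 is only an example value).
        options.setNumWorkers(50);
        options.setMaxNumWorkers(50);

        Pipeline p = Pipeline.create(options);
        // ... build and run the pipeline as before ...
      }
    }

The same caps can also be passed on the command line (`--numWorkers=50 --maxNumWorkers=50`) instead of being set in code.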

0 votes

Bulk loading a lot of data into an empty table (whether in Bigtable or HBase) will have performance problems unless you pre-split the table in advance: initially there are zero tablets, so all writes go to a single server node rather than being distributed across the cluster.

Thus, regardless of your cluster size, an initial bulk load will not achieve high performance because it won't be parallelized.

To fix this, you need to create the table with pre-splits. You can see examples of how to create tables in Cloud Bigtable via the HBase shell with pre-splits. When we ran a benchmark of loading data into Bigtable and HBase, we also pre-split the tables.
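For illustration, here is a minimal sketch of doing the same thing from Java with the Cloud Bigtable HBase client's Admin API rather than the HBase shell. The project, instance, table, and column-family names, and the split points, are placeholders; choose split points that actually match the distribution of your row keys:

    import java.io.IOException;

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.util.Bytes;

    import com.google.cloud.bigtable.hbase.BigtableConfiguration;

    public class CreatePreSplitTable {
      public static void main(String[] args) throws IOException {
        // Placeholder project/instance/table/family names -- substitute your own.
        try (Connection connection = BigtableConfiguration.connect("my-project", "my-instance");
             Admin admin = connection.getAdmin()) {

          HTableDescriptor table = new HTableDescriptor(TableName.valueOf("my-table"));
          table.addFamily(new HColumnDescriptor("segment"));
          table.addFamily(new HColumnDescriptor("user"));

          // Split points spread across the expected row-key space, so the initial
          // bulk load is served by many tablets instead of a single one.
          byte[][] splits = new byte[][] {
              Bytes.toBytes("1"), Bytes.toBytes("2"), Bytes.toBytes("3"),
              Bytes.toBytes("4"), Bytes.toBytes("5"), Bytes.toBytes("6"),
              Bytes.toBytes("7"), Bytes.toBytes("8"), Bytes.toBytes("9")
          };
          admin.createTable(table, splits);
        }
      }
    }

Nine splits is only an example; for a load of 10 billion rows you would typically want many more, spread evenly over the row-key space.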

An existing table at steady state with a lot of data will already have many tablets, well distributed across the cluster, so writes to it perform well. However, if you're doing a bulk load into an empty table, that table must be pre-split.