I have a streaming job that listens for messages from Pub/Sub and then reads data from BigQuery. The data is queried using values received from Pub/Sub, which means I need to build the query dynamically and then pass it to BigQueryIO.Read.fromQuery(). Below is the code that is supposed to read the data from BigQuery and return TableRows, but it throws a NullPointerException at the step that reads the data.
public class RequestDailyUsageTransform extends PTransform<PCollection<DailyUsageJob>, PCollection<TableRow>> {
    private String mQuery;
    private String mForDate;
    private LocalDateTime billingDateTime;

    @Override
    public PCollection<TableRow> apply(PCollection<DailyUsageJob> input) {
        TableReference tableReference = getRequestTableReference();
        return input
            .apply(ParDo.named("RequestUsageQuery")
                .of(new RequestUsageQueryStringDoFn()))
            .apply(BigQueryIO.Read.named("RequestUsageReader")
                .fromQuery(mQuery)
                .from(tableReference).withoutValidation())
            .apply(ParDo.named("DailyRequestMapper").of(new DailyRequestMapperDoFn()))
            .apply(ParDo.named("BillDailyRequestUsage")
                .of(new DailyRequestsBillDoFn(mForDate, billingDateTime)));
    }
}
I would also like to know how to pass a string that was generated in a DoFn to BigQueryIO.Read.fromQuery().
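To make the "dynamic query" part concrete, here is a minimal sketch of how the per-message query string might be formed in plain Java. The class name UsageQueryBuilder, the column names account_id and request_time, and the idea that a DailyUsageJob carries an account id and a date are all assumptions for illustration; none of them come from the code above. As far as I understand, BigQueryIO.Read takes its query when the pipeline is constructed, so a string like this, produced at runtime inside a DoFn, would probably have to be executed from within the DoFn (for example through the BigQuery client library) rather than handed to fromQuery().

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not part of the Dataflow SDK): builds the query text
// for one incoming message. Table and column names are illustrative only.
final class UsageQueryBuilder {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ISO_LOCAL_DATE;

    private UsageQueryBuilder() {}

    // table is expected in legacy-SQL form, e.g. "project:dataset.table".
    static String build(String table, String accountId, LocalDate forDate) {
        if (table == null || accountId == null || forDate == null) {
            // Fail early with a clear message instead of a later NullPointerException.
            throw new IllegalArgumentException("table, accountId and forDate are required");
        }
        // Only allow simple identifier characters, since the value is embedded in SQL text.
        if (!accountId.matches("[A-Za-z0-9_-]+")) {
            throw new IllegalArgumentException("unexpected characters in accountId");
        }
        return String.format(
            "SELECT * FROM [%s] WHERE account_id = '%s' AND DATE(request_time) = '%s'",
            table, accountId, forDate.format(DAY));
    }
}
```

A DoFn could call UsageQueryBuilder.build(...) once per DailyUsageJob element and emit either the string or the rows returned by running it. The explicit null check is deliberate: in the transform above, mQuery is never assigned before fromQuery(mQuery) is called.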