
I have a streaming job that listens for messages from Pub/Sub and then reads data from BigQuery. The data is queried using values received from Pub/Sub, so I need to build the query dynamically and then pass it to BigQueryIO.Read.fromQuery(). Below is the code that is meant to read the data from BigQuery and return TableRows, but it throws a NullPointerException at the step that reads the data.

public class RequestDailyUsageTransform extends PTransform<PCollection<DailyUsageJob>, PCollection<TableRow>> {

    private String mQuery;

    private String mForDate;
    private LocalDateTime billingDateTime;

    @Override
    public PCollection<TableRow> apply(PCollection<DailyUsageJob> input) {

        TableReference tableReference = getRequestTableReference();

        return input
                .apply(ParDo.named("RequestUsageQuery")
                        .of(new RequestUsageQueryStringDoFn()))
                .apply(BigQueryIO.Read.named("RequestUsageReader")
                        .fromQuery(mQuery)
                        .from(tableReference).withoutValidation())
                .apply(ParDo.named("DailyRequestMapper").of(new DailyRequestMapperDoFn()))
                .apply(ParDo.named("BillDailyRequestUsage")
                        .of(new DailyRequestsBillDoFn(mForDate, billingDateTime)));
    }
}

I would also like to know how to pass a string that was generated in a DoFn to BigQueryIO.Read.fromQuery().

Hello, could you provide the full stack trace, please? Also, which version of Dataflow are you using? As for passing in a query computed in a DoFn, I don't think that fits well into the Dataflow model. Could you explain your use case a bit more? Why do you want to construct the query at runtime? Are you trying to look up specific information based on an element/key? The BigQueryIO source query is supposed to look up data across all keys; that data is then passed through your pipeline and sharded by key. Each DoFn operates on a specific shard/key of your data. - Alex Amato
@AlexAmato Every day at a specific time, my back-end application sends notifications to the streaming job to perform some task for each user. Each message contains the time and the user ID for which the task has to be performed. I want to build a query dynamically that fetches data only for that user, for the time range specified in the message. - ghost
I think in this case the best thing to do would be to run a daily batch job that queries all the data and keys it by userid. This will pull in more data than you would like, but it will let you locate the per-user information. Unfortunately, there is currently no way to perform data-dependent reads. - Alex Amato

1 Answer


I think in this case the best thing to do would be to run a daily batch job that queries all the data and keys it by userid. This will pull in more data than you would like, but it will let you locate the per-user information. Unfortunately, there is currently no way to perform data-dependent reads.
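
To make the "query everything, key by userid" idea more concrete, here is a minimal plain-Java sketch with no Dataflow SDK dependency. It only illustrates the keying and per-user lookup logic; `UsageRow`, `keyByUser`, and `requestsFor` are hypothetical names made up for this example, not part of any SDK. In a real batch pipeline the rows would presumably come from a single static BigQueryIO.Read query, and the keying would be done by the pipeline's own grouping step rather than by a HashMap.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DailyUsageByUser {

    /** Hypothetical stand-in for one row returned by the daily query. */
    public static final class UsageRow {
        public final String userId;
        public final LocalDateTime timestamp;
        public final long requests;

        public UsageRow(String userId, LocalDateTime timestamp, long requests) {
            this.userId = userId;
            this.timestamp = timestamp;
            this.requests = requests;
        }
    }

    /** Keys every row by user id, so per-user data can be located later. */
    public static Map<String, List<UsageRow>> keyByUser(List<UsageRow> rows) {
        Map<String, List<UsageRow>> byUser = new HashMap<>();
        for (UsageRow row : rows) {
            byUser.computeIfAbsent(row.userId, k -> new ArrayList<>()).add(row);
        }
        return byUser;
    }

    /** Sums the requests of one user inside the half-open window [from, to). */
    public static long requestsFor(Map<String, List<UsageRow>> byUser,
                                   String userId,
                                   LocalDateTime from,
                                   LocalDateTime to) {
        long total = 0;
        List<UsageRow> rows = byUser.getOrDefault(userId, Collections.emptyList());
        for (UsageRow row : rows) {
            boolean inWindow = !row.timestamp.isBefore(from) && row.timestamp.isBefore(to);
            if (inWindow) {
                total += row.requests;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for the full daily query result.
        List<UsageRow> rows = Arrays.asList(
                new UsageRow("alice", LocalDateTime.of(2016, 1, 1, 10, 0), 5),
                new UsageRow("alice", LocalDateTime.of(2016, 1, 1, 23, 0), 7),
                new UsageRow("bob", LocalDateTime.of(2016, 1, 1, 12, 0), 3),
                new UsageRow("alice", LocalDateTime.of(2016, 1, 2, 1, 0), 11));

        Map<String, List<UsageRow>> byUser = keyByUser(rows);
        LocalDateTime from = LocalDateTime.of(2016, 1, 1, 0, 0);
        LocalDateTime to = from.plusDays(1);

        System.out.println(requestsFor(byUser, "alice", from, to)); // prints 12
    }
}
```

The trade-off is the one already mentioned: every row is read up front, and the user id and time window from each message are applied afterwards as a filter instead of being baked into the query.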