I have a pipeline that receives data from Pub/Sub, does some processing, and then needs to process all the data in Bigtable that matches the result of that processing.
For example, given a Pub/Sub message like {clientId: 10}, I need to read all the data for clientId 10 from Bigtable (I know how to build the scan from the clientId). The problem is that both Bigtable reads available at the moment (BigtableIO and CloudBigtableIO) assume that the pipeline starts with Bigtable, so I cannot use them (or could not find a way to use them) in the middle of a pipeline. How can I achieve this?
Simplified pseudocode:
Pipeline p = Pipeline.create(...);
p.apply(PubsubIO.readMessagesWithAttributes() ...)
 .apply(new PubsubMessageToScans()) // I know how to do this
 .apply(new ReadBigTable());        // How to do this?
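
To make the intent concrete, here is a minimal, dependency-free sketch of the behaviour I would want from the ReadBigTable step for each incoming element. It does not use Beam or Bigtable at all: a sorted in-memory map stands in for the table, and the row-key layout "<clientId>#<suffix>", the class name MidPipelineScanSketch, and the helpers prefixFor and scanClient are all assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class MidPipelineScanSketch {

    // Assumed (hypothetical) row-key layout: "<clientId>#<suffix>".
    static String prefixFor(int clientId) {
        return clientId + "#";
    }

    // Stand-in for the per-element read: a prefix scan over a sorted key space.
    // The TreeMap only imitates a table sorted by row key; it is not a Bigtable client.
    static List<String> scanClient(NavigableMap<String, String> table, int clientId) {
        String start = prefixFor(clientId);
        // Exclusive end key: the prefix with its last character incremented ('#' becomes '$').
        int last = start.length() - 1;
        String end = start.substring(0, last) + (char) (start.charAt(last) + 1);
        return new ArrayList<>(table.subMap(start, true, end, false).values());
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("10#a", "row-a");
        table.put("10#b", "row-b");
        table.put("100#a", "other-client");
        table.put("11#a", "another-client");

        // Each id plays the role of one decoded Pub/Sub message, e.g. {clientId: 10}.
        for (int clientId : List.of(10, 11)) {
            System.out.println(clientId + " -> " + scanClient(table, clientId));
        }
    }
}
```

In other words, the scan range is only known once a message has arrived, so the read has to happen once per element rather than once at the start of the pipeline.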