This is most similar to this question.
I am creating a pipeline in Dataflow 2.x that takes streaming input from a Pubsub queue. Every single message that comes in needs to be streamed through a very large dataset that comes from Google BigQuery and have all the relevant values attached to it (based on a key) before being written to a database.
The trouble is that the mapping dataset from BigQuery is very large - any attempt to use it as a side input fails with the Dataflow runners throwing the error "java.lang.IllegalArgumentException: ByteString would be too long". I have attempted the following strategies:
1) Side input
- As stated,the mapping data is (apparently) too large to do this. If I'm wrong here or there is a work-around for this, please let me know because this would be the simplest solution.
2) Key-Value pair mapping
- In this strategy, I read the BigQuery data and Pubsub message data in the first part of the pipeline, then run each through ParDo transformations that change every value in the PCollections to KeyValue pairs. Then, I run a Merge.Flatten transform and a GroupByKey transform to attach the relevant mapping data to each message.
- The trouble here is that streaming data requires windowing to be merged with other data, so I have to apply windowing to the large, bounded BigQuery data as well. It also requires that the windowing strategies are the same on both datasets. But no windowing strategy for the bounded data makes sense, and the few windowing attempts I've made simply send all the BQ data in a single window and then never send it again. It needs to be joined with every incoming pubsub message.
3) Calling BQ directly in a ParDo (DoFn)
- This seemed like a good idea - have each worker declare a static instance of the map data. If it's not there, then call BigQuery directly to get it. Unfortunately this throws internal errors from BigQuery every time (as in the entire message just says "Internal error"). Filing a support ticket with Google resulted in them telling me that, essentially, "you can't do that".
It seems this task doesn't really fit the "embarrassingly parallelizable" model, so am I barking up the wrong tree here?
EDIT :
Even when using a high memory machine in dataflow and attempting to make the side input into a map view, I get the error java.lang.IllegalArgumentException: ByteString would be too long
Here is an example (psuedo) of the code I'm using:
Pipeline pipeline = Pipeline.create(options);
PCollectionView<Map<String, TableRow>> mapData = pipeline
.apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
.apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn()))
.apply(View.asMap());
PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
.fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));
messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject data = new JSONObject(new String(c.element().getPayload()));
String key = getKeyFromData(data);
TableRow sideInputData = c.sideInput(mapData).get(key);
if (sideInputData != null) {
LOG.info("holyWowItWOrked");
c.output(new TableRow());
} else {
LOG.info("noSideInputDataHere");
}
}
}).withSideInputs(mapData));
The pipeline throws the exception and fails before logging anything from within the ParDo
.
Stack trace:
java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
View.asSingleton
,View.asMap
, etc.? For example --View.asSingleton
will take a PCollection with a single element and make it visible to the ParDo.View.asMap
will take aPCollection<KV<K, V>>
and make it available as aMap<K, V>
, but will only read the keys you need. – Ben Chambers