I got the following exception while running a job that reads from g3 and then groups the data by key. The exception happens during the read.

java.io.IOException: INVALID_ARGUMENT: unable to parse key
    at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write(Native Method)
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.outputChunk(ShuffleSink.java:293)
    at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.close(ShuffleSink.java:288)
    at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:79)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:288)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:221)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Any ideas?

1 Answer

This exception is thrown when you apply GroupByKey and some of the mapped keys are null.

This code throws the exception:

pCollection
            .apply(ParDo.of(new DoFn<Statusable, KV<String, Statusable>>() {
                @Override
                public void processElement(ProcessContext c) throws Exception {
                    // Emitting a null key is what makes the following GroupByKey fail.
                    c.output(KV.of((String) null, c.element()));
                }
            }))
            .apply(GroupByKey.<String, Statusable>create());

You can't write a null key. Therefore, when your key may be null, you must do something like this:

pCollection
            .apply(ParDo.of(new DoFn<Statusable, KV<String, Statusable>>() {
                @Override
                public void processElement(ProcessContext c) throws Exception {
                    String key = c.element().getKeyField();
                    if (key == null) {
                        // Handle the missing key somehow...
                        key = ... // not null value
                    }
                    // The key is guaranteed to be non-null at this point.
                    c.output(KV.of(key, c.element()));
                }
            }))
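
To show the idea without the Dataflow SDK, here is a minimal plain-Java sketch of the same "replace null keys before grouping" step. It is not Dataflow code: the class name, the helper names and the `__NULL_KEY__` placeholder are all made up for illustration, and you would pick a placeholder that cannot collide with your real keys (or drop such elements instead).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NullKeyGrouping {
    // Hypothetical sentinel that stands in for a missing key.
    public static final String NULL_KEY_PLACEHOLDER = "__NULL_KEY__";

    // Returns the key unchanged, or the placeholder when the key is null.
    public static String normalizeKey(String key) {
        return key == null ? NULL_KEY_PLACEHOLDER : key;
    }

    // Groups {key, value} pairs by key after normalizing null keys.
    public static Map<String, List<String>> groupByKey(List<String[]> pairs) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String[] pair : pairs) {
            String key = normalizeKey(pair[0]);
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> pairs = new ArrayList<>();
        pairs.add(new String[] {"a", "1"});
        pairs.add(new String[] {null, "2"});
        pairs.add(new String[] {"a", "3"});
        // prints {a=[1, 3], __NULL_KEY__=[2]}
        System.out.println(groupByKey(pairs));
    }
}
```

In the pipeline above, the equivalent of `normalizeKey` would run inside `processElement`, before `c.output(...)`, so the grouping step never sees a null key.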