Hi there! I'm new to Cloud Dataflow.
I'm using DataflowPipelineRunner to read a CSV file and output the result to BigQuery. It works well when the CSV file is small (only 20 records, less than 1 MB), but it fails with an OOM error when the file becomes large (over 10 million records, about 616.42 MB).
Below is the error message:
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.google.cloud.dataflow.sdk.util.StreamUtils.getBytes(StreamUtils.java:63)
    at co.coder.MyCoder.decode(MyCoder.java:54)
    at co.coder.MyCoder.decode(MyCoder.java:1)
    at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.decodeCurrentElement(TextIO.java:1065)
    at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.readNextRecord(TextIO.java:1052)
    at com.google.cloud.dataflow.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:536)
    at com.google.cloud.dataflow.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:287)
    at com.google.cloud.dataflow.sdk.runners.worker.WorkerCustomSources$BoundedReaderIterator.advance(WorkerCustomSources.java:541)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$SynchronizedReaderIterator.advance(ReadOperation.java:425)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:217)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:284)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:220)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:170)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
From the error message, the error happened at [MyCoder.java:54]. MyCoder is a subclass of CustomCoder that I implemented myself; it decodes the CSV file's bytes as Shift-JIS so the output can be written as UTF-8:
53: @Override
54: public String decode(InputStream inStream, Context context) throws CoderException, IOException {
55:     if (context.isWholeStream) {
56:         byte[] bytes = StreamUtils.getBytes(inStream);
57:         return new String(bytes, Charset.forName("Shift_JIS"));
58:     } else {
59:         try {
60:             return readString(new DataInputStream(inStream));
61:         } catch (EOFException | UTFDataFormatException exn) {
62:             // These exceptions correspond to decoding problems, so change
63:             // what kind of exception they're branded as.
64:             throw new CoderException(exn);
65:         }
66:     }
67: }
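For reference, the whole-stream branch of the coder can be exercised locally with the plain JDK, without the Dataflow SDK. This is a minimal, hypothetical sketch (the class name and the buffered-read helper are my own, standing in for StreamUtils.getBytes); it shows that the entire input stream is accumulated in memory before decoding, which is where a huge input would exhaust the heap:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;

public class ShiftJisDecodeDemo {
    // Stand-in for StreamUtils.getBytes + new String(bytes, "Shift_JIS"):
    // reads ALL bytes into an in-memory buffer, then decodes them.
    // The buffer grows with the input, so a very large stream risks OOM.
    static String decodeWholeStream(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buf.write(chunk, 0, n);  // heap usage grows with stream size
        }
        return new String(buf.toByteArray(), Charset.forName("Shift_JIS"));
    }

    public static void main(String[] args) throws IOException {
        byte[] sjis = "テスト".getBytes("Shift_JIS");
        String decoded = decodeWholeStream(new ByteArrayInputStream(sjis));
        System.out.println(decoded.equals("テスト"));  // prints "true"
    }
}
```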
And also, here is how I ran the DataflowPipelineRunner:
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectId);
options.setStagingLocation(stagingFolderPathInGCS);
options.setWorkerMachineType("n1-highmem-4");
options.setMaxNumWorkers(5);
Pipeline p = Pipeline.create(options);
// read csv from gcs
PCollection<String> lines = p.apply(TextIO.Read.named("csv input")
.from("gs://" + bucketName + "/original/" + fileName).withCoder(MyCoder.of()));
lines.apply(TextIO.Write.named("csv output").to("gs://" + bucketName + "/encoded/" + fileName)
.withCoder(StringUtf8Coder.of()).withoutSharding().withHeader("test Header"));
p.run();
Since Dataflow is a scalable cloud service for big data, I am a little confused by this OOM error. Can anyone explain why the [OutOfMemoryError] happened and how to resolve it?
Thanks very much!
withoutSharding() will severely limit performance since the write cannot be parallelized, but that should not cause a crash. What is the readString() called in your coder? - Kenn Knowles

Hello, Kenn Knowles, readString() is just like this: - xialin

private static String readString(DataInputStream dis) throws IOException {
    int len = VarInt.decodeInt(dis);
    if (len < 0) {
        throw new CoderException("Invalid encoded string length: " + len);
    }
    byte[] bytes = new byte[len];
    dis.readFully(bytes);
    return new String(bytes, Charset.forName("Shift_JIS"));
}
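The length-prefixed framing that readString() relies on can be sketched without the Dataflow SDK. In this hypothetical demo the length is written as a plain 4-byte int via DataOutputStream instead of VarInt.decodeInt (my own simplification); the point is that the full record is allocated up front with `new byte[len]`, so a single oversized record would need that much heap at once:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;

public class LengthPrefixedReadDemo {
    // Simplified analogue of the coder's readString(): read a length prefix,
    // allocate a byte array of that size, fill it, then decode as Shift_JIS.
    static String readString(DataInputStream dis) throws IOException {
        int len = dis.readInt();  // stand-in for VarInt.decodeInt(dis)
        if (len < 0) {
            throw new IOException("Invalid encoded string length: " + len);
        }
        byte[] bytes = new byte[len];  // whole record allocated at once
        dis.readFully(bytes);
        return new String(bytes, Charset.forName("Shift_JIS"));
    }

    public static void main(String[] args) throws IOException {
        // Round-trip: write length + Shift_JIS payload, then read it back.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(buf);
        byte[] payload = "テスト".getBytes("Shift_JIS");
        dos.writeInt(payload.length);
        dos.write(payload);

        String s = readString(new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(s.equals("テスト"));  // prints "true"
    }
}
```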