I'm using TextIO to read from Cloud Storage. Because I want the job to run continuously, I use watchForNewFiles.
For completeness: the same data works fine when I use bounded PCollections (no watchForNewFiles, and BigQueryIO in batch mode), so the data itself is not the issue.
I have p.run().waitUntilFinish(); in my code, so the pipeline runs, and it does not report any errors.
The Apache Beam version is 2.8.0.
PCollection<String> stream =
    p.apply("Read File", TextIO
        .read()
        .from(options.getInput())
        .watchForNewFiles(
            Duration.standardMinutes(1),
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
        )
        .withCompression(Compression.AUTO));
This works perfectly fine and reads files as soon as they are available. The PCollection is unbounded and contains the lines of text from these files.
After that, I apply some transformations:
PCollection<List<String>> lines = stream.apply("Parse CSV",
    ParDo.of(new ParseCSV())
);
PCollection<TableRow> rows = lines.apply("Convert to BQ",
    ParDo.of(new BigQueryConverter(schema))
);
The ParseCSV step adds timestamps to its receiver via outputWithTimestamp.
I end up with a PCollection of TableRow objects ready to stream to BigQuery. For that, I use:
WriteResult result = rows.apply("WriteToBigQuery",
    BigQueryIO.<TableRow>write()
        .withFormatFunction(input -> input)
        .withSchema(bqSchema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withExtendedErrorInfo()
        .to(options.getOutput())
);
This never writes data to BigQuery. Looking at the UI, I see that BigQueryIO consists of these steps:
- ShardTableWrites
- TagWithUniqueId
- Reshuffle
- Window.into
- GroupByKey
Data enters and leaves the first two steps, but it never leaves Reshuffle: that step only reads data and never passes it on. The step inside Reshuffle that causes this is GroupByKey.
As the collection is unbounded, I tried to configure the window with
lines = lines.apply(Window.configure()
    .<List<String>>into(FixedWindows
        .of(Duration.standardSeconds(10))
    )
);
I expected this to force anything doing a GroupByKey to release a window after 10 seconds, but it does not.
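To make that expectation explicit, here is a minimal plain-Java sketch (no Beam dependency) of how I understand 10-second fixed windows to be assigned. The class and method names are made up for illustration, and it assumes windows aligned to the epoch with no offset. The window is chosen from the element's event timestamp (the one set via outputWithTimestamp), not from the wall clock:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: mimics fixed-window assignment
// for windows aligned to the epoch (offset 0).
final class FixedWindowSketch {

    // Start (inclusive) of the fixed window that contains the timestamp.
    public static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    // End (exclusive) of the fixed window that contains the timestamp.
    public static Instant windowEnd(Instant timestamp, Duration size) {
        return windowStart(timestamp, size).plus(size);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        Instant element = Instant.parse("2018-11-01T12:00:03Z");

        System.out.println(windowStart(element, size)); // 2018-11-01T12:00:00Z
        System.out.println(windowEnd(element, size));   // 2018-11-01T12:00:10Z
    }
}
```

If that understanding is right, "after 10 seconds" means 10 seconds of event time, so with the default trigger a window would only be released once the watermark passes the end of that window.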
lines = lines.apply(Window.configure()
    .<List<String>>into(FixedWindows
        .of(Duration.standardSeconds(10))
    )
    .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
    .withAllowedLateness(Duration.standardSeconds(0))
    .discardingFiredPanes()
);
Adding an explicit processing-time trigger, as in the snippet above, did not help either. Any clue? Thanks in advance!