0 votes

I have a Dataflow pipeline that takes many files from a GCS bucket, extracts the records, applies some transformations, and finally writes them out to Parquet files. It continuously watches the bucket for new files, which makes this a streaming pipeline, though for now we have a termination condition that stops the pipeline once one minute has passed since the last new file. We are testing with a fixed set of files in the bucket.
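Roughly, the watching and writing look like this (a minimal sketch assuming the Beam Java SDK; the file pattern, polling interval, schema, and shard count are placeholders):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Continuously match new files in the bucket; matching stops once no new
// file has appeared for 1 minute, which lets the pipeline terminate.
PCollection<FileIO.ReadableFile> files = pipeline
    .apply(FileIO.match()
        .filepattern("gs://my-bucket/input/*")   // placeholder pattern
        .continuously(
            Duration.standardSeconds(30),        // placeholder polling interval
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardMinutes(1))))
    .apply(FileIO.readMatches());

// ... extract records and apply the transformations, producing
// PCollection<GenericRecord> records ...

// Write the transformed records to Parquet. With unbounded (streaming)
// input, FileIO needs an explicit shard count or windowed writes.
records.apply(FileIO.<GenericRecord>write()
    .via(ParquetIO.sink(schema))                 // schema: placeholder Avro schema
    .to("gs://my-bucket/output/")
    .withSuffix(".parquet")
    .withNumShards(10));
```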

I initially ran this pipeline in batch mode (no continuous file watching), and querying the Parquet files in BigQuery showed about 36 million records. However, when I enabled continuous file watching and reran the pipeline, the Parquet files contained only ~760k records. I double-checked that in both runs the input bucket had the same set of files.

The metrics on the streaming job details page do not match up at all with what was output. Going by the Elements added (Approximate) section, ~21 million records were added to the input collection of the final Parquet writing step, which is also wrong, even though the output files contained only ~760k records.

The same step on the batch job showed the correct number (36 million) for Elements added (Approximate), and that matched the number of records in the output Parquet files.

I haven't seen anything unusual in the logs.

Why does Cloud Dataflow mark the streaming job as Succeeded even though a large number of records were dropped while writing the output?

Why is there an inconsistency between the metrics reported for batch and streaming jobs on Cloud Dataflow given the same input?

For both jobs I have set 3 workers with a machine type of n1-highmem-4, which pretty much exhausts my quota for the project.
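For reference, the worker configuration is set through the standard Dataflow pipeline options (a sketch assuming the Java SDK; the values match the runs described above):

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
// 3 workers of n1-highmem-4, used for both the batch and streaming runs.
options.setNumWorkers(3);
options.setWorkerMachineType("n1-highmem-4");
```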


1 Answer

0 votes

I suspect this might be due to the way you have configured windows and triggers for your streaming pipeline. By default, Beam/Dataflow fires a window when the watermark passes the end of the window, and the default window configuration sets allowed lateness to zero, so any late data is discarded by the pipeline. To change this behavior, you can try setting a non-zero allowed lateness or configuring a different trigger. See the Beam documentation on windowing and triggers for more information.
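For example, something along these lines would keep late records instead of dropping them (a minimal sketch assuming the Java SDK; the window size, lateness bound, and element type are placeholders, not tuned values for your pipeline):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

PCollection<GenericRecord> windowed = records.apply(
    Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1)))
        // Fire once when the watermark passes the end of the window...
        .triggering(AfterWatermark.pastEndOfWindow()
            // ...and fire again for any late data that arrives afterwards.
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        // Keep (rather than drop) data up to 1 hour behind the watermark.
        .withAllowedLateness(Duration.standardHours(1))
        .discardingFiredPanes());
```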