
I'm using Google Cloud Platform to transfer data from an Azure server to a BigQuery table (functionally speaking, this works smoothly). The pipeline looks like this: [image: Dataflow streaming pipeline]

The 'FetchMetadata' step of the pipeline is a simple TextIO.Read that reads a 66-line .csv metadata file from a Google Cloud Storage bucket:

PCollection<String> metaLine = p.apply(TextIO.Read.named("FetchMetadata")
            .from("gs://my-bucket"));

When I run the pipeline in batch mode this works like a charm: the metadata file is loaded in less than a second of vCPU time, and then the data itself is loaded. In streaming mode I would love to replicate that behaviour to some extent, but simply reusing the same code causes a problem: after the pipeline has run for just 15 minutes of wall-clock time, the TextIO.Read step has used a whopping 4 hours of vCPU time. For a pipeline that will run permanently on a low-budget project, this is unacceptable.

So my question: is it possible to change the code so that the file is re-read periodically (if the file changes I want the pipeline to pick up the change, so let's say hourly) rather than continuously, as it is right now?

I've found some documentation that mentions TextIO.Read.Bound, which looks like a good place to start, but as far as I know it doesn't solve my periodic-update problem.


1 Answer


I was stuck in a similar situation and solved it in a slightly different way. I would welcome the community's insights on this solution.

I had files being updated every hour in a GCS bucket, and I followed the blog post about scheduling Dataflow jobs from App Engine or Google Cloud Functions.

I configured an App Engine endpoint to receive object change notifications from the GCS bucket that contained the files to be processed. For every file that was created (an update is also a create operation in an object store), the App Engine application would submit a job to Google Dataflow. The job would read the lines from the file (the file name comes from the HTTP request body) and publish them to a Google Cloud Pub/Sub topic.
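As a minimal sketch of the filtering step in such an endpoint (not the code I actually ran): the class `ChangeNotification` and the method `fileToProcess` are hypothetical names. I am assuming the notification carries an `X-Goog-Resource-State` header with the values `sync`, `exists` and `not_exists`, and that the bucket and object name have already been parsed out of the JSON request body.

```java
import java.util.Optional;

/** Hypothetical helper for the App Engine notification handler. */
public class ChangeNotification {

    /**
     * Returns the gs:// path of the file to process, or empty if the
     * notification should be ignored.
     *
     * resourceState is assumed to be the X-Goog-Resource-State header value;
     * bucket and objectName are assumed to come from the request body.
     */
    public static Optional<String> fileToProcess(
            String resourceState, String bucket, String objectName) {
        // "exists" is assumed to cover both new objects and overwrites;
        // "sync" (channel setup) and "not_exists" (deletion) carry no file to read.
        if (!"exists".equals(resourceState) || bucket == null || objectName == null) {
            return Optional.empty();
        }
        return Optional.of("gs://" + bucket + "/" + objectName);
    }

    public static void main(String[] args) {
        // An upload or overwrite yields a path that can be handed to the batch job.
        System.out.println(fileToProcess("exists", "my-bucket", "metadata.csv"));
        // The initial sync message yields nothing, so no job is submitted.
        System.out.println(fileToProcess("sync", "my-bucket", null));
    }
}
```

Only when a path comes back would the handler go on to submit a Dataflow job for it.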

A streaming pipeline subscribed to that Pub/Sub topic then processed the messages and wrote the relevant rows to BigQuery. This way, the streaming pipeline ran at the minimum worker count when idle, the file ingest happened in a batch pipeline, and the streaming pipeline scaled with the volume of messages published to the Pub/Sub topic.

In the tutorial for submitting jobs to Google Dataflow, the jar is executed from the underlying terminal. I modified the code to submit the job from a Dataflow template, which can be executed with parameters. This way, job submission becomes very lightweight while still creating a job for every new file uploaded to the GCS bucket. Please refer to this link for details about executing Google Dataflow job templates.
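To illustrate why a template launch is so lightweight, here is a hedged sketch that only builds the request for the Dataflow REST method `projects.templates.launch`. It does not send anything and leaves out authentication. The class `TemplateLaunchRequest`, the project ID, the template path and the parameter name `inputFile` are all assumptions made for illustration; the real parameter names are whatever your template defines.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical builder for a Dataflow template launch request. */
public class TemplateLaunchRequest {

    /** URL for projects.templates.launch; the template location goes in gcsPath. */
    public static String launchUrl(String projectId, String templateGcsPath) {
        try {
            return "https://dataflow.googleapis.com/v1b3/projects/" + projectId
                    + "/templates:launch?gcsPath="
                    + URLEncoder.encode(templateGcsPath, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** Quotes a string as a JSON string literal (escapes quotes, backslashes, control chars). */
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** JSON body with the job name and the template's runtime parameters. */
    public static String launchBody(String jobName, Map<String, String> parameters) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"jobName\":").append(jsonString(jobName)).append(",\"parameters\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : parameters.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(jsonString(e.getKey())).append(':').append(jsonString(e.getValue()));
            first = false;
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        // "inputFile" is a made-up parameter name; use the ones your template declares.
        params.put("inputFile", "gs://my-bucket/metadata.csv");
        System.out.println(launchUrl("my-project", "gs://my-bucket/templates/ingest"));
        System.out.println(launchBody("ingest-metadata", params));
    }
}
```

The App Engine handler would POST this body to the URL with an OAuth 2.0 access token, so nothing has to be compiled or staged per upload.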

Note: Please mention in the comments if the answer should include code snippets from the Dataflow job template and the App Engine application, and I will update it accordingly.