
I have CSV (gzip-compressed) files in GCS. I want to read these files and send the data to BigQuery.

The header info can change (although I know all columns in advance), so simply dropping the header is not enough; somehow I need to read the first line and attach the column info to the remaining lines.

How can I do this?

My first thought was that I would have to implement a custom source, as in this post:
Reading CSV header with Dataflow
But with this solution, I'm not sure how to decompress the gzip first. Can I somehow use withCompressionType, as with TextIO? (I found a compression_type parameter in a Python class, but I'm using Java and could not find a similar one in the Java FileBasedSource class.)

I also feel this is a bit of overkill, because it makes the file unsplittable (although in my case that's okay).

Alternatively, I could use GoogleCloudStorage directly, read the file and its first line in my main() function, and then proceed to the pipeline.
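For the decompression part of that approach, a minimal sketch using only java.util.zip might look like this (the stream would in practice come from the GCS client; the sample data and class name here are just stand-ins):

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class HeaderReader {
    // Read only the first line (the CSV header) from a gzip-compressed stream.
    static String readHeader(InputStream in) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8));
        return reader.readLine(); // null if the file is empty
    }

    public static void main(String[] args) throws IOException {
        // Simulate a gzip-compressed CSV object. In practice this stream would
        // come from the GCS client instead of an in-memory buffer.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (Writer w = new OutputStreamWriter(new GZIPOutputStream(buf), StandardCharsets.UTF_8)) {
            w.write("id,name,score\n1,alice,10\n2,bob,20\n");
        }
        String header = readHeader(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(header); // id,name,score
    }
}
```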

But that is also cumbersome, so I want to confirm: is there a best practice (the Dataflow way) for reading a CSV file while making use of its header?

It should be fairly easy for you to subclass some TextIO transform and add the special behavior. – Pablo
@Pablo is there any example I can refer to for extending TextIO and adding special behavior? I thought I needed to use Source to extend file-processing behavior, and I know how to do that, but by doing so I lose TextIO's automatic decompression of .gz files. – Norio Akagi

1 Answer


If I understand what you are trying to accomplish correctly, SideInput (doc, example) is likely the answer here. It will make the header available alongside every line of the file during processing.
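The per-line work that the side input enables is just pairing each value with its column name. A sketch of that logic in plain Java (this is what the DoFn body would do, with the header supplied as a side input; class and column names are placeholders):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RowJoiner {
    // Pair each value in a CSV data line with the corresponding header column.
    static Map<String, String> join(String header, String line) {
        String[] cols = header.split(",", -1);
        String[] vals = line.split(",", -1);
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < Math.min(cols.length, vals.length); i++) {
            row.put(cols[i], vals[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        System.out.println(join("id,name,score", "1,alice,10"));
        // {id=1, name=alice, score=10}
    }
}
```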

The general idea is to emit the header as a separate PCollectionView and use it as a SideInput for your per-line processing. You can achieve this in a single pass over the file using SideOutput (doc).
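As a rough, untested sketch of that idea using the Beam Java API (the bucket path and the header-detection predicate are placeholders; for simplicity this uses a Filter and a singleton view rather than a side output, which assumes a single consistent header across the input):

```java
Pipeline p = Pipeline.create(options);

// TextIO decompresses gzip automatically based on the .gz extension.
PCollection<String> lines =
    p.apply(TextIO.read().from("gs://my-bucket/data.csv.gz"));

// Extract the header line into a singleton side input. Adapt the
// predicate to however your header can be recognized.
PCollectionView<String> headerView =
    lines.apply(Filter.by((String line) -> line.startsWith("id,")))
         .apply(View.asSingleton());

// Per-line processing with the header available as a side input.
lines.apply(ParDo.of(new DoFn<String, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String header = c.sideInput(headerView);
            String line = c.element();
            if (line.equals(header)) return; // skip the header row itself
            String[] cols = header.split(",");
            String[] vals = line.split(",");
            TableRow row = new TableRow();
            for (int i = 0; i < Math.min(cols.length, vals.length); i++) {
                row.set(cols[i], vals[i]);
            }
            c.output(row);
        }
    }).withSideInputs(headerView));
```

This requires the Beam SDK (and the BigQuery TableRow class) on the classpath; treat it as a starting point, not a drop-in solution.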

If I am reading your question correctly, it sounds like your header contents vary from file to file. If so, you can use View.asMap to keep a map of headers from each file. Unfortunately, keeping track of the current filename being read is not currently supported natively, but there are workarounds discussed in this post.