0 votes

I am trying to read a file from a GCS bucket (with path: gs://bucket_name) and load it into a Dataflow VM folder (with path /tmp/file name).

I also need to copy another file from the Dataflow VM folder back to the GCS bucket.

I have tried the apache_beam.io.gcp.gcsio library, but it does not seem to work.

Can anyone give any suggestion on this?

1
Just to clarify my wording: I don't want to read the file line by line. Basically, what I want to do is copy a file from the GCS bucket to the Dataflow VM, and also copy a file from the Dataflow VM to the GCS bucket. – Alex

1 Answer

1 vote

The best way to do it is to write a custom DoFn whose process method calls into the GCS Python API. The DoFn is triggered by the elements sent to it: either by an Impulse (executes only once) or by a PCollection (executes once per element in the PCollection). Take a look here for downloading/uploading blobs and here for the GCS Python client library docs.

import apache_beam as beam
from google.cloud import storage

class ReadWriteToGcs(beam.DoFn):
  def setup(self):
    # Create the GCS client once per worker, not once per element.
    self.client = storage.Client()

  def process(self, e):
    # bucket_name, blob names and file paths below are placeholders for your own values.
    bucket = self.client.bucket(bucket_name)
    # Download a blob from GCS to the worker's local filesystem.
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)
    # Upload a local file from the worker back to GCS.
    bucket.blob(dest_blob_name).upload_from_filename(source_file_name)

p = beam.Pipeline(...)
impulse = p | beam.Impulse()
impulse | beam.ParDo(ReadWriteToGcs())
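If the bucket and file names need to vary per run, the same idea can be driven from a PCollection instead of an Impulse, with one element per copy operation. Below is a minimal sketch of that variant (not from the original answer); names such as my-bucket, input/data.csv, and /tmp/data.csv are placeholders.

import apache_beam as beam
from google.cloud import storage

class CopyFromGcs(beam.DoFn):
  # Downloads one (bucket, blob, local_path) tuple per element.
  def setup(self):
    self.client = storage.Client()

  def process(self, element):
    bucket_name, blob_name, local_path = element
    self.client.bucket(bucket_name).blob(blob_name).download_to_filename(local_path)
    yield local_path  # pass the local path downstream if later steps need it

with beam.Pipeline() as p:
  (p
   | beam.Create([('my-bucket', 'input/data.csv', '/tmp/data.csv')])  # placeholder values
   | beam.ParDo(CopyFromGcs()))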