
I am given the URL of a Google Cloud Storage bucket. I have to:

  1. Use the URL to acquire a list of blobs in that bucket

  2. For each blob, make some GCS API calls to get information about it (blob.size, blob.name, etc.).

  3. For each blob, also read its contents, find a specific value inside them, and add that value to the values obtained from the GCS API calls.

  4. For each blob, write the values found in steps 2 and 3 to BigQuery.
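Steps 1 and 2 start from a bucket URL, while the GCS client's `list_blobs` call takes a bucket name and an optional `prefix`. A minimal sketch of splitting the URL into those two parts, assuming it is either a `gs://bucket/prefix` URI or an `https://storage.googleapis.com/bucket/prefix` URL (other forms are not handled); `parse_bucket_url` is a hypothetical helper name:

```python
from urllib.parse import urlparse


def parse_bucket_url(url):
    """Split a bucket URL into (bucket_name, prefix).

    Assumed input formats (hypothetical helper, adjust to your URLs):
      gs://bucket/optional/prefix
      https://storage.googleapis.com/bucket/optional/prefix
    """
    parsed = urlparse(url)
    if parsed.scheme == "gs":
        # gs://bucket/prefix -> netloc is the bucket, path is the prefix
        bucket = parsed.netloc
        prefix = parsed.path.lstrip("/")
    elif parsed.scheme in ("http", "https") and parsed.netloc == "storage.googleapis.com":
        # https://storage.googleapis.com/bucket/prefix -> bucket is the first path segment
        bucket, _, prefix = parsed.path.lstrip("/").partition("/")
    else:
        raise ValueError("Unsupported bucket URL: %r" % url)
    if not bucket:
        raise ValueError("No bucket name in URL: %r" % url)
    return bucket, prefix
```

The resulting pair could then be passed to `storage.Client().list_blobs(bucket, prefix=prefix)` from the `google-cloud-storage` library to enumerate the blobs.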

I have thousands of blobs, so I've been advised to do this with Apache Beam.

My idea of the pipeline is something like this:

  1. Get the URL of the bucket and make a PCollection from it.

  2. Using that PCollection, obtain the list of blobs as a new PCollection.

  3. Create a PCollection with the metadata of those blobs.

  4. Apply a transform that takes in that PCollection of metadata dictionaries, reads each blob, scans it for a value, and returns a new PCollection of dictionaries containing the metadata values plus this new value.

  5. Write the result to BigQuery.

It's particularly hard for me to figure out how to return dictionaries as a PCollection.

What I've read:

https://beam.apache.org/documentation/programming-guide/#composite-transforms

https://medium.com/@rajeshhegde/data-pipeline-using-apache-beam-python-sdk-on-dataflow-6bb8550bf366

Any suggestions, especially on how to take in the bucket name and return a PCollection of blobs, are greatly appreciated.


Answer:


I resolved this by reading more about Apache Beam and realizing that I had to use ParDo to split the job across my resources. In the ParDo I call my DoFn, which takes in an element (a blob), does all the processing needed for it, and yields a dict. Refer to this post: Apache Beam: How To Simultaneously Create Many PCollections That Undergo Same PTransform?

    import apache_beam as beam


    class ExtractMetadata(beam.DoFn):
        def process(self, element):
            """
            Takes in a google.cloud.storage Blob and yields one dictionary
            of values describing it (one BigQuery row per blob).
            """
            # Blob.metadata is the blob's custom metadata dict, or None.
            metadata = element.metadata
            if metadata is not None and 'count' in metadata:
                event_count = int(metadata['count'])
            else:
                event_count = None

            # determine_event_type, determine_cluster and determine_customer
            # are the author's own helper methods on this class (not shown).
            event_type = self.determine_event_type(element.id)
            cluster    = self.determine_cluster(element.id)
            customer   = self.determine_customer(element)

            dic = {
                'blob_name'       : element.name,
                'event_path'      : element.path,
                'size'            : int(element.size),
                'time_of_creation': element.time_created.isoformat(),
                'event_count'     : event_count,
                'event_type'      : event_type,
                'cluster'         : cluster,
                'customer'        : customer
            }
            yield dic
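Each dict yielded by `ExtractMetadata` becomes one BigQuery row, so its keys need to match the table's column names. A sketch of a matching schema in the dictionary form that `beam.io.WriteToBigQuery` accepts; the column types and modes are assumptions inferred from the values in the dict (for example, `time_of_creation` is an ISO 8601 string assumed to load into a `TIMESTAMP` column), and `missing_columns` is a hypothetical helper for catching key/column mismatches before a write fails:

```python
# Column types and modes are assumptions; adjust them to the real table.
EXTRACT_METADATA_SCHEMA = {
    "fields": [
        {"name": "blob_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "event_path", "type": "STRING", "mode": "NULLABLE"},
        {"name": "size", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "time_of_creation", "type": "TIMESTAMP", "mode": "NULLABLE"},
        {"name": "event_count", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "event_type", "type": "STRING", "mode": "NULLABLE"},
        {"name": "cluster", "type": "STRING", "mode": "NULLABLE"},
        {"name": "customer", "type": "STRING", "mode": "NULLABLE"},
    ]
}


def missing_columns(row, schema=EXTRACT_METADATA_SCHEMA):
    """Return the row keys (sorted) that have no matching column in the schema."""
    columns = {field["name"] for field in schema["fields"]}
    return sorted(set(row) - columns)
```

The schema would then be passed as, for example, `beam.io.WriteToBigQuery('my-project:my_dataset.blobs', schema=EXTRACT_METADATA_SCHEMA)`, where the table spec is a placeholder.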