Building on the code in this question, I want to create a Dataflow pipeline that looks at all of the files under a specific GCS bucket folder and reports the final (leaf) subdirectories holding the greatest amount of data in bytes. I started with code similar to:
import apache_beam as beam
from apache_beam.io.gcp import gcsfilesystem as gcs
from apache_beam.options.pipeline_options import PipelineOptions


class SortFiles(beam.DoFn):
    def __init__(self, gfs):
        self.gfs = gfs

    def process(self, file_metadata):
        if file_metadata.size_in_bytes > 0:
            # Sort the files here?
            pass


class SortFolders(beam.DoFn):
    def __init__(self, gfs):
        self.gfs = gfs

    def process(self, file_metadata):
        if file_metadata.size_in_bytes > 0:
            # Sort the folders here by some combination of total
            # file size and file count
            pass


def delete_empty_files():
    pipeline_options = PipelineOptions(...)
    gfs = gcs.GCSFileSystem(pipeline_options)
    p = beam.Pipeline(options=pipeline_options)
    discover_empty = (
        p
        | 'Filenames' >> beam.Create(gfs.match([gs_folder])[0].metadata_list)
        | 'Reshuffle' >> beam.Reshuffle()
        | 'SortFilesbySize' >> beam.ParDo(SortFiles(gfs))
        | 'SortFoldersbySize' >> beam.ParDo(SortFolders(gfs))
        | 'OutputFolders' >> ...
    )
I have not yet decided whether to rank the folders by total bytes or by the total number of files they contain. How would I go about solving this? A second issue is that I want the results keyed by the final subdirectory that actually contains each file, not by any of its parent folders.
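To make the question more concrete, here is a rough sketch of the alternative I have been considering instead of the two DoFns above. The run function, the top_n parameter, and the assumption that gs_folder is a recursive glob such as gs://bucket/folder/** are only mine for illustration. The idea is to key every file by its immediate parent directory (which should be exactly the final subdirectory), sum the sizes per key, and take the top results; mapping each file to 1 instead of its size would rank by file count instead. Is something along these lines the right approach, or is there a more idiomatic way to do it in Beam?

import posixpath

import apache_beam as beam
from apache_beam.io.gcp import gcsfilesystem as gcs
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Top


def run(gs_folder, top_n=10):
    # gs_folder is assumed to be a recursive glob, e.g. 'gs://bucket/folder/**'
    pipeline_options = PipelineOptions()
    gfs = gcs.GCSFileSystem(pipeline_options)

    with beam.Pipeline(options=pipeline_options) as p:
        _ = (
            p
            | 'Filenames' >> beam.Create(gfs.match([gs_folder])[0].metadata_list)
            # The directory that directly contains a file is its final
            # (leaf) subdirectory, so key each file by its dirname.
            | 'KeyByLeafDir' >> beam.Map(
                lambda md: (posixpath.dirname(md.path), md.size_in_bytes))
            # Total bytes per leaf directory; mapping to 1 instead of
            # md.size_in_bytes would give a file count per directory.
            | 'SumBytesPerDir' >> beam.CombinePerKey(sum)
            # Keep only the top_n directories by total bytes.
            | 'TopByBytes' >> Top.Of(top_n, key=lambda kv: kv[1])
            | 'Print' >> beam.Map(print)
        )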