1
votes

I am trying to take input from pandas dataframe to apache beam pipeline and write it to GCS. Without using dataflow/apache beam, I am able to write the dataframe data in GCS. But now dataflow is in picture.

def database_to_gcs(self, type='full'):
    if type == 'full':
        with open(self.tablemetadata, 'r') as fr:
            next(fr)
            self.clear_directory()
            argv = [
                '--project={0}'.format(self.project_name),
                '--job_name=One',
                '--save_main_session',
                '--staging_location=gs://{0}/staging/'.format(self.bucket_name),
                '--temp_location=gs://{0}/staging/'.format(self.bucket_name),
                '--runner=DataflowRunner'
            ]
            p = beam.Pipeline(argv=sys.argv)
            for line in fr:
                table_name, primary_key = line.split(',')
                self.cur.execute("SELECT * FROM " + table_name)
                df = pd.DataFrame(list(self.cur))
                dictionary = df.to_dict('split')
                print(dictionary)
                input_dataframe = df
                output_path = 'gs://{0}/output/{1}/{2}/{3}'.format(self.bucket_name,
                                                                   table_name,
                                                                   str(datetime.now().date()),
                                                                   str(datetime.now()) + "_" + table_name + '.csv')
                (p
                  | 'ReadDataframe' >> beam.io.ReadFromText(input_dataframe)
                  | 'WriteToFile' >> beam.io.Write(output_path)
                  )
                p.run()
1

1 Answers

0
votes

Beam provides ParDo transform where you can write arbitrary Python code that operates on input elements. So probably consider writing a DoFn that takes lines of text read from input file and generates dataframes. You can either process these dataframes in the same ParDo or feed them to a secondary ParDo where you do the processing. I don't think Beam currently have any utility transforms for handling pandas dataframes currently even though this was discussed several times.