I am trying to feed a pandas DataFrame into an Apache Beam pipeline and write it to GCS. Without Dataflow/Apache Beam I am able to write the DataFrame data to GCS, but now Dataflow is in the picture, so I need to do the same thing inside a Beam pipeline:
def database_to_gcs(self, type='full'):
    """Export every table listed in ``self.tablemetadata`` to CSV in GCS via Dataflow.

    The first line of the metadata file is skipped (presumably a header --
    confirm). Each remaining non-blank line is expected to be
    ``table_name,primary_key``. For each table, all rows are selected through
    ``self.cur``. They are then written as one CSV object, with a header row of
    column names, to
    ``gs://<bucket>/output/<table>/<YYYY-MM-DD>/<timestamp>_<table>.csv``.

    Every table becomes an independent branch of a single Beam pipeline. The
    pipeline is submitted once at the end; ``p.run()`` is not waited on here.

    :param type: export mode. Only ``'full'`` is implemented; any other value
        is a no-op, as before.
    """
    if type != 'full':
        # Only full exports are implemented; other modes do nothing.
        return

    def to_csv_line(row):
        """Render one row as a single CSV line, quoting commas/quotes/newlines."""
        # Imported locally so the function is self-contained when Beam pickles
        # it and ships it to the Dataflow workers.
        import csv
        import io
        buf = io.StringIO()
        csv.writer(buf, lineterminator='').writerow(list(row))
        return buf.getvalue()

    argv = [
        '--project={0}'.format(self.project_name),
        # Dataflow rejects job names containing uppercase letters ("One").
        '--job_name=one',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(self.bucket_name),
        '--temp_location=gs://{0}/staging/'.format(self.bucket_name),
        '--runner=DataflowRunner',
    ]
    with open(self.tablemetadata, 'r') as fr:
        next(fr)  # skip the metadata file's first line
        self.clear_directory()
        # Pass the options built above. The previous code passed sys.argv,
        # which silently discarded this list.
        p = beam.Pipeline(argv=argv)
        for line in fr:
            line = line.strip()
            if not line:
                continue  # tolerate blank (e.g. trailing) lines
            # strip() each field so primary_key no longer carries the newline.
            table_name, _primary_key = [part.strip() for part in line.split(',')]

            # NOTE(review): table_name is concatenated into the SQL. Identifiers
            # cannot be bound as query parameters, so the metadata file must be
            # trusted input.
            self.cur.execute("SELECT * FROM " + table_name)
            # DB-API cursor.description gives the column names of the result.
            columns = [desc[0] for desc in self.cur.description]
            df = pd.DataFrame(list(self.cur), columns=columns)
            # NOTE(review): pandas turns NULLs in numeric columns into NaN
            # (and ints into floats); confirm that is acceptable in the CSV.
            # beam.Create embeds the rows in the submitted job graph, which has
            # a size limit, so very large tables need a Beam-native source.
            rows = list(df.itertuples(index=False, name=None))

            # Compute the timestamp once so the date folder and the file name
            # always agree, even across midnight.
            now = datetime.now()
            output_path = 'gs://{0}/output/{1}/{2}/{3}'.format(
                self.bucket_name,
                table_name,
                str(now.date()),
                str(now) + "_" + table_name + '.csv')

            # Labels must be unique within a pipeline, so suffix them per table.
            (p
             | 'CreateRows[{0}]'.format(table_name) >> beam.Create(rows)
             | 'FormatCsv[{0}]'.format(table_name) >> beam.Map(to_csv_line)
             | 'WriteToGcs[{0}]'.format(table_name) >> beam.io.WriteToText(
                 output_path,
                 num_shards=1,
                 # Empty template => write exactly output_path, with no
                 # "-00000-of-00001" shard suffix appended.
                 shard_name_template='',
                 header=to_csv_line(columns)))
    p.run()