I am trying to launch a streaming Dataflow job that contains n pipelines.

Based on the configured topics and the corresponding BQ table for each topic, I want to launch a pipeline for each pair inside one streaming job.

My actual problem is that I have to create and upload a template for every project. What I want is to reuse the uploaded template and only pass a configuration file when launching a new Dataflow job, changing the topic, subscription, dataset and BQ table.

At the moment I am unable to reuse the template.

Please let me know whether this is possible or not. Google only provides one-to-one templates, not many-to-many templates (e.g. three topics to three BQ tables, i.e. three data pipelines, n-to-n).
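For reference, this is roughly how a classic template gets staged in the first place; a minimal sketch with placeholder project and bucket names, and the point where the job graph (including the number of topic-to-table branches) is frozen:

from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, StandardOptions)

options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DataflowRunner'
gcp_options = options.view_as(GoogleCloudOptions)
gcp_options.project = 'my-project'                        # placeholder
gcp_options.staging_location = 'gs://my-bucket/staging'   # placeholder
gcp_options.temp_location = 'gs://my-bucket/temp'         # placeholder
# With template_location set, running the pipeline stages a template
# instead of starting a streaming job.
gcp_options.template_location = 'gs://my-bucket/templates/multi_topic_template'  # placeholder
# Constructing the pipeline and calling pipeline.run() with these options
# writes the template to the GCS path above.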

import logging
import os
import json
from google.cloud import storage
from apache_beam import Pipeline, ParDo, DoFn
from apache_beam.io import ReadFromPubSub, WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, WorkerOptions, GoogleCloudOptions, \
    SetupOptions


def _get_storage_service():
    # Build a GCS client from a local service account key file (raw string so
    # the Windows backslashes are not treated as escape sequences).
    storage_client = storage.Client.from_service_account_json(
        json_credentials_path=r'C:\Users\dneema\PycharmProjects\iot_dataflow\df_stm_iot_pubsub_bq\service_account_credentials.json')
    print('storage service fetched')
    return storage_client


class RuntimeOptions(PipelineOptions):
    """Custom options; the value-provider arguments become the template's runtime parameters."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--bucket_name', type=str)
        parser.add_value_provider_argument('--config_json_path', type=str)

class PipelineCreator:

    def __init__(self):
        self.options = PipelineOptions()
        storage_client = storage.Client.from_service_account_json(
            'service_account_credentials_updated.json')

        runtime_options = self.options.view_as(RuntimeOptions)
        # Value providers must be read with .get(); in a template these values
        # are only available at runtime, not while the graph is being built.
        bucket_name = str(runtime_options.bucket_name.get())
        config_json_path = str(runtime_options.config_json_path.get())

        # get the bucket with name
        bucket = storage_client.get_bucket(bucket_name)

        # get bucket file as blob
        blob = bucket.get_blob(config_json_path)

        # convert to string and load config
        json_data = blob.download_as_string()
        self.configData = json.loads(json_data)

        dataflow_config = self.configData['dataflow_config']
        self.options.view_as(StandardOptions).streaming = bool(dataflow_config['streaming'])
        self.options.view_as(SetupOptions).save_main_session = True

        worker_options = self.options.view_as(WorkerOptions)
        worker_options.max_num_workers = int(dataflow_config['max_num_worker'])
        worker_options.autoscaling_algorithm = str(dataflow_config['autoscaling_algorithm'])
        #worker_options.machine_type = str(dataflow_config['machine_type'])
        #worker_options.zone = str(dataflow_config['zone'])
        #worker_options.network = str(dataflow_config['network'])
        #worker_options.subnetwork = str(dataflow_config['subnetwork'])

    def run(self):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'dataflow-service-account.json'

        project_id = self.configData['project_id']
        dataset_id = self.configData['dataset_id']
        topics = self.configData['topics']
        table_ids = self.configData['bq_table_ids']
        error_table_id = self.configData['error_table_id']

        logger = logging.getLogger(project_id)
        logger.info(self.options.display_data())

        pipeline = Pipeline(options=self.options)

        size = len(topics)
        for index in range(size):
            print(topics[index])
            pipeline_name = "pipeline_"+str(index)
            logger.info("Launch pipeline :: "+pipeline_name)
            messages = pipeline | 'Read PubSub Message in ' + pipeline_name >> ReadFromPubSub(topic=topics[index])
            logger.info("Read PubSub Message")
            valid_messages, invalid_messages = (
                messages
                | 'Convert Messages to TableRows in ' + pipeline_name >> ParDo(
                    TransformMessageToTableRow()).with_outputs('invalid', main='valid'))
            valid_messages | 'Write Messages to BigQuery in ' + pipeline_name >> WriteToBigQuery(
                table=table_ids[index],
                dataset=dataset_id,
                project=project_id,
                write_disposition=BigQueryDisposition.WRITE_APPEND)

        pipeline.run().wait_until_finish()

class TransformMessageToTableRow(DoFn):

    def process(self, element, *args, **kwargs):
        import json
        from apache_beam import pvalue

        logging.getLogger('dataflow').log(logging.INFO, element)
        print(element)
        print("element type ", type(element))
        print("inside bq pardo")
        try:
            message_rows = json.loads(element)
            # if using the Pub/Sub emulator, the payload is double-encoded,
            # so uncomment the line below
            # message_rows = json.loads(message_rows)
            print('loaded element')
        except Exception:
            try:
                element = "[" + element + "]"
                message_rows = json.loads(element)
            except Exception as e:
                print(e)
                # Route unparseable messages to the 'invalid' side output and stop.
                yield pvalue.TaggedOutput('invalid', [element, str(e)])
                return
        print(message_rows)
        print("message rows", type(message_rows))
        if not isinstance(message_rows, list):
            message_rows = [message_rows]
        for row in message_rows:
            try:
                new_row = dict()
                for k, v in row.items():
                    new_row[str(k)] = v
                print(new_row)
                yield new_row
            except Exception as e:
                print(e)
                yield pvalue.TaggedOutput('invalid', [row, str(e)])

if __name__ == '__main__':
    PipelineCreator().run()

Here the runtime arguments are bucket_name and config_json_path, which point to all the configuration-related settings such as the dataset, BQ tables, topics/subscriptions and all worker options.
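For reference, the code above expects the config file in GCS to look roughly like this (all values are placeholders; the topics must use the full projects/<project>/topics/<name> form expected by ReadFromPubSub):

{
  "project_id": "my-project",
  "dataset_id": "my_dataset",
  "topics": [
    "projects/my-project/topics/topic_1",
    "projects/my-project/topics/topic_2"
  ],
  "bq_table_ids": ["table_1", "table_2"],
  "error_table_id": "error_table",
  "dataflow_config": {
    "streaming": true,
    "max_num_worker": 5,
    "autoscaling_algorithm": "THROUGHPUT_BASED"
  }
}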

So, is this possible or not?


1 Answer


As covered in this previously answered thread, Unable to run multiple Pipelines in desired order by creating template in Apache Beam, you can run only one pipeline inside a template at any time. You'll have to delegate the template creation to another service and pass the configuration to it; just follow the link inside that thread and you'll find how-to examples.
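To make the limitation concrete: runtime parameters can only feed ValueProviders inside a fixed graph, so a template launch like the sketch below (google-api-python-client, all project, bucket and template names are placeholders) can swap bucket_name and config_json_path per job, but it cannot change how many topic-to-table branches the graph contains; for that, the delegating service has to rebuild and resubmit the pipeline with the new configuration.

from googleapiclient.discovery import build

# Uses Application Default Credentials; the template must already be staged in GCS.
dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
    projectId='my-project',                                   # placeholder
    gcsPath='gs://my-bucket/templates/multi_topic_template',  # placeholder
    body={
        'jobName': 'iot-pubsub-to-bq-job-1',                  # placeholder
        'parameters': {
            # Runtime parameters defined via add_value_provider_argument.
            'bucket_name': 'my-config-bucket',
            'config_json_path': 'configs/job_1.json',
        },
    },
)
response = request.execute()
print(response)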