0
votes

Not able to run the dataflow for streaming and getting the count of unread messages I needed a streaming dataflow to monitor the unread messagaes Is there any issue in my code.

import logging
import datetime
import argparse
import yaml
import apache_beam as beam
from apache_beam import Pipeline, ParDo, DoFn
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query

class unread(DoFn):
    def __init__(self, project, pubsub_subscription):
        self.project = project
        self.pubsub_subscription = pubsub_subscription

    def process(self, key_value):
        logging.info("Key_value {}".format(key_value))
        client = monitoring_v3.MetricServiceClient()
        result = query.Query(
            client,
            project,
            'pubsub.googleapis.com/subscription/num_undelivered_messages',
            end_time=datetime.datetime.now(),
            minutes=2,
            ).as_dataframe()
        if result.empty:
            logging.info("All Messages Read")
        else:
            logging.info(result['pubsub_subscription'][self.project][self.pubsub_subscription][0])
        return result

def run():
    pipeline_args = [arguments passed
                    ]
    logging.info("Pipeline Arguments: {}".format(pipeline_args))

    try:
        # Set `save_main_session` to True so DoFns can access globally imported modules.
        pipeline_options = PipelineOptions(
            pipeline_args, streaming=True, save_main_session=True
        )

        with Pipeline(options=pipeline_options) as pipeline:
            (
            pipeline
            | 'Initializing ' >> beam.Create(['1'])
            | 'PubSub Unread Messages' >> ParDo(unread(project, pubsub_subscription))
            )
    except Exception as e:
        logging.error("Failed to Deploy DataFlow Pipeline. Error {}".format(e))


if __name__ == "__main__":

    with open("input.yaml", 'r') as yamlfile:
        cfg = yaml.load(yamlfile, Loader=yaml.BaseLoader)

    logging.basicConfig(format='%(asctime)s %(levelname)s - %(message)s', datefmt='%y-%m-%d %H:%M:%S', level=logging.INFO)

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args()

    run()

Error message: Error message from worker: generic::unknown: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 582, in apache_beam.runners.common.SimpleInvoker.invoke_process File "unread.py", line 18, in process NameError: name 'monitoring_v3' is not defined During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute response = task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1001, in process_bundle element.data) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 229, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 358, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 717, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1235, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1300, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 581, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 717, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1235, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1300, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 581, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 717, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1235, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.7/site-packages/future/utils/init.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 582, in apache_beam.runners.common.SimpleInvoker.invoke_process File "unread.py", line 18, in process NameError: name 'monitoring_v3' is not defined [while running 'PubSub Unread Messages-ptransform-32']

2

2 Answers

0
votes

See how to handle name error here.

Try importing the module inside the function, or enable the --save_main_session pipeline option.

0
votes

The default python dependencies that are installed on the workers are lister here - https://cloud.google.com/dataflow/docs/concepts/sdk-worker-dependencies#python-3.7.10

To install additional packages please follow the steps as mentioned here - https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/