Not able to run the dataflow for streaming and getting the count of unread messages I needed a streaming dataflow to monitor the unread messagaes Is there any issue in my code.
import logging
import datetime
import argparse
import yaml
import apache_beam as beam
from apache_beam import Pipeline, ParDo, DoFn
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query
class unread(DoFn):
def __init__(self, project, pubsub_subscription):
self.project = project
self.pubsub_subscription = pubsub_subscription
def process(self, key_value):
logging.info("Key_value {}".format(key_value))
client = monitoring_v3.MetricServiceClient()
result = query.Query(
client,
project,
'pubsub.googleapis.com/subscription/num_undelivered_messages',
end_time=datetime.datetime.now(),
minutes=2,
).as_dataframe()
if result.empty:
logging.info("All Messages Read")
else:
logging.info(result['pubsub_subscription'][self.project][self.pubsub_subscription][0])
return result
def run():
pipeline_args = [arguments passed
]
logging.info("Pipeline Arguments: {}".format(pipeline_args))
try:
# Set `save_main_session` to True so DoFns can access globally imported modules.
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True
)
with Pipeline(options=pipeline_options) as pipeline:
(
pipeline
| 'Initializing ' >> beam.Create(['1'])
| 'PubSub Unread Messages' >> ParDo(unread(project, pubsub_subscription))
)
except Exception as e:
logging.error("Failed to Deploy DataFlow Pipeline. Error {}".format(e))
if __name__ == "__main__":
with open("input.yaml", 'r') as yamlfile:
cfg = yaml.load(yamlfile, Loader=yaml.BaseLoader)
logging.basicConfig(format='%(asctime)s %(levelname)s - %(message)s', datefmt='%y-%m-%d %H:%M:%S', level=logging.INFO)
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args()
run()
Error message: Error message from worker: generic::unknown: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 582, in apache_beam.runners.common.SimpleInvoker.invoke_process File "unread.py", line 18, in process NameError: name 'monitoring_v3' is not defined During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute response = task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1001, in process_bundle element.data) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 229, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 358, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 717, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1235, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1300, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 581, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 717, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1235, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1300, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 581, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 717, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1235, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.7/site-packages/future/utils/init.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 582, in apache_beam.runners.common.SimpleInvoker.invoke_process File "unread.py", line 18, in process NameError: name 'monitoring_v3' is not defined [while running 'PubSub Unread Messages-ptransform-32']