0 votes

I have been testing an Apache Beam pipeline in the Apache Beam notebooks provided by GCP, using a Kafka instance as input and BigQuery as output. I can run the pipeline successfully with the interactive runner, but when I deploy the same pipeline to the Dataflow runner it never seems to actually read from the defined Kafka topic. Looking into the logs gives me the error:

Failed to read inputs in the data plane. Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",

My implementation is based on this post.

Any ideas? Code provided below:

from __future__ import print_function

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner
from beam_nuggets.io import kafkaio

# kafka_topic, ip_addr, options, table_spec and table_schema are all
# defined earlier in the notebook (e.g. ip_addr = "10.0.0.70:9092").
kafka_config = {"topic": kafka_topic, "bootstrap_servers": ip_addr}

# p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)  # <- used for testing
p = beam.Pipeline(DataflowRunner(), options=options)  # <- used for the Dataflow deployment

notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
# PCollection names must not shadow the preprocess()/model() DoFn classes,
# otherwise re-running the notebook cell fails.
preprocessed = notifications | "Pre-process for model" >> beam.ParDo(preprocess())
predictions = preprocessed | "Format & predict" >> beam.ParDo(model())

new_write = predictions | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)

Error message from logs:

Failed to read inputs in the data plane. Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs for elements in elements_iterator: File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__ return self._next() File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next raise self grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "DNS resolution failed" debug_error_string = "{"created":"@1595595923.509682344","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1595595923.509650517","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1595595923.509649070","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1595595923.509645878","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}" >

and also

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "DNS resolution failed" debug_error_string = "{"created":"@1594205651.745381243","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1594205651.745371624","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1594205651.745370349","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1594205651.745367499","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}" >

Pipeline settings:

Python sdk harness started with pipeline_options: {'streaming': True, 'project': 'example-project', 'job_name': 'beamapp-root-0727105627-001796', 'staging_location': 'example-staging-location', 'temp_location': 'example-staging-location', 'region': 'europe-west1', 'labels': ['goog-dataflow-notebook=2_23_0_dev'], 'subnetwork': 'example-subnetwork', 'experiments': ['use_fastavro', 'use_multiple_sdk_containers'], 'setup_file': '/root/notebook/workspace/setup.py', 'sdk_location': '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-2.23.0.dev0.tar.gz', 'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 'job_port': '0', 'artifact_port': '0', 'expansion_port': '0'}
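
For reference, these settings presumably come from an options object built roughly like this (a sketch reconstructed from the log above; the project, bucket and subnetwork names are the placeholders shown there):

from apache_beam.options.pipeline_options import PipelineOptions

# Reconstructed from the worker log above; values are the log's placeholders.
options = PipelineOptions(
    streaming=True,
    project="example-project",
    region="europe-west1",
    staging_location="example-staging-location",
    temp_location="example-staging-location",
    subnetwork="example-subnetwork",
    setup_file="/root/notebook/workspace/setup.py",
)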
Comments:

Hey there, could you please tell us if the error message you see is only "Failed to read[...]"? Does it include anything else? Please let us know. – Kevin Quinzel

Hi @KevinQuinzel, after further investigation, the first error in the log is (error: INVALID_ARGUMENT: Http(400) Bad Request). The networking-specific error is "referenced_errors":[{"created":"@1594977061.550058030","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1594977061.550053572","description":"unparseable host:port" – enzed01

That error mentions "unparseable host:port". Are you specifying host:port anywhere, like in the arguments to the pipeline, or in code that's not included here? There might be a typo somewhere. – Daniel Oliveira

Looks like your ip_addr is not resolving to a public IP and port; rather, it is resolving to the literal string "host:port". You might want to check on this. – Jayadeep Jayaraman

Hi @DanielOliveira, earlier up in the notebook file I declare it as ip_addr = "10.0.0.70:9092". I have also tried not using a variable at all and just declaring it as "bootstrap_servers": "10.0.0.70:9092", and unfortunately have seen the same result. – enzed01
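
Given the "unparseable host:port" hint in the comments, one quick sanity check is to confirm the broker address both parses and is reachable from inside the VPC (a standard-library sketch; run it from the notebook VM or another machine in the same subnetwork):

import socket

# The bootstrap server address from the comments above.
host, port = "10.0.0.70", 9092

try:
    # A plain TCP connect verifies the host:port pair parses and the broker is reachable.
    with socket.create_connection((host, port), timeout=5):
        print("broker reachable")
except OSError as exc:
    print(f"cannot reach broker: {exc}")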

2 Answers

0 votes

As far as I know, the Failed to read inputs in the data plane ... status = StatusCode.UNAVAILABLE details = "DNS resolution failed" error could be an issue in the Python Beam SDK; it is recommended to update to Python Beam SDK 2.23.0.
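
Before rebuilding anything, it may be worth confirming which SDK version the notebook environment is actually running:

import apache_beam

# Print the Beam SDK version in use; anything older than 2.23.0 should be upgraded.
print(apache_beam.__version__)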

0 votes

It seems this isn't possible with my implementation, but with multi-language pipelines it appears to be more viable. I opened a ticket with Google support on this matter and, after some time investigating, they replied with the following:

“… at this moment Python doesn't have any KafkaIO that works with DataflowRunner. You can use Java as a workaround. In case you need Python for something in particular (TensorFlow or similar), a possibility is to send the message from Kafka to a PubSub topic (via another pipeline that only reads from Kafka and publish to PS or an external application).”
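
For reference, the multi-language route mentioned above is Beam's cross-language KafkaIO, which newer SDK releases expose to Python as apache_beam.io.kafka.ReadFromKafka. A minimal sketch, assuming the notebook's options and kafka_topic variables and the broker address from the comments (the transform runs the Java Kafka connector via an expansion service, so a Java runtime is needed when the pipeline is built):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# Cross-language read: Beam runs the Java KafkaIO connector behind an expansion service.
with beam.Pipeline(options=options) as p:
    messages = p | "Read from Kafka" >> ReadFromKafka(
        consumer_config={"bootstrap.servers": "10.0.0.70:9092"},
        topics=[kafka_topic],
    )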

So feel free to take their advice, try the cross-language route sketched above, or hack something together yourself. I just revised my architecture to use Pub/Sub instead of Kafka.
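
For anyone making the same switch, a minimal sketch of the replacement read, assuming a hypothetical bridge topic that the Kafka messages get forwarded to (the rest of the pipeline stays as in the question):

import apache_beam as beam

# Dataflow supports streaming Pub/Sub reads natively in the Python SDK.
with beam.Pipeline(options=options) as p:
    notifications = (
        p
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(
            topic="projects/example-project/topics/kafka-bridge")  # hypothetical topic
        | "Decode" >> beam.Map(lambda message: message.decode("utf-8"))
    )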