
When I declare a pipeline with two sources (one GCS and one Pub/Sub), I get an error, but only with the Beam DirectRunner. With the Google Dataflow runner it works well. My pipeline has the streaming option set (streaming=True).

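import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Streaming is enabled, as described above.
options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    gcsEventsColl = (p | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log")
                       | 'convert to dict' >> beam.Map(json.loads))
    liveEventsColl = (p | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic")
                        | 'convert to dict2' >> beam.Map(json.loads))
    input_rec = (gcsEventsColl, liveEventsColl) | 'flatten' >> beam.Flatten()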

It seems that the DirectRunner applies some incompatible transform rewrite to ReadFromText, but I don't understand why.

  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
    self._check_replacement(override)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
    self.visit(ReplacementValidator())
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  [Previous line repeated 4 more times]
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
    transform_node)
RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
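The slash-separated label in the RuntimeError encodes the composite-transform nesting, and it shows that the failing _GroupByKeyOnly node is not one the pipeline declares directly: it sits deep inside the expansion of "Read from GCS". A small, hypothetical helper (plain Python, no Beam required; it assumes '/' only ever separates nesting levels, which holds for this particular label) makes the nesting easier to read:

```python
from typing import List


def transform_path(label: str) -> List[str]:
    """Split a slash-separated full transform label into its nesting levels.

    Assumption: '/' only separates levels (true for the label in this traceback).
    """
    return label.split("/")


label = ("Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)"
         "/ProcessKeyedElements/GroupByKey/GroupByKey")

# Print each level indented by its depth to show the composite structure.
for depth, part in enumerate(transform_path(label)):
    print("  " * depth + part)
```

Read this way, the bounded text read expands into a splittable-DoFn reader whose internals include a GroupByKey, which is the node the DirectRunner's override step then trips over.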

I suspect it is related to this code, but I'm not sure: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/direct/direct_runner.py#L375

Thanks for your help.


2 Answers


In fact, the same code works with the Java direct runner. Perhaps it is a limitation of the Python runner?

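import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

  // FormatAsPubSubMessage and LogElement are my own SimpleFunction classes (not shown).
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<String> apply = p.apply("read from gcs", TextIO.read().from("gs://xxx/*.log"));
    PCollection<String> apply1 = p.apply("read from pubsub", PubsubIO.readMessages().fromTopic("projects/xxx/topics/input_topic")).apply("test", MapElements.via(new FormatAsPubSubMessage()));
    PCollectionList<String> pcs = PCollectionList.of(apply).and(apply1);
    PCollection<String> merged = pcs.apply("merge", Flatten.<String>pCollections());
    merged.apply("log elements", MapElements.via(new LogElement()));
    p.run().waitUntilFinish();
  }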

This is an internal failure caused by a bug. The error message means that the Python DirectRunner corrupted the pipeline graph while it was rewriting (overriding) the pipeline's transforms.
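To make that explanation concrete, here is a deliberately simplified, hypothetical model of the two steps visible in the traceback: replace_all applies runner-specific overrides to the graph, then _check_replacement walks the graph with a validator that raises if a node the override should have replaced is still present. None of these classes or functions are Beam's; they only mirror the control flow. The recurse flag is an invented stand-in for "the rewrite missed a nested node" and does not claim to be the actual bug.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Node:
    """Toy transform node; composites hold nested parts."""
    label: str
    kind: str
    parts: List["Node"] = field(default_factory=list)


def visit(node: Node, fn: Callable[[Node], None]) -> None:
    """Depth-first walk: visit nested parts first, then the node itself."""
    for part in node.parts:
        visit(part, fn)
    fn(node)


def replace_all(root: Node, old_kind: str, new_kind: str, recurse: bool) -> None:
    """Toy override step. recurse=False models a faulty rewrite that only
    inspects the root's direct parts and misses nodes nested deeper."""
    targets: List[Node] = []
    if recurse:
        visit(root, targets.append)
    else:
        targets = list(root.parts)
    for n in targets:
        if n.kind == old_kind:
            n.kind = new_kind


def check_replacement(root: Node, old_kind: str) -> None:
    """Toy validator: no node of old_kind may survive the override."""
    def check(n: Node) -> None:
        if n.kind == old_kind:
            raise RuntimeError(
                f"Transform node {n.label} was not replaced as expected.")
    visit(root, check)


def build() -> Node:
    """Hypothetical graph: a GroupByKey hidden inside a read composite."""
    gbk = Node("Read from GCS/Read/GroupByKey", "_GroupByKeyOnly")
    read = Node("Read from GCS", "composite", [gbk])
    return Node("root", "composite", [read])


root = build()
replace_all(root, "_GroupByKeyOnly", "streaming_gbk", recurse=False)
try:
    check_replacement(root, "_GroupByKeyOnly")
except RuntimeError as e:
    print(e)
```

With recurse=False the nested node keeps its old kind, so the validator raises an error shaped like the one in the question; with recurse=True every node is rewritten and validation passes.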