0 votes

We've created a pipeline that performs a transformation on 3 streams of data located in GCS ('Clicks', 'Impressions', 'ActiveViews'). We have a requirement to write the individual streams back out to GCS, but to separate files (to be later loaded into BigQuery), because they each have a slightly different schema.
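For reference, the shape of the pipeline is roughly as follows (a minimal sketch using the Dataflow SDK, not our actual code; the transform names, DoFns, and paths are placeholders):

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

// One read -> transform -> write branch per stream; each stream is written
// to its own files because the schemas differ slightly.
pipeline.apply(TextIO.Read.from("gs://<bucket_name>/Clicks_*"))
        .apply(ParDo.of(new TransformClicksFn()))        // placeholder DoFn
        .apply(TextIO.Write.to("gs://<bucket_name>/out/Clicks"));

pipeline.apply(TextIO.Read.from("gs://<bucket_name>/Impressions_*"))
        .apply(ParDo.of(new TransformImpressionsFn()))   // placeholder DoFn
        .apply(TextIO.Write.to("gs://<bucket_name>/out/Impressions"));

pipeline.apply(TextIO.Read.from("gs://<bucket_name>/ActiveViews_*"))
        .apply(ParDo.of(new TransformActiveViewsFn()))   // placeholder DoFn
        .apply(TextIO.Write.to("gs://<bucket_name>/out/ActiveViews"));

pipeline.run();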

One of the writes has failed twice in succession, with a different error each time, which in turn causes the pipeline to fail.

These are the last 2 workflows/pipelines, represented visually in the GDC, which show the failure:

[screenshot: write failing]

[screenshot: write failing]

The 1st error:

Feb 21, 2015, 12:55:14 PM (b0cbc05dfc56dbd9): Workflow failed. Causes: (f98c177c56055863): Map task completion for Step "ActiveViews-GSC-write" failed. Causes: (2d838e694976dc6): Expansion failed for filepattern: gs://cdf/binaries/tmp-38156614004ed90e-[0-9][0-9][0-9][0-9][0-9]-of-[0-9][0-9][0-9][0-9][0-9].avro.

The 2nd error:

Feb 21, 2015, 1:20:15 PM (19dcdcf1fe125eeb): Workflow failed. Causes: (2a27345ef73673d3): Map task completion for Step "ActiveViews-GSC-write" failed. Causes: (8f79a20dfa5c4d2b): Unable to view metadata for file: gs://cdf/binaries/tmp-2a27345ef7367fe6-00001-of-00015.avro.

It's only happening on the "ActiveViews-GCS-Write" step.

Any idea what we're doing wrong?

Comments:

Does loading Avro files into BigQuery work for you? From what I can see, only CSV and JSON are supported. – G B
We're only using CSV files. I don't know why the error message says avro. – Graham Polley
polleyg@ Have you had a chance to check whether your original code is now working? – Jeremy Lewi
Not yet. Been busy trying to get side inputs working. – Graham Polley

2 Answers

1 vote

We've found a workaround. The problem seems to occur when more than one input source is specified and a Flatten is used to merge them.

Using a Flatten for the 2 input sources (e.g. all our files for the 1st and 2nd of Feb) doesn't work (or we've done it wrong):

PCollection<String> pc1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_20150201*")); // 1st Feb
PCollection<String> pc2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_20150202*")); // 2nd Feb
PCollectionList<String> all = PCollectionList.of(pc1).and(pc2);
PCollection<String> flattened = all.apply(Flatten.<String>pCollections());
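The flattened collection is then written back out to GCS; it is a write step of this shape (a sketch, with a placeholder output path, assuming the SDK's TextIO.Write.named API) that appears as "ActiveViews-GCS-Write" in the errors above:

// The named write step that shows up in the failure messages.
flattened.apply(TextIO.Write.named("ActiveViews-GCS-Write")
                            .to("gs://<bucket_name>/output/ActiveViews"));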

Instead, we just use a glob without a Flatten (the character class [12] matches both days' files in a single read), and it works every time:

pipeline.apply(TextIO.Read.from("gs://<bucket_name>/Files_2015020[12]*"));

1 vote

The original code is most likely hitting two different issues, one of which was already fixed. The two issues have to do, respectively, with:

  1. Combining collections by flattening them together.
  2. How we handle glob patterns.

Issue number 1, with the Flatten, is the one that has been fixed. With that issue fixed, you are most likely hitting the second issue, with how glob patterns are handled.

What happens if you use a Flatten, but with globs similar to those you use in the non-Flatten case? e.g.:

PCollection<String> pc1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_2015020[1]*"));
PCollection<String> pc2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_2015020[2]*"));
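followed by the same merge step as in your workaround code:

PCollectionList<String> all = PCollectionList.of(pc1).and(pc2);
PCollection<String> flattened = all.apply(Flatten.<String>pCollections());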

Matching globs in GCS is a bit tricky because GCS list operations are eventually consistent.
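To illustrate the failure mode (purely a hypothetical sketch, not Dataflow's actual implementation): a listing taken immediately after files are written may not see all of them yet, so one defensive pattern is to re-list with backoff until the expected number of shards appears.

// listMatchingFiles() is a hypothetical stand-in for a GCS list/glob call,
// not a real Dataflow or GCS client API.
static List<String> waitForShards(String glob, int expectedShards) throws InterruptedException {
    List<String> files = listMatchingFiles(glob);
    for (int attempt = 1; files.size() < expectedShards && attempt <= 5; attempt++) {
        Thread.sleep(1000L * attempt);   // simple linear backoff before re-listing
        files = listMatchingFiles(glob); // a later listing may see files the first one missed
    }
    return files;
}

// e.g. waitForShards("gs://<bucket_name>/tmp-*.avro", 15) for a 15-shard write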