0
votes

I have gzipped, line-delimited JSON files in GCS. I want to load them with Dataflow and then save them into BigQuery.

However, it always fails to parse the JSON on the first line of the file. I use Jackson, and the log says:

    Failed to parse JSON, com.fasterxml.jackson.core.JsonParseException:
    Unexpected character ('_' (code 95)):
    Expected space separating root-level values  at
    [Source: (String)"40085_telemetry_2015-09-09.log0000664000076600007660000011300712574251007016553
    0ustar  xxxadminxxxadmin"[truncated 142 chars];
    line: 1, column: 7

However, when I checked the file content, there was no string like the one above. It is surely a valid JSON string. It looks as if, when Dataflow starts processing, the string above is prepended to the beginning of the first line.

Do you have any idea why this happens? I use Apache Beam Java SDK, Version 2.1.0.

My code is shown below:

    static class ReadFile extends PTransform<PInput, PCollection<String>> {
        private static final long serialVersionUID = 1L;
        private ValueProvider<String> files;

        public ReadFile(ValueProvider<String> env, ValueProvider<String> productName, ValueProvider<String> files,
                ValueProvider<String> deadLetterId) {
            this.files = files;
        }

        @Override
        public PCollection<String> expand(PInput input) {
            Pipeline p = input.getPipeline();

            // this pipeline supports both wildcard and comma separated
            String inputFile = files.get();
            if (inputFile.contains("*")) {
                return p.apply("Read from GCS with wildcard prefix",
                        TextIO.read().from(inputFile).withCompressionType(CompressionType.GZIP));
            }

            String[] targetFiles = inputFile.split(",");
            PCollectionList<String> rowsList = PCollectionList.empty(p);
            for (String targetFile : targetFiles) {
                PCollection<String> fileLines = p.apply("Read (" + targetFile + ")",
                        TextIO.read().from(targetFile).withCompressionType(CompressionType.GZIP));
                rowsList = rowsList.and(fileLines);
            }

            PCollection<String> allRows = rowsList.apply("Flatten rows", Flatten.<String>pCollections());
            return allRows;
        }
    }
2
Are you using the Java or Python SDK? Can you show the code that constructs this part of the pipeline? Can this file be decompressed using "gzip -d"? What is the file extension? - jkff
Thanks @jkff for the reply. I added the info, but in the meantime I noticed that when I decompress my file with "gzip -d", the string is prepended to the first line (this didn't happen when I used the Mac Archive Utility). - Norio Akagi
@jkff I think what I see with "gzip -d" is correct and this string actually exists, but the Mac Archive Utility just kindly omits it somehow. Sorry for bothering you, I think this is just a problem in our data :( - Norio Akagi
I suspect this might be a .tar.gz archive; try tar -xzvf? - jkff
tar: Error opening archive: Failed to open returned... I think the older files I have are just corrupted. Anyway, this is not an issue on the Dataflow side at all. Thank you so much for clarifying it. - Norio Akagi

2 Answers

1
votes

I just found that it is a problem with our file. When I run gzip -d locally, the weird string actually exists at the beginning of my file.
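
The prefix in the error message (a file name, octal-looking fields, then "ustar") resembles a tar header, which fits the .tar.gz suspicion raised in the comments. A minimal sketch for checking this before feeding a file to the pipeline, assuming the standard ustar layout where the magic string "ustar" sits at byte offset 257 of the first block. The class and method names are hypothetical, and the check says nothing about whether the rest of the archive is intact:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

// Hypothetical helper, not part of Beam or the original pipeline.
final class TarHeaderCheck {
    private static final int MAGIC_OFFSET = 257; // position of "ustar" in a ustar header
    private static final int MAGIC_LENGTH = 5;

    private TarHeaderCheck() {}

    /** Returns true if the gzip stream decompresses to data that starts with a ustar header. */
    static boolean looksLikeTar(InputStream gzipped) throws IOException {
        try (InputStream in = new GZIPInputStream(gzipped)) {
            byte[] header = new byte[MAGIC_OFFSET + MAGIC_LENGTH];
            int read = 0;
            // read() may return fewer bytes than requested, so loop until full or EOF.
            while (read < header.length) {
                int n = in.read(header, read, header.length - read);
                if (n < 0) {
                    break;
                }
                read += n;
            }
            if (read < header.length) {
                return false; // too short to contain a tar header
            }
            String magic = new String(header, MAGIC_OFFSET, MAGIC_LENGTH, StandardCharsets.US_ASCII);
            return "ustar".equals(magic);
        }
    }
}
```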

Somehow the Mac Archive Utility omits that part, so I didn't notice it. For now, I can hard-code my pipeline to strip everything before the first "{" to work around this issue.
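
The stripping workaround described above can be sketched roughly as follows. This is only an illustration: the class and method names are hypothetical, and it assumes every record is a JSON object (so it starts with "{") and that the junk prefix itself never contains a "{".

```java
// Hypothetical helper illustrating the workaround; not the code actually used in the pipeline.
final class LeadingJunkStripper {
    private LeadingJunkStripper() {}

    /**
     * Returns the line starting at its first '{'.
     * Lines that already start with '{' or contain no '{' are returned unchanged.
     */
    static String stripToFirstBrace(String line) {
        int start = line.indexOf('{');
        return start > 0 ? line.substring(start) : line;
    }
}
```

In a Beam pipeline this could be called on each element in a mapping step placed between the TextIO read and the JSON parsing. Note that it only cleans the beginning of a line; if the file really is a tar archive, padding at the end of the file would need separate handling.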

0
votes

I built a gzip reader in Java to avoid the gzip -d step. This has the advantage of saving disk space and processing time.
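
The reader itself is not shown in this answer. A minimal sketch of what such a reader might look like, using only the JDK's java.util.zip.GZIPInputStream and assuming UTF-8, line-delimited content (the class and method names are hypothetical):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

// Hypothetical sketch; decompresses on the fly without writing an unpacked copy to disk.
final class GzipLineReader {
    private GzipLineReader() {}

    /** Reads all lines from a gzip-compressed stream, assuming UTF-8 text. */
    static List<String> readLines(InputStream compressed) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(compressed), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}
```

Collecting everything into a list keeps the sketch short; for large files, handling each line as it is read would avoid holding the whole file in memory.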