0
votes

I need to parse json data from files in GCS which is compressed, since the files extension is .gz so it should be reorganized and handled properly by dataflow, however the job log printed out unreadable characters and data not processed. when I process uncompressed data it worked fine. I used the following method to map/parse json:

        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> eventDetails = mapper.readValue(c.element(),
                    new TypeReference<Map<String, String>>() {
                    });

any idea what could be the cause?

===================================

To add more details about how to read from input files:

  1. to create a pipeline:

    Poptions pOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(Poptions.class);
    Pipeline p = Pipeline.create(pOptions);
    p.apply(TextIO.Read.named("ReadLines").from(pOptions.getInput()))                                          
     .apply(new Pimpression())
     .apply(BigQueryIO.Write
    .to(pOptions.getOutput())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    p.run();
    
  2. configuration at run time:

    PROJECT="myProjectId"
    DATASET="myDataSetId"
    INPUT="gs://foldername/input/*"
    STAGING1="gs://foldername/staging" 
    TABLE1="myTableName"
    mvn exec:java -pl example \
    -Dexec.mainClass=com.google.cloud.dataflow.examples.Example1 \
    -Dexec.args="--project=${PROJECT} --output=${PROJECT}:${DATASET}.${TABLE1}   --input=${INPUT} --stagingLocation=${STAGING1} --runner=BlockingDataflowPipelineRunner"
    
  3. input file name example: file.gz, and the output of command gsutil ls -L gs://bucket/input/file.gz | grep Content- is:

    Content-Length:     483100
    Content-Type:       application/octet-stream
    
1
Hi, I'm sorry to hear that you're having trouble. Can you provide some details about how you're generating/reading from the files (in both cases)?MattL
Hi MattL, any comments/ideas/Echo
Sorry for the delay--this looks fine. Can you share the output of the following? gsutil ls -L gs://foldername/input/exampleinputfile.gz" | grep Content- We may have to follow up privately, because this doesn't look like intended behavior.MattL
thanks, I put in the output of that command, please let me know what should I do next.Echo
use withCompressionType(TextIO.CompressionType.AUTO) in your pipeline.apply options. This will process parsed files properlyProgrammer

1 Answers

1
votes

After following up privately, we determined that this issue was due to using an older version of the Dataflow SDK (pre-gzip support). Since Dataflow is in alpha and the SDK is being continually updated, ensure that the SDK version you are using is up to date (either from Maven central or GitHub).