I am running an Apache Beam pipeline on Google Dataflow. It reads data from a GCS bucket and, after processing, writes to a GCS bucket. The pipeline processes Japanese data. In the Stackdriver logs the Japanese characters are displayed properly, but when I look at the data in the output bucket it is corrupted. So I suspect that either the encoding is not set when writing to GCS, or we have to change the GCS file format. I need help solving this issue.

I have tried setting the encoding in the Beam pipeline. I also tried running the jar with an encoding parameter:

-Dfile.encoding=EUC-JP -jar target/jarname -- other beam option

Beam version 2.14.0


import java.util.Arrays;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public class SimpleRunner {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleRunner.class);

    static TupleTag<String> successRecord = new TupleTag<String>() {

        private static final long serialVersionUID = 3L;
    };


    static class JapaneseCharProcessor extends DoFn<String, String> {
        private static final long serialVersionUID = 1L;

        public JapaneseCharProcessor() {
        }

        @ProcessElement
        public void processElement(@Element String record, MultiOutputReceiver out, ProcessContext c) {
            LOG.info("processElement  {}", record);
            c.output("processed Recors:" + record);

        }

    }


    public static void main(String[] args)  {

        IExtractorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(IExtractorOptions.class);
        final List<String> LINES = Arrays.asList(
                  "あらゆる情報を簡単に整理・投稿ができる!",
                  "ブログランキング",
                  "ブログを作る・楽しむ",
                  "ショッピングカート");
        Pipeline pipeline = Pipeline.create(options);

        LOG.info("options configured  {}", options);
        FileSystems.setDefaultPipelineOptions(options);

        CoderRegistry cr = pipeline.getCoderRegistry();
        cr.registerCoderForClass(String.class, StringUtf8Coder.of());

        pipeline.apply("Read from input files",
                Create.of(LINES)).setCoder(StringUtf8Coder.of())
                .apply("Process input files", ParDo.of(new JapaneseCharProcessor()))
                .apply("Writing output to success",
                        (TextIO.write().to(options.getSuccessRecordBucketURL()).withNumShards(10)));
        pipeline.run().waitUntilFinish();
        LOG.info("pipeLine execution completed");

    }

}

Actual result : processed Recors:ã‚ã‚‰ã‚†ã‚‹æƒ…å ±ã‚’ç°¡å˜ã«æ•´ç†ãƒ»æŠ•ç¨¿ãŒã§ãã‚‹ï¼

Expected result : processed Recors:あらゆる情報を簡単に整理・投稿ができる!

1 Answer

I checked your code and it works perfectly fine with both the DirectRunner and the DataflowRunner using the 2.16.0 SDK:

$ gsutil cat gs://$BUCKET/japanese/output-cloud/result.txt-00000-of-00002
processed Recors:あらゆる情報を簡単に整理・投稿ができる!
processed Recors:ショッピングカート

However, even though the content is correct, I also see strange characters when I inspect the file in the browser through the GCS Console.
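
The pattern in the question's actual result is what UTF-8 bytes look like when a viewer decodes them with a single-byte Western charset. Below is a minimal sketch of that effect, assuming the viewer falls back to ISO-8859-1. That is an assumption: the charset the browser or the GCS Console actually picks is not confirmed here, and windows-1252 would show slightly different symbols for some bytes. The names `MojibakeDemo`, `misread`, and `repair` are hypothetical and exist only for this illustration.

```java
import java.nio.charset.StandardCharsets;

public class MojibakeDemo {

    // Encode the text as UTF-8, then decode those bytes as ISO-8859-1,
    // the way a viewer that guesses the wrong charset would (assumed charset).
    static String misread(String text) {
        byte[] utf8 = text.getBytes(StandardCharsets.UTF_8);
        return new String(utf8, StandardCharsets.ISO_8859_1);
    }

    // Reverse the mistake: turn the garbled characters back into the
    // original bytes and decode them as UTF-8.
    static String repair(String garbled) {
        byte[] raw = garbled.getBytes(StandardCharsets.ISO_8859_1);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u30D6\u30ED\u30B0" is the first three characters of one of the
        // question's sample lines, written as escapes so the source file
        // encoding does not matter.
        String original = "\u30D6\u30ED\u30B0";
        String garbled = misread(original);

        // Each Japanese character here is 3 bytes in UTF-8, so it shows up
        // as 3 separate characters when misread.
        System.out.println("original length: " + original.length());
        System.out.println("garbled length:  " + garbled.length());
        System.out.println("round trip ok:   " + repair(garbled).equals(original));
    }
}
```

If the garbled text can be turned back into the original this way, that suggests the bytes in the bucket are valid UTF-8 and only the display is wrong, which is consistent with the `gsutil cat` output above.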