I am running an Apache Beam pipeline on Google Cloud Dataflow. It reads data from a GCS bucket, processes it, and writes the results to a GCS bucket. The pipeline processes Japanese text. In the Stackdriver logs the Japanese characters are displayed correctly, but when I look at the data in the output bucket it appears corrupted. My guess is that either the encoding is not being set when the data is written to GCS, or the GCS file format needs to be changed. I need help solving this issue.
I have tried setting the encoding in the Beam pipeline. I also tried passing an encoding parameter when running the jar:
-Dfile.encoding=EUC-JP -jar target/jarname -- other beam option
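To see what that flag actually changes, a small check like the following may help. It is only a sketch: the class name `DefaultCharsetCheck` and the helper `utf8Bytes` are made up for illustration. It prints the JVM default charset and shows that encoding a string with an explicitly named charset gives the same bytes regardless of `file.encoding`. Note also that the flag applies only to the JVM that launches the job; as far as I understand, it is not forwarded to the Dataflow workers.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class DefaultCharsetCheck {

    // Hypothetical helper: encode with an explicitly named charset,
    // so the result does not depend on -Dfile.encoding.
    public static byte[] utf8Bytes(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Render bytes as space-separated hex for easy inspection.
    public static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // These two values change with -Dfile.encoding=...
        System.out.println("default charset: " + Charset.defaultCharset());
        System.out.println("file.encoding:   " + System.getProperty("file.encoding"));

        // ...but explicitly requested UTF-8 bytes do not.
        System.out.println("UTF-8 bytes:     " + hex(utf8Bytes("ブログランキング")));
    }
}
```

Running this once with and once without `-Dfile.encoding=EUC-JP` should show the first two lines changing while the hex dump stays the same, which would suggest the flag is not the right lever if the writer already encodes as UTF-8.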
Beam version 2.14.0
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleRunner {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleRunner.class);

    static TupleTag<String> successRecord = new TupleTag<String>() {
        private static final long serialVersionUID = 3L;
    };

    static class JapaneseCharProcessor extends DoFn<String, String> {
        private static final long serialVersionUID = 1L;

        public JapaneseCharProcessor() {
        }

        @ProcessElement
        public void processElement(@Element String record, MultiOutputReceiver out, ProcessContext c) {
            LOG.info("processElement {}", record);
            c.output("processed Recors:" + record);
        }
    }

    public static void main(String[] args) {
        IExtractorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(IExtractorOptions.class);
        final List<String> LINES = Arrays.asList(
                "あらゆる情報を簡単に整理・投稿ができる!",
                "ブログランキング",
                "ブログを作る・楽しむ",
                "ショッピングカート");
        Pipeline pipeline = Pipeline.create(options);
        LOG.info("options configured {}", options);
        FileSystems.setDefaultPipelineOptions(options);
        CoderRegistry cr = pipeline.getCoderRegistry();
        cr.registerCoderForClass(String.class, StringUtf8Coder.of());
        pipeline.apply("Read from input files",
                Create.of(LINES)).setCoder(StringUtf8Coder.of())
                .apply("Process input files", ParDo.of(new JapaneseCharProcessor()))
                .apply("Writing output to success",
                        (TextIO.write().to(options.getSuccessRecordBucketURL()).withNumShards(10)));
        pipeline.run().waitUntilFinish();
        LOG.info("pipeLine execution completed");
    }
}
Actual result : processed Recors:ã‚ã‚‰ã‚†ã‚‹æƒ…å ±ã‚’ç°¡å˜ã«æ•´ç†ãƒ»æŠ•ç¨¿ãŒã§ãã‚‹ï¼
Expected result : processed Recors:あらゆる情報を簡単に整理・投稿ができる!
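The actual result has the typical shape of UTF-8 bytes that were decoded with a single-byte Western charset: each Japanese character turns into a run starting with "ã", "æ" or "å". A minimal sketch to test that hypothesis is below. The class and method names (`MojibakeCheck`, `misdecode`, `repair`) are hypothetical, and ISO-8859-1 is used as the "wrong" charset only because it maps every byte value to a character and therefore round-trips cleanly; the viewer that produced the output above may have used a different one, such as windows-1252.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class MojibakeCheck {

    // Simulate the suspected failure: correct UTF-8 bytes read back
    // with the wrong single-byte charset.
    public static String misdecode(String text, Charset wrong) {
        return new String(text.getBytes(StandardCharsets.UTF_8), wrong);
    }

    // Reverse the mistake: recover the original bytes using the wrong
    // charset, then decode them as UTF-8.
    public static String repair(String garbled, Charset wrong) {
        return new String(garbled.getBytes(wrong), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "ブログランキング";
        Charset wrong = StandardCharsets.ISO_8859_1; // assumption, see lead-in

        String garbled = misdecode(original, wrong);
        System.out.println("garbled:  " + garbled);
        System.out.println("repaired: " + repair(garbled, wrong));
        System.out.println("round trip ok: " + original.equals(repair(garbled, wrong)));
    }
}
```

If the garbled form resembles what shows up when opening the output file, the bytes in the bucket are probably valid UTF-8 and the corruption is introduced by whatever is displaying the file. Downloading the object and opening it in an editor with the encoding set explicitly to UTF-8 would be one way to confirm this.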