
We want to write compressed data to HDFS with Flink's BucketingSink or StreamingFileSink. I have written my own Writer, which works fine as long as no failure occurs. However, when it encounters a failure and restarts from a checkpoint, it will generate a valid-length file (Hadoop < 2.7) or truncate the file. Unfortunately, gzip files are binary files with a trailer at the end, so simple truncation does not work in my case. Any ideas for enabling exactly-once semantics for a compressing HDFS sink?
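To illustrate why truncation breaks gzip: the trailer (CRC-32 plus uncompressed length) is only written when the stream is finished, so a part file cut back to a checkpointed offset is no longer a valid gzip stream. A minimal, self-contained sketch (not part of my sink code) that demonstrates this:

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.*;

public class TruncatedGzipDemo {
    public static void main(String[] args) throws IOException {
        // Compress a small payload into memory and finish the stream,
        // which appends the 8-byte gzip trailer.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write("some records\n".getBytes(StandardCharsets.UTF_8));
        }
        byte[] full = bos.toByteArray();

        // Simulate truncation on recovery: drop the last bytes, which
        // removes (part of) the trailer.
        byte[] truncated = Arrays.copyOf(full, full.length - 4);

        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(truncated))) {
            while (in.read() != -1) { }
        } catch (EOFException e) {
            // "Unexpected end of ZLIB input stream"
            System.out.println("Truncated gzip is unreadable: " + e.getMessage());
        }
    }
}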

Here is my writer's code:

public class HdfsCompressStringWriter extends StreamWriterBaseV2<JSONObject> {

    private static final long serialVersionUID = 2L;

    /**
     * The {@link GZIPOutputStream} wrapping the current part file's stream.
     */
    private transient GZIPOutputStream compressionOutputStream;

    public HdfsCompressStringWriter() {}

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.setSyncOnFlush(true);
        // syncFlush = true so that flush() pushes buffered compressed data through
        compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
    }

    @Override
    public void close() throws IOException {
        if (compressionOutputStream != null) {
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        resetStream();
    }

    @Override
    public void write(JSONObject element) throws IOException {
        if (element == null || !element.containsKey("body")) {
            return;
        }
        String content = element.getString("body") + "\n";
        compressionOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
        compressionOutputStream.flush();
    }

    @Override
    public Writer<JSONObject> duplicate() {
        return new HdfsCompressStringWriter();
    }
}


1 Answer


I would recommend implementing a BulkWriter for the StreamingFileSink which compresses the elements via a GZIPOutputStream. With a bulk format the sink rolls the in-progress part file on every checkpoint, so each part file is finished (and the gzip trailer written) at a checkpoint boundary and never has to be truncated on recovery. The code could look like the following:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(1000);

    final DataStream<Integer> input = env.addSource(new InfinitySource());

    final StreamingFileSink<Integer> streamingFileSink = StreamingFileSink
            .<Integer>forBulkFormat(new Path("output"), new GzipBulkWriterFactory<>())
            .build();
    input.addSink(streamingFileSink);

    env.execute();
}
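
// InfinitySource above is not a Flink-provided class; a minimal stand-in
// could look like this (just a sketch): a SourceFunction emitting
// increasing integers until cancelled.
private static class InfinitySource implements SourceFunction<Integer> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        int counter = 0;
        while (running) {
            // Emit under the checkpoint lock so emission and checkpointing
            // do not interleave.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}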

private static class GzipBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    @Override
    public BulkWriter<T> create(FSDataOutputStream fsDataOutputStream) throws IOException {
        final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(fsDataOutputStream, true);
        return new GzipBulkWriter<>(new ObjectOutputStream(gzipOutputStream), gzipOutputStream);
    }
}

private static class GzipBulkWriter<T> implements BulkWriter<T> {

    private final GZIPOutputStream gzipOutputStream;
    private final ObjectOutputStream objectOutputStream;

    public GzipBulkWriter(ObjectOutputStream objectOutputStream, GZIPOutputStream gzipOutputStream) {
        this.gzipOutputStream = gzipOutputStream;
        this.objectOutputStream = objectOutputStream;
    }

    @Override
    public void addElement(T t) throws IOException {
        objectOutputStream.writeObject(t);
    }

    @Override
    public void flush() throws IOException {
        objectOutputStream.flush();
    }

    @Override
    public void finish() throws IOException {
        objectOutputStream.flush();
        gzipOutputStream.finish();
    }
}
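
With the ObjectOutputStream above, elements end up Java-serialized inside the gzip stream. For the newline-delimited JSON strings from the question, a line-oriented variant might look like the following (just a sketch; GzipStringBulkWriter and its factory are made-up names, not Flink classes):

private static class GzipStringBulkWriterFactory implements BulkWriter.Factory<String> {
    @Override
    public BulkWriter<String> create(FSDataOutputStream out) throws IOException {
        // syncFlush = true so flush() pushes pending compressed data downstream
        return new GzipStringBulkWriter(new GZIPOutputStream(out, true));
    }
}

private static class GzipStringBulkWriter implements BulkWriter<String> {

    private final GZIPOutputStream gzipOutputStream;

    GzipStringBulkWriter(GZIPOutputStream gzipOutputStream) {
        this.gzipOutputStream = gzipOutputStream;
    }

    @Override
    public void addElement(String element) throws IOException {
        // Write each record as a UTF-8 line through the compressing stream.
        gzipOutputStream.write((element + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void flush() throws IOException {
        gzipOutputStream.flush();
    }

    @Override
    public void finish() throws IOException {
        // finish() writes the gzip trailer without closing the underlying
        // part file stream, which the sink still needs to commit.
        gzipOutputStream.finish();
    }
}

It would be wired up the same way as above, e.g. StreamingFileSink.<String>forBulkFormat(new Path("output"), new GzipStringBulkWriterFactory()).build().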