3 votes

I have created a writer for BucketingSink. The sink and the writer run without errors, but when the writer writes the Avro GenericRecord to Parquet, the files move from in-progress to pending to complete, yet they are empty at 0 bytes. Can anyone tell me what is wrong with the code? I have tried placing the initialization of AvroParquetWriter in the open() method, but the result is still the same.

When debugging the code, I confirmed that writer.write(element) does execute and that element contains the Avro GenericRecord data.

Streaming Data

BucketingSink<DataEventRecord> sink =
    new BucketingSink<DataEventRecord>("hdfs://localhost:9000/tmp/");

sink.setBucketer(new DateTimeBucketer<DataEventRecord>("yyyy-MM-dd--HHmm"));
sink.setWriter(new ParquetSinkWriter<DataEventRecord>());
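
The writer below calls element.getSchema() and element.getRecord(), so it assumes DataEventRecord is roughly a wrapper around an Avro GenericRecord. The real com.any.DataEventRecord is not shown in the question; a minimal hypothetical sketch for context:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Hypothetical sketch of the DataEventRecord type used in the question;
// the real com.any.DataEventRecord is not shown.
public class DataEventRecord {

  private final GenericRecord record;

  public DataEventRecord(GenericRecord record) {
    this.record = record;
  }

  // Schema of the wrapped Avro record, used by the writer to create the AvroParquetWriter.
  public Schema getSchema() {
    return record.getSchema();
  }

  // The Avro record that gets written to the Parquet file.
  public GenericRecord getRecord() {
    return record;
  }
}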

ParquetSinkWriter

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import com.any.DataEventRecord;

public class ParquetSinkWriter<T> extends StreamWriterBase<T> {

  private transient ParquetWriter<GenericRecord> writer;

  private Path path;
  private FileSystem fs;
  private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
  private final int blockSize = 256 * 1024 * 1024;
  private final int pageSize = 64 * 1024;


  // workaround
  @Override
  public void open(FileSystem fs, Path path) throws IOException {
    super.open(fs, path);
    this.path = path;
    this.fs = fs;
  }

  @Override
  public void write(T event) throws IOException {
    DataEventRecord element = (DataEventRecord) event;

    if (writer == null) {
      writer = new AvroParquetWriter<GenericRecord>(this.path, element.getSchema(), compressionCodecName, blockSize, pageSize);
    }

    if (writer != null) {
      GenericRecord datum = element.getRecord();
      writer.write(datum);
    }
  }

  @Override
  public void close() throws IOException {
    if (writer != null) {
      writer.close();
    }
    super.close();
  }

  @Override
  public Writer<T> duplicate() {
    return new ParquetSinkWriter<T>();
  }

}
I managed to resolve the problem. The issue is with calling super.open(fs, path) while also creating the AvroParquetWriter instance during the write process. The open call already creates the file, and the writer then tries to create the same file but cannot, because it already exists. Therefore, 0 records are written into the file, as the Avro writer fails to write into the already existing file. Removing super.open causes the base class to fail with "Writer is not open". I eventually extended my own sink class based on BucketingSink and everything is working fine now. – jlim
Could you please show some reference code on how you solved it? I am also stuck with the same issue. – neoeahit
Can't you simply implement the Writer interface instead of using the StreamWriterBase? The StreamWriterBase opens an FSDataOutputStream to the file which you don't need. – Till Rohrmann
@jlim Can you share your solution? I am stuck with the same issue. – igx
@igx We have moved away from Flink and built our own ingestion pipeline. Give me your email and I will send you the code that we extended from Flink. Bear in mind that the files we modified at the time were for Flink 1.3. Make sure you compare them with the Flink version you are using and merge what is required. – jlim

1 Answer

2 votes

Directly implementing Writer should look like this:

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.util.Preconditions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * Parquet writer.
 *
 * @param <T> type of the elements to write; must be an Avro {@link GenericRecord}
 */
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    private final String schemaRepresentation;

    private transient Schema schema;
    private transient ParquetWriter<GenericRecord> writer;
    private transient Path path;

    // Number of bytes already flushed to the current part file.
    private long position;

    public ParquetSinkWriter(String schemaRepresentation) {
        this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        position += writer.getDataSize();
        writer.close();
        writer = createWriter();

        return position;
    }

    @Override
    public long getPos() throws IOException {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<>(schemaRepresentation);
    }

    private ParquetWriter<GenericRecord> createWriter() throws IOException {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaRepresentation);
        }

        return AvroParquetWriter.<GenericRecord>builder(path)
            .withSchema(schema)
            .withDataModel(new GenericData())
            .withCompressionCodec(compressionCodecName)
            .withPageSize(pageSize)
            .build();
    }
}
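
A sketch of how this Writer could be plugged into the BucketingSink from the question. The schema string, the sink path, and the records stream are placeholders (not part of the answer), and the stream is assumed to carry GenericRecord elements, e.g. mapped out of DataEventRecord:

// Usage sketch, assuming the Avro schema is available as a JSON string.
String schemaJson =
    "{\"type\":\"record\",\"name\":\"DataEvent\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"; // example schema only

BucketingSink<GenericRecord> sink =
    new BucketingSink<>("hdfs://localhost:9000/tmp/");
sink.setBucketer(new DateTimeBucketer<GenericRecord>("yyyy-MM-dd--HHmm"));
sink.setWriter(new ParquetSinkWriter<GenericRecord>(schemaJson));

// records is assumed to be a DataStream<GenericRecord>,
// e.g. obtained via events.map(DataEventRecord::getRecord).
records.addSink(sink);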