I'm using Flink v1.11.2 and trying to sink my protobuf data to HDFS; I took the code from the documentation. My code is the following:
val writer = ParquetProtoWriters.forTypeWithConf(classOf[RawSample], CompressionCodecName.GZIP)
val sinker = StreamingFileSink
.forBulkFormat(new Path(option.dumpOutputPath), writer)
.withBucketAssigner(new DateTimeBucketAssigner[RawSample]("yyyy-MM-dd/HH"))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withBucketCheckInterval(option.rolloverInterval)
.withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".gz.parquet").build())
.build()
I copied the ParquetProtoWriters code to add GZIP compression support (forTypeWithConf is my modified factory method), and RawSample is a protobuf-generated class. It does sink files to HDFS, but the filenames look like this:
├── 2019-08-25/12
│   ├── part-0-0.gz.parquet
│   ├── part-0-1.gz.parquet
│   ├── ...
│   └── part-0-9.gz.parquet
├── 2019-08-25/13
│   ├── part-0-10.gz.parquet
│   ├── part-0-11.gz.parquet
│   ├── ...
│   └── part-0-19.gz.parquet
└── 2019-08-25/14
    ├── part-0-20.gz.parquet
    ├── part-0-21.gz.parquet
    ├── ...
    └── part-0-29.gz.parquet
The partFileIndex field from the part file configuration keeps growing across buckets. Is there any way to make it restart from 0 for every hour, so that the output looks like this:
├── 2019-08-25/12
│   ├── part-0-0.gz.parquet
│   ├── part-0-1.gz.parquet
│   ├── ...
│   └── part-0-9.gz.parquet
├── 2019-08-25/13
│   ├── part-0-0.gz.parquet
│   ├── part-0-1.gz.parquet
│   ├── ...
│   └── part-0-9.gz.parquet
└── 2019-08-25/14
    ├── part-0-0.gz.parquet
    ├── part-0-1.gz.parquet
    ├── ...
    └── part-0-9.gz.parquet
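For reference, forTypeWithConf above is not part of Flink's API; it is my copy of ParquetProtoWriters.forType with the compression codec exposed. A minimal sketch of that copy, assuming Flink 1.11's private ParquetProtoWriterBuilder is duplicated so that ParquetWriter.Builder#withCompressionCodec becomes reachable (names are mine):

```scala
import com.google.protobuf.{Message, MessageOrBuilder}
import org.apache.flink.formats.parquet.{ParquetBuilder, ParquetWriterFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.OutputFile
import org.apache.parquet.proto.ProtoWriteSupport

// Copy of Flink's private ParquetProtoWriterBuilder: it wires protobuf
// write support into a plain ParquetWriter.Builder, which also carries
// the withCompressionCodec(...) setter we want to use.
class ProtoWriterBuilder[T <: MessageOrBuilder](out: OutputFile, clazz: Class[T])
    extends ParquetWriter.Builder[T, ProtoWriterBuilder[T]](out) {

  override def self(): ProtoWriterBuilder[T] = this

  override def getWriteSupport(conf: Configuration): WriteSupport[T] =
    new ProtoWriteSupport[T](clazz)
}

object CompressedParquetProtoWriters {

  // Same shape as ParquetProtoWriters.forType, plus a codec parameter.
  def forTypeWithConf[T <: Message](
      clazz: Class[T],
      codec: CompressionCodecName): ParquetWriterFactory[T] = {
    val builder: ParquetBuilder[T] =
      out => new ProtoWriterBuilder[T](out, clazz).withCompressionCodec(codec).build()
    new ParquetWriterFactory[T](builder)
  }
}
```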