
I'm using Flink v1.11.2 and trying to sink my protobuf data to HDFS; I took the code from the documentation.

My code is the following:


val writer = ParquetProtoWriters.forTypeWithConf(classOf[RawSample], CompressionCodecName.GZIP)

val sinker = StreamingFileSink
      .forBulkFormat(new Path(option.dumpOutputPath), writer)
      .withBucketAssigner(new DateTimeBucketAssigner[RawSample]("yyyy-MM-dd/HH"))
      .withRollingPolicy(OnCheckpointRollingPolicy.build())
      .withBucketCheckInterval(option.rolloverInterval)
      .withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".gz.parquet").build())
      .build()
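For reference, `forTypeWithConf` is not part of stock Flink (`ParquetProtoWriters` only ships `forType`); a sketch of the copied and modified factory might look like the following, assuming the parquet-protobuf `ProtoParquetWriter.Builder` API is available (the object name `ParquetProtoWritersWithConf` is illustrative):

    import com.google.protobuf.Message
    import org.apache.flink.formats.parquet.{ParquetBuilder, ParquetWriterFactory}
    import org.apache.parquet.hadoop.metadata.CompressionCodecName
    import org.apache.parquet.io.OutputFile
    import org.apache.parquet.proto.ProtoParquetWriter

    object ParquetProtoWritersWithConf {
      // Mirrors Flink's ParquetProtoWriters.forType, extended with a
      // compression codec parameter (hypothetical helper, not part of Flink).
      def forTypeWithConf[T <: Message](protoClass: Class[T],
                                        codec: CompressionCodecName): ParquetWriterFactory[T] = {
        val builder: ParquetBuilder[T] = (out: OutputFile) =>
          ProtoParquetWriter
            .builder[T](out)            // requires parquet-protobuf's builder API
            .withMessage(protoClass)
            .withCompressionCodec(codec)
            .build()
        new ParquetWriterFactory[T](builder)
      }
    }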

I copied the ParquetProtoWriters code to add GZIP compression support; RawSample is a protobuf-generated class. The sink does write files to HDFS, but the filenames look like:

└── 2019-08-25/12
    ├── part-0-0.gz.parquet
    ├── part-0-1.gz.parquet
    ├── ...
    ├── part-0-9.gz.parquet
└── 2019-08-25/13
    ├── part-0-10.gz.parquet
    ├── part-0-11.gz.parquet
    ├── ...
    ├── part-0-19.gz.parquet
└── 2019-08-25/14
    ├── part-0-20.gz.parquet
    ├── part-0-21.gz.parquet
    ├── ...
    ├── part-0-29.gz.parquet

The partFileIndex field described in the "Part file configuration" section of the docs keeps growing across buckets. Is there any way to make it restart from 0 for every hour, so the output looks like this?

└── 2019-08-25/12
    ├── part-0-0.gz.parquet
    ├── part-0-1.gz.parquet
    ├── ...
    ├── part-0-9.gz.parquet
└── 2019-08-25/13
    ├── part-0-0.gz.parquet
    ├── part-0-1.gz.parquet
    ├── ...
    ├── part-0-9.gz.parquet
└── 2019-08-25/14
    ├── part-0-0.gz.parquet
    ├── part-0-1.gz.parquet
    ├── ...
    ├── part-0-9.gz.parquet
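For context on where that index comes from: each StreamingFileSink subtask keeps a single part counter that is shared across all of its buckets, so a bucket that is reopened later (for example by late data) never collides with files it wrote earlier; that is why the counter does not reset per hour out of the box. The documented naming scheme, sketched below (the default values I chose for prefix and suffix are assumptions matching the config above):

    // Sketch of the documented "<prefix>-<subtaskIndex>-<partCounter><suffix>"
    // part file naming; the default prefix/suffix values here are assumptions.
    def partFileName(subtaskIndex: Int,
                     partCounter: Long,
                     prefix: String = "part",
                     suffix: String = ".gz.parquet"): String =
      s"$prefix-$subtaskIndex-$partCounter$suffix"

    // e.g. partFileName(0, 10) == "part-0-10.gz.parquet"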