0 votes

I am writing a Kafka data stream to a bucketing sink at an HDFS path. Kafka emits string data, and I am using FlinkKafkaConsumer010 to consume from Kafka. The part files in HDFS remain stuck in the in-progress state:

-rw-r--r--   3 ubuntu supergroup    4097694 2018-10-19 19:16 /streaming/2018-10-19--19/_part-0-1.in-progress
-rw-r--r--   3 ubuntu supergroup    3890083 2018-10-19 19:16 /streaming/2018-10-19--19/_part-1-1.in-progress
-rw-r--r--   3 ubuntu supergroup    3910767 2018-10-19 19:16 /streaming/2018-10-19--19/_part-2-1.in-progress
-rw-r--r--   3 ubuntu supergroup    4053052 2018-10-19 19:16 /streaming/2018-10-19--19/_part-3-1.in-progress

This happens only when I use a mapping function to manipulate the stream data on the fly; if I write the stream directly to HDFS, it works fine. Any idea why this might be happening? I am using Flink 1.6.1, Hadoop 3.1.1, and Oracle JDK 1.8.
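
For reference, a minimal sketch of the setup described above (the broker address, topic name, consumer group, and the map function are hypothetical stand-ins):

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // hypothetical broker
    props.setProperty("group.id", "hdfs-writer")              // hypothetical group

    // Kafka emits strings, consumed with FlinkKafkaConsumer010 as described.
    val source = env.addSource(
      new FlinkKafkaConsumer010[String]("my-topic", new SimpleStringSchema(), props))

    // Bucketing sink writing under /streaming, bucketed by date and hour,
    // matching the directory layout in the listing above.
    val sink = new BucketingSink[String]("/streaming")
    sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH"))

    source
      .map(_.toUpperCase) // hypothetical on-the-fly transformation
      .addSink(sink)

    env.execute("kafka-to-hdfs")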


3 Answers

2 votes

A little late to this question, but I experienced a similar issue. I have a case class Address:

case class Address(i: Int)

and I read the source from a collection of Address instances, for example:

    import org.apache.flink.core.fs.Path
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

    env.fromCollection(Seq(new Address(...), ...))

    ...
    // Bulk-format part files are committed only when a checkpoint completes.
    val customAvroFileSink = StreamingFileSink
      .forBulkFormat(
        new Path("/tmp/data/"),
        ParquetAvroWriters.forReflectRecord(classOf[Address]))
      .build()
    ...
    xxx.addSink(customAvroFileSink)

With checkpointing enabled, my Parquet files would also end up stuck in-progress.

I found that Flink finished the job before a checkpoint was triggered, so my results were never fully flushed to disk. After I changed the checkpoint interval to a smaller value, the Parquet files were no longer left in-progress.
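
A minimal sketch of the change, assuming env is the job's StreamExecutionEnvironment and the interval values are purely illustrative:

    // With a long interval, a short-lived job can finish before the first
    // checkpoint fires; bulk-format part files are only committed on
    // checkpoints, so they stay in-progress:
    // env.enableCheckpointing(600000) // 10 minutes: too long here

    // A shorter interval lets a checkpoint complete before the job ends,
    // so the StreamingFileSink can finalize the Parquet files:
    env.enableCheckpointing(1000) // 1 second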

1 vote

This scenario generally happens when checkpointing is disabled.

Could you check the checkpointing settings for the job that uses the mapping function? It looks like you have checkpointing enabled only for the job that writes directly to HDFS.
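
For example, assuming env is the job's StreamExecutionEnvironment, this single setting is what allows the bucketing sink to move part files out of the in-progress and pending states (the 60-second interval is just an illustration):

    // Part files are finalized when a checkpoint completes.
    env.enableCheckpointing(60000) // checkpoint every 60 s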

0 votes

I had a similar issue; enabling checkpointing and switching the state backend from the default MemoryStateBackend to FsStateBackend fixed it. In my case, checkpointing failed because MemoryStateBackend's maxStateSize (5 MB by default) was too small, so the state of one of the operators could not fit in memory.

import java.time.Duration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StateBackend stateBackend = new FsStateBackend("file:///home/ubuntu/flink_state_backend");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
    .enableCheckpointing(Duration.ofSeconds(60).toMillis())
    .setStateBackend(stateBackend);