0 votes

I have a Spark Streaming job which reads data from Kafka partitions (one executor per partition).
I need to save the transformed values to HDFS, but I have to avoid creating empty files.
I tried using isEmpty, but it doesn't help when only some of the partitions are empty.
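
For illustration, the approach I tried looks roughly like this (stream and the per-batch output path are placeholders); isEmpty only skips batches with no records at all, so a batch where just some Kafka partitions are empty still creates empty part files:

  // Sketch of the insufficient isEmpty guard: it is true only when the
  // whole micro-batch is empty, so partitions without records in a
  // non-empty batch still end up as empty part files on HDFS.
  stream.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      rdd.map(_.value)
        .saveAsTextFile(s"$basePath/${System.currentTimeMillis}")
    }
  }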

P.S. repartition is not an acceptable solution due to performance degradation.

You could use Kafka Connect instead... Then you wouldn't need to write code, and you wouldn't have empty files - OneCricketeer
@cricket_007 this could work for text data, but not for my Avro pipeline, which requires processing and multiple outputs. It now works fine with LazyOutputFormat - Ruslan Ostafiichuk
You don't have to use the Schema Registry, either: github.com/farmdawgnation/registryless-avro-converter - OneCricketeer
@cricket_007 I have JSON, not Avro, in Kafka. I build three outputs with different Avro content for each message. I read the page on confluent.io after your first comment, but I still don't think it would solve my problem. - Ruslan Ostafiichuk

1 Answer

0 votes

LazyOutputFormat solved it: the wrapped output format only creates a file when the first record is actually written, so empty partitions produce no output files. Note that this works for PairRDDs only.

Code for text:

  import org.apache.hadoop.io.{NullWritable, Text}
  import org.apache.hadoop.mapreduce.OutputFormat
  import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, TextOutputFormat}

  val conf = ssc.sparkContext.hadoopConfiguration
  // Tell LazyOutputFormat which real output format to delegate to.
  conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
    classOf[TextOutputFormat[Text, NullWritable]],
    classOf[OutputFormat[Text, NullWritable]])

  kafkaRdd.map(_.value -> NullWritable.get)
    .saveAsNewAPIHadoopFile(basePath,
      classOf[Text],
      classOf[NullWritable],
      classOf[LazyOutputFormat[Text, NullWritable]],
      conf)

Code for Avro:

  import org.apache.avro.mapred.AvroKey
  import org.apache.avro.mapreduce.AvroKeyOutputFormat
  import org.apache.hadoop.io.NullWritable
  import org.apache.hadoop.mapreduce.OutputFormat
  import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat
  import org.apache.spark.rdd.RDD

  val avro: RDD[(AvroKey[MyEvent], NullWritable)] = ....
  val conf = ssc.sparkContext.hadoopConfiguration

  // Schema of the records being written.
  conf.set("avro.schema.output.key", MyEvent.SCHEMA$.toString)
  // Delegate to AvroKeyOutputFormat, but create files lazily.
  conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
    classOf[AvroKeyOutputFormat[MyEvent]],
    classOf[OutputFormat[AvroKey[MyEvent], NullWritable]])

  avro.saveAsNewAPIHadoopFile(basePath,
    classOf[AvroKey[MyEvent]],
    classOf[NullWritable],
    classOf[LazyOutputFormat[AvroKey[MyEvent], NullWritable]],
    conf)
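
For completeness, a rough sketch of how the Avro save can be wired into the streaming job; kafkaStream, toMyEvent and the timestamped output path are assumptions, not part of the original code:

  // Sketch: saving each micro-batch through LazyOutputFormat, so Kafka
  // partitions that received no records create no part files at all.
  kafkaStream.foreachRDD { (rdd, time) =>
    val avro: RDD[(AvroKey[MyEvent], NullWritable)] =
      rdd.map(record => new AvroKey(toMyEvent(record.value)) -> NullWritable.get)
    avro.saveAsNewAPIHadoopFile(s"$basePath/${time.milliseconds}",
      classOf[AvroKey[MyEvent]],
      classOf[NullWritable],
      classOf[LazyOutputFormat[AvroKey[MyEvent], NullWritable]],
      conf)
  }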