2
votes

I have a Spark Structured Streaming application which has to read from 12 Kafka topics (different schemas, Avro format) at once, deserialize the data and store it in HDFS. When I read from a single topic with my code it works fine and without errors, but when I run multiple queries together I get the following error:

java.lang.IllegalStateException: Race while writing batch 0

My code is as follows:

import scala.collection.JavaConverters._
import scala.io.Source

import org.apache.avro.Schema
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{DataTypes, StructType}
// SchemaConverters is provided by the spark-avro library (import path depends on the version you use)

def main(args: Array[String]): Unit = {

  val kafkaProps = Util.loadProperties(kafkaConfigFile).asScala
  // A collection, not a tuple, so foreach iterates over the topics
  val topic_list = List("topic1", "topic2", "topic3", "topic4")

  topic_list.foreach(x => {
    kafkaProps.update("subscribe", x)

    // Load the Avro schema for this topic and convert it to a Spark SQL schema
    val source = Source.fromInputStream(Util.getInputStream("/schema/topics/" + x)).getLines.mkString
    val schemaParser = new Schema.Parser
    val schema = schemaParser.parse(source)
    val sqlTypeSchema = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

    // One streaming source per topic
    val kafkaStreamData = spark
      .readStream
      .format("kafka")
      .options(kafkaProps)
      .load()

    // UDF that deserializes the Avro payload into the topic's struct type
    val udfDeserialize = udf(deserialize(source), DataTypes.createStructType(sqlTypeSchema.fields))

    val transformedDeserializedData = kafkaStreamData.select("value").as(Encoders.BINARY)
      .withColumn("rows", udfDeserialize(col("value")))
      .select("rows.*")

    // One streaming query per topic, each with its own output path and checkpoint location
    val query = transformedDeserializedData
      .writeStream
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .outputMode("append")
      .format("parquet")
      .option("path", "/output/topics/" + x)
      .option("checkpointLocation", checkpointLocation + "/" + x)
      .start()
  })

  spark.streams.awaitAnyTermination()
}
Why not use Kafka Connect? It would make life easier, and there are also potential small-files issues with this approach. - thebluephantom
Interesting approach. The formats are correct when there is only one such topic, I assume. Will try this at some stage. See also this: waitingforcode.com/apache-spark-structured-streaming/…. Concurrency issue? - thebluephantom
@thebluephantom It is probably not related to small files because the data is pretty large. Anyway, can you guide me on how I can do this with Kafka Connect? - Siddharth Goel
OK, cool on the common problem of small files. Kafka Connect comes either via Confluent or from Apache Kafka itself. The point is that you need a Kafka Connect cluster, and that is typically a systems thing set up by administrators. You need to see how your organization feels about that. - thebluephantom

1 Answer

0
votes

As an alternative, you can use Kafka Connect (from Confluent), NiFi, StreamSets, etc., since your use case seems to fit "dump/persist to HDFS". That said, you need to have these tools installed. You state that the small-files problem is not an issue for you; so be it.

From Apache Kafka 0.9 onwards you can use the Kafka Connect API for a Kafka --> HDFS sink (various HDFS output formats are supported). You do need a Kafka Connect cluster, but it runs against your existing Kafka cluster in any event, so that is not a big deal. Someone does need to maintain it, though.
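For reference, below is a minimal sketch of what such an HDFS sink connector configuration might look like. It assumes the Confluent HDFS Sink Connector and a Schema Registry; the topic names, HDFS URL and schema-registry address are placeholders, and exact property names can vary with the connector version.

# Hypothetical HDFS sink connector config (Confluent HDFS Sink Connector) - adjust to your environment
name=hdfs-sink-topics
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=4
# Comma-separated list of the topics to persist
topics=topic1,topic2,topic3,topic4
hdfs.url=hdfs://namenode:8020
# Number of records written per file before rolling to a new one
flush.size=10000
# Write Avro files to HDFS; Parquet is also available via a ParquetFormat class
format.class=io.confluent.connect.hdfs.avro.AvroFormat
# Deserialize Avro values using the Schema Registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

In distributed mode you would submit this configuration (as JSON) to the Connect REST API; in standalone mode it can be passed as a properties file.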

Some links to get you on your way: