
I have designed a NiFi flow that serializes JSON events into Avro format and pushes them into a Kafka topic, and I am trying to consume them in Spark Structured Streaming.

While the Kafka side works fine, Spark Structured Streaming is not able to read the Avro events. It fails with the error below.

[Stage 0:>                                                          (0 + 1) / 1]2019-07-19 16:56:57 ERROR Utils:91 - Aborting task
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
        at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
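
To see exactly what bytes from_avro is handed, the topic can also be read as a plain batch query and the leading bytes hex-dumped. This is a diagnostic sketch, reusing the SparkSession and the host:port / topic_name placeholders from the code below:

import org.apache.spark.sql.functions.{expr, hex}

// Batch read of the topic, then hex-dump the first five bytes of each value.
// Plain Avro record data starts directly with the encoded fields; a leading 00
// byte followed by four schema-ID bytes means a header precedes the payload.
val raw = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic_name")
  .load()
raw.select(hex(expr("substring(value, 1, 5)")).as("first_bytes")).show(false)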

Spark code

import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.appName("Spark-Kafka-Integration").master("local").getOrCreate()
import spark.implicits._

// Reader schema for the Kafka value bytes
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("schema.avsc")))

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic_name")
  .load()

val df1 = df.select(from_avro(col("value"), jsonFormatSchema).as("data")).select("data.*")

df1.writeStream.format("console").option("truncate", "false").start().awaitTermination()

Schema used in Spark

{
 "type": "record",
 "name": "kafka_demo_new",
 "fields": [
  {
   "name": "host",
   "type": "string"
  },
  {
   "name": "event",
   "type": "string"
  },
  {
   "name": "connectiontype",
   "type": "string"
  },
  {
   "name": "user",
   "type": "string"
  },
  {
   "name": "eventtimestamp",
   "type": "string"
  }
 ]
}

Sample topic data in Kafka

{"host":"localhost","event":"Qradar_Demo","connectiontype":"tcp/ip","user":"user","eventtimestamp":"2018-05-24 23:15:07"}

Version information:

HDP - 3.1.0
Kafka - 2.0.0
Spark - 2.4.0

Any help is appreciated.

The issue might be the schema in NiFi and which Kafka serializer you are using. For example, did you add the Schema Registry provider? If so, Spark cannot natively read that. – OneCricketeer

Did you try a sample Avro file and Spark SQL alone? No Kafka, no NiFi, no Structured Streaming, just a sample Avro file and Spark SQL. That would give you a smaller environment to isolate the issue; a minimal version of that test is sketched after these comments. I'd give your issue a shot, but there are too many moving parts. – Jacek Laskowski

@Jacek Laskowski I'll give it a try: an Avro file and Spark SQL alone. – chhaya vishwakarma
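
The isolation test suggested above can be as small as this (a minimal sketch: it assumes spark-avro is on the classpath and that sample.avro is a local Avro file written with the same schema.avsc):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("avro-isolation-test").master("local").getOrCreate()

// Batch read of a standalone Avro file with Spark SQL alone: no Kafka, no NiFi, no streaming.
val sample = spark.read.format("avro").load("sample.avro")
sample.printSchema()
sample.show(false)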

1 Answer


Had a similar issue and found out that Kafka / KSQL use a different flavor of Avro serialization: Confluent's serializer prepends a header to the Avro payload, which makes other components complain.

This might be your case as well; have a look: https://github.com/confluentinc/ksql/issues/1742
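
If that is the case here (for example, NiFi publishing through a Confluent Schema Registry-aware writer), the serializer prepends a 5-byte header (one magic byte plus a 4-byte schema ID) that a plain Avro decoder misreads as a negative length. A common workaround is to strip those bytes before calling from_avro. A minimal sketch, assuming the value column carries the Confluent wire format and df / jsonFormatSchema are as defined in the question:

import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.expr

// Confluent wire format: 1 magic byte + 4-byte schema ID, then the raw Avro payload.
// Drop the first five bytes so from_avro sees plain Avro binary.
val payload = expr("substring(value, 6, length(value) - 5)")
val df1 = df.select(from_avro(payload, jsonFormatSchema).as("data")).select("data.*")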