
I have a Spark Structured Streaming application that reads from Kafka. Here is the basic structure of my code.

I create the Spark session.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("app_name")
  .getOrCreate()

Then I read from the stream:

val data_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_list")
  .option("subscribe", "topic")
  .load()

From the Kafka record, I cast the "value" field to a string; this converts it from binary. At this point there is one column in the DataFrame.

import spark.implicits._

val df = data_stream
    .select($"value".cast("string") as "json")

Based on a pre-defined schema, I try to parse the JSON structure out into columns. The problem is that if the data is "bad", or in a different format, it doesn't match the defined schema, and the next DataFrame (df2) gets null values in those columns.

import org.apache.spark.sql.functions.from_json

val df2 = df.select(from_json($"json", schema) as "data")
  .select("data.*")
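
For illustration, here is a minimal sketch of that behavior. The one-field schema and the inline sample data are made up; only the null result is the point.

import org.apache.spark.sql.types._

// demoSchema stands in for the real pre-defined schema
val demoSchema = StructType(Seq(StructField("id", StringType)))
val sample = Seq("""{"id": "a1"}""", """not json""").toDF("json")
sample.select(from_json($"json", demoSchema) as "data").show()
// the malformed record parses to null, so its "data.*" columns are all null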

I'd like to be able to filter out of df2 the rows that have null in a certain column (one that I use as a primary key in a database), i.e. ignore bad data that doesn't match the schema.

EDIT: I was somewhat able to accomplish this, though not the way I intended. In my process, I run a query that uses .foreach(writer). The writer opens a connection to a database, processes each row, and then closes the connection. The Structured Streaming documentation describes what this process requires. In the process method, I get the values from each row and check whether my primary key is null; if it is, I don't insert the row into the database. A sketch of this is below.
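
A minimal sketch of that writer, assuming a JDBC connection and an "id" primary-key column (the connection string and insert logic are placeholders):

import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  var connection: Connection = _

  // open a database connection once per partition/epoch
  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection("jdbc:...") // assumed connection string
    true
  }

  // skip rows whose primary-key column is null, i.e. the bad data
  override def process(row: Row): Unit = {
    if (!row.isNullAt(row.fieldIndex("id"))) {
      // insert the row into the database using `connection`
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}

val query = df2.writeStream.foreach(writer).start()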


2 Answers

1 vote

Just filter out any null values you don't want:

df2
  .filter(row => !row.isNullAt(row.fieldIndex("colName"))) // keep rows whose key column is non-null
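
Equivalently, and more idiomatically, the same filter can be expressed on the column itself:

import org.apache.spark.sql.functions.col

df2.filter(col("colName").isNotNull)
// or: df2.na.drop(Seq("colName")), which drops rows where "colName" is null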
0 votes

Kafka stores data as raw byte arrays. Producers and consumers need to agree on the structure of the data for processing.

If the produced message format changes, consumers need to adjust to read that same format. The problem comes when your data structure is evolving: you may need to maintain compatibility on the consumer side.

Defining the message format with Protobuf solves this problem.
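
As a rough sketch of the consumer side (the Event message class and its ScalaPB-generated Event.parseFrom are assumptions about your setup, not part of the question):

import org.apache.spark.sql.functions.udf

// Event would be generated by ScalaPB from a .proto file that the
// producer and consumer both share; parsing failures become None
val parseEvent = udf { bytes: Array[Byte] =>
  scala.util.Try(Event.parseFrom(bytes)).toOption.map(_.toString)
}

val decoded = data_stream.select(parseEvent($"value") as "event")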