I have a spark structured steaming application that I'm reading in from Kafka. Here is the basic structure of my code.
I create the Spark session.
val spark = SparkSession
.builder
.appName("app_name")
.getOrCreate()
Then I read from the stream
val data_stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server_list")
.option("subscribe", "topic")
.load()
In Kafka record, I cast the "value" as a string. It converts from binary to string. At this point there is 1 column in the data frame
val df = data_stream
.select($"value".cast("string") as "json")
Based off of a pre-defined schema, I try to parse out the JSON structure into columns. However, the problem here is if the data is "bad", or a different format then it doesn't match the defined schema. So the next dataframe (df2) get's null values into the columns.
val df2 = df.select(from_json($"json", schema) as "data")
.select("data.*")
I'd like to be able to filter out from df2 the row's that have "null" in a certain column (one that I use as a primary key in a database) i.e. ignore bad data that doesn't match the schema?
EDIT: I was somewhat able to accomplish this but not the way I intended to.
In my process, I use a query that uses the .foreach(writer)
process. What this does is it opens a connection to a database, processes each row, and then closes the connection. The documentation for structured streaming mentions the necessities you need for this process. In the process method, I get the values from each row and check if my primary key is null, if it is null I don't insert it into the database.