2
votes

Current State:

Today I have a Spark Structured Streaming application that consumes a single Kafka topic containing JSON messages. Embedded in each message's value is some information about the source and the schema of the message field. A very simplified version of a message looks something like this:

{
  "source": "Application A",
  "schema": [{"col_name": "countryId", "col_type": "Integer"}, {"col_name": "name", "col_type": "String"}],
  "message": {"countryId": "21", "name": "Poland"}
}

There are a handful of Kafka topics in the system today, and I've deployed one instance of this Spark Structured Streaming application per topic, using the subscribe option. Each application applies its topic's unique schema (hacked together by batch-reading the first message in the topic and mapping the schema from it) and writes the result to HDFS in Parquet format.
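
For context, each per-topic application looks roughly like the sketch below (broker addresses, topic name, and HDFS paths are placeholders, and the schema is hard-coded here rather than derived from the first message):

val spark = org.apache.spark.sql.SparkSession.builder.appName("PerTopicStream").getOrCreate()

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// In the real job this schema is derived from the "schema" array of the
// first message, batch-read from the topic; it is hard-coded here.
val envelope = StructType(Seq(
  StructField("source", StringType),
  StructField("message", StructType(Seq(
    StructField("countryId", StringType),
    StructField("name", StringType))))))

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") // placeholder brokers
  .option("subscribe", "topicA")                                // one app per topic
  .load()
  .select(from_json(col("value").cast("string"), envelope).as("parsed"))
  .select("parsed.message.*")
  .writeStream
  .format("parquet")
  .option("path", "hdfs:///data/topicA")                        // placeholder HDFS paths
  .option("checkpointLocation", "hdfs:///checkpoints/topicA")
  .start()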

Desired State:

My organization will soon start producing more and more topics, and I don't think this pattern of one Spark application per topic will scale well. Initially it seemed that the subscribePattern option would work well for me, since these topics follow something of a naming hierarchy, but now I'm stuck on how to apply each topic's schema and write to distinct locations in HDFS. Roughly what I have in mind is sketched below.
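
Something like a single job per pattern that splits each micro-batch by topic, looks up that topic's schema, and writes to a per-topic path. In the sketch below the topic pattern, the schemaFor lookup, and the HDFS layout are all hypothetical, and foreachBatch needs Spark 2.4+; I don't know if this is the right approach:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "appA\\..*")          // hypothetical topic hierarchy
  .load()
  .selectExpr("topic", "CAST(value AS STRING) AS value")
  .writeStream
  .option("checkpointLocation", "hdfs:///checkpoints/appA")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // split the micro-batch by topic, apply that topic's schema, write per-topic
    val topics = batch.select("topic").distinct().collect().map(_.getString(0))
    topics.foreach { t =>
      val schema = schemaFor(t)                     // hypothetical per-topic schema lookup
      batch.filter(col("topic") === t)
        .select(from_json(col("value"), schema).as("parsed"))
        .select("parsed.*")
        .write.mode("append").parquet(s"hdfs:///data/$t")
    }
  }
  .start()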

In the future we will most likely have thousands of topics and hopefully only 25 or so Spark Applications.

Does anyone have advice on how to accomplish this?

2
Why do you have to send the schema along with the message like this? Only send the message; Spark SQL can infer the schema and load each JSON message as a DataFrame for you. – void
Thanks for the reply! I'm using Spark Structured Streaming, which does not support schema inference. – Kyle Schmitt
Have you tried configuring a pool (in Spark standalone you need to configure it manually; on YARN, use the Hadoop Fair Scheduler) and submitting all the jobs? You would still create multiple jobs (one per topic), but each with fewer resources. – Arnon Rodman
I think the DataFrame we get when we subscribe to a list of topics will have (key, value, topic, partition, offset, timestamp). So we can select a particular topic by querying the DataFrame and then apply its schema; that way a different schema can be applied per topic. – vamshi palutla

2 Answers

1
votes

When sending these events with your Kafka producer, you could send a key as well as the value. If every event had its event type as the key, then when reading the stream from the topic(s) you could also get the key:

// assumes: import spark.implicits._ for the .as[(String, String)] conversion
val kafkaKvPair = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Then you could just filter on which events you want to process:

val events = kafkaKvPair
  .filter(f => f._1 == "MY_EVENT_TYPE")

In this way, if you are subscribed to multiple topics within one Spark app, you can process as many event types as you wish.

1
votes

If you are running Kafka 0.11+, consider using the headers functionality. Headers come across as a collection of key/value pairs attached to each record, so you can route messages based on a header without having to parse the body first.
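
A minimal sketch of what that could look like, assuming Spark 3.0+ (where the Kafka source exposes record headers via the includeHeaders option) and a hypothetical eventType header set by the producer:

import org.apache.spark.sql.functions.{col, expr}

val withEventType = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "appA\\..*")
  .option("includeHeaders", "true")   // headers arrive as array<struct<key: string, value: binary>>
  .load()
  // extract the hypothetical "eventType" header without touching the JSON body
  .withColumn("eventType",
    expr("CAST(filter(headers, h -> h.key = 'eventType')[0].value AS STRING)"))

// route by header value, e.g. keep only country events
val countryEvents = withEventType.filter(col("eventType") === "country")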