3
votes

I am using structured streaming with Spark 2.1.1. I need to apply some business logic to incoming messages (from Kafka source).

Essentially, I need to pick up each message, extract some key values, look them up in HBase, and perform some more business logic on the dataset. The end result is a string message that needs to be written out to another Kafka topic.

However, since the abstraction for incoming messages is a DataFrame (an unbounded table, in Structured Streaming terms), I have to iterate through the dataset received during a trigger using mapPartitions (per partition rather than per row, because the HBase client is not serializable).

Within each partition, I need to iterate through every row and execute the business process for it.

  1. Is there a better approach that would let me avoid the dataFrame.mapPartitions call? I feel it's sequential and iterative!
  2. Structured Streaming basically forces me to generate an output DataFrame from my business process, whereas there is none to start with. What other design pattern can I use to achieve my end goal?

Would you recommend an alternative approach?

2
It is not more sequential than other Spark operations. Personally I would advise statically typed input and output - this will save you a lot of pain with extracting values from Row and working with RowEncoders. On a side note - the title of this question seems to have nothing to do with the content. – zero323
:) Realized that and fixed the title. Furthermore, I am already using statically typed input and output (using StructType). The encoders are a pain point, and I want to avoid them. I have tried different things, including the foreach sink, yet nothing seems to work as expected. Any other ideas? – Raghav
What about the ForEach sink? Have you considered it? What about a custom Sink if ForEach is not suitable? – Jacek Laskowski
@JacekLaskowski - I have been looking at your notes on gitbooks and tried to imitate the foreach sink as well. However, no luck there either. – Raghav
Ask questions then. I think this one is too general. Take a few steps back, ask a few questions, and build your understanding again on stronger ground :) – Jacek Laskowski

2 Answers

13
votes

When you talk about working with DataFrames in Spark, speaking very broadly, you can do one of 3 things: a) generate a DataFrame, b) transform a DataFrame, or c) consume a DataFrame.

In Structured Streaming, a streaming DataFrame is generated using a DataSource. Normally you create sources through the sparkSession.readStream method. This method returns a DataStreamReader, which has several methods for reading from various kinds of input. All of these return a DataFrame. Internally, it creates a DataSource. Spark allows you to implement your own DataSource, but they recommend against it because, as of 2.2, the interface is considered experimental.

You transform data frames mostly using map or reduce, or using Spark SQL. There are different flavors of map (map, mapPartitions, mapPartitionsWithIndex, etc.). map takes a row and returns a row; mapPartitions takes an iterator over the rows of one partition and returns an iterator. Internally, Spark does the work of parallelizing the calls to your function: it partitions the data, spreads it across the executors in the cluster, and calls your function on the executors. You don't need to worry about parallelism; it's built in. mapPartitions is not "sequential". Yes, rows within a partition are processed sequentially, but multiple partitions are processed in parallel. You can control the degree of parallelism by repartitioning your data frame. With 5 partitions, up to 5 tasks run in parallel; with 200 partitions, up to 200 can run in parallel if you have 200 cores.
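To make the "sequential within a partition, parallel across partitions" point concrete, here is a minimal plain-Python model of the pattern. It is not Spark code: `FakeLookupClient` is a hypothetical stand-in for a non-serializable client such as an HBase connection, and a thread pool stands in for Spark's executors. The point it illustrates is that the client is created once per partition, inside the function that runs on the worker, rather than once per row.

```python
from concurrent.futures import ThreadPoolExecutor


class FakeLookupClient:
    """Hypothetical stand-in for a non-serializable client (e.g. an HBase connection)."""

    def __init__(self):
        # Assumed lookup data, purely for illustration.
        self._table = {"k1": "alpha", "k2": "beta"}

    def get(self, key):
        return self._table.get(key, "unknown")

    def close(self):
        pass  # a real client would release its connection here


def process_partition(rows):
    """Runs on a worker: one client per partition, rows handled one after another."""
    client = FakeLookupClient()
    try:
        return [f"{key}:{client.get(key)}" for key in rows]
    finally:
        client.close()


def run(partitions):
    """Process all partitions concurrently; output order follows partition order."""
    with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
        return list(pool.map(process_partition, partitions))


partitions = [["k1", "k2"], ["k2", "k3"], ["k1"]]
results = run(partitions)
```

In Spark itself, the body of `process_partition` would correspond to the function you pass to mapPartitions, and the number of partitions (not the function) determines how many run at once.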

Note that there is nothing stopping you from going out to external systems that manage state inside your transformation. However, your transformations should be idempotent. Given a set of input, they should always generate the same output, and leave the system in the same state over time. This can be difficult if you are talking to external systems inside your transformation. Structured Streaming provides an at-least-once guarantee. This means that the same row might be transformed multiple times. So, if you are doing something like adding money to a bank account, you might find that you have added the same amount of money twice to some of the accounts.
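One common way to get idempotence is to have the external system remember which messages it has already applied. The sketch below models the bank-account example in plain Python; the `Ledger` class and the message ids are hypothetical, and it assumes every incoming message carries a unique, stable id (which the original question does not state).

```python
class Ledger:
    """Hypothetical external system that records which message ids it has applied."""

    def __init__(self):
        self.balances = {}
        self.applied_ids = set()

    def deposit(self, message_id, account, amount):
        """Apply a deposit once; a redelivered message id is ignored."""
        if message_id in self.applied_ids:
            return False
        self.balances[account] = self.balances.get(account, 0) + amount
        self.applied_ids.add(message_id)
        return True


ledger = Ledger()
batch = [("m1", "acct-1", 50), ("m2", "acct-2", 20)]

# Process the same batch twice to simulate an at-least-once redelivery.
for _attempt in range(2):
    for message_id, account, amount in batch:
        ledger.deposit(message_id, account, amount)
```

After both passes each account has been credited only once. In a real system the id check and the balance update would need to happen atomically (for example, in one database transaction), which this in-memory model does not attempt to show.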

Data is consumed by sinks. Normally, you add a sink by calling writeStream on a DataFrame, choosing a format, and then calling start. Structured Streaming has a handful of built-in sinks which (except for one) are more or less useless. You can create your own custom Sink, but again it's not recommended because the interface is experimental. The only really useful sink is one whose logic you implement yourself: the foreach sink, which you use by implementing a ForeachWriter. Spark calls your writer for every row in each partition. You can do whatever you want with the rows, which includes writing them to HBase. Note that because of the at-least-once nature of Structured Streaming, the same row might be fed to your foreach sink multiple times. You are expected to implement it in an idempotent manner.

Also, if you have multiple sinks, data is written to the sinks in parallel. You cannot control the order in which the sinks are called. It can happen that one sink is getting data from one micro-batch while another sink is still processing data from the previous micro-batch. Essentially, the sinks are eventually consistent, not immediately consistent.

Generally, the cleanest way to build your code is to avoid going to outside systems inside your transformations. Your transformations should purely transform data in data frames. If you want data from HBase, get it into a data frame, join it with your streaming data frame, and then transform it. This is because when you go to outside systems, it becomes difficult to scale. You want to scale up your transformations by increasing partitioning on your data frames and adding nodes. However, too many nodes talking to external systems can increase the load on those systems and cause bottlenecks. Separating transformation from data retrieval allows you to scale them independently.

BUT!!!! there are big buts here......

1) When you talk about Structured Streaming, there is no way to implement a Source that can selectively get data from HBase based on the data in your input. You have to do this inside a map(-like) method. So, IMO, what you have is perfectly fine if the data in HBase changes or there is a lot of data that you don't want to keep in memory. If your data in HBase is small and unchanging, then it's better to read it into a batch data frame, cache it, and then join it with your streaming data frame. Spark will load all the data into its own memory/disk storage and keep it there. If your data is small and changes very frequently, it's better to read it into a data frame, not cache it, and join it with the streaming data frame. Spark will then load the data from HBase every time it runs a micro-batch.

2) There is no way to order the execution of 2 separate sinks. So, if your requirement is to write to a database and to Kafka, and you want to guarantee that a row is written to Kafka only after it is committed in the database, then the only ways to do that are to a) do both writes in one foreach sink, or b) write to one system in a map-like function and to the other in a foreach sink.
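Option a) can be sketched as a single writer that performs both writes, in order, for every row. The following is a plain-Python model rather than Spark code: the open/process/close hooks mirror the shape of Spark's ForeachWriter, while `OrderedWriter`, `run_partition`, and the shared `events` list (standing in for the database and Kafka) are hypothetical names introduced only for illustration.

```python
class OrderedWriter:
    """Plain-Python model of a per-partition writer with open/process/close hooks."""

    def __init__(self, events):
        # Shared log standing in for the two external systems.
        self.events = events

    def open(self, partition_id, version):
        # Returning True means "go ahead and process this partition".
        return True

    def process(self, row):
        # Database write first, Kafka write second, for every row.
        self.events.append(("db", row))
        self.events.append(("kafka", row))

    def close(self, error):
        # A real writer would release its connections here.
        pass


def run_partition(writer, partition_id, version, rows):
    """Drive the writer for one partition, always calling close at the end."""
    if writer.open(partition_id, version):
        error = None
        try:
            for row in rows:
                writer.process(row)
        except Exception as exc:
            error = exc
            raise
        finally:
            writer.close(error)


events = []
run_partition(OrderedWriter(events), partition_id=0, version=1, rows=["a", "b"])
```

Because both writes live in the same `process` call, the Kafka write for a row can never precede its database write. The ordering holds per row only; a retry of the partition would replay both writes, so each still needs to be idempotent.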

Unfortunately, if you have a requirement to read data from a streaming source, join it with data from a batch source, transform it, write it to a database, call an API, get the result from the API, and write that result to Kafka, and those operations have to be done in exactly that order, then the only way you can do this is by implementing sink logic in a transformation component. You have to make sure you keep the logic in separate map functions, so you can parallelize them in an optimal manner.

Also, there is no good way to know when a micro-batch has been completely processed by your application, especially if you have multiple sinks.

0
votes

Try ForeachWriter. Its process() method receives a single row from the data frame, and you can process the data however you want. https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/ForeachWriter.html