5 votes

I am writing a Spark app where I need to evaluate the streaming data against historical data, which sits in a SQL Server database.

The idea is that Spark will fetch the historical data from the database, persist it in memory, and evaluate the streaming data against it.

Now I am getting the streaming data as follows:

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext,functions as func,Row


sc = SparkContext("local[2]", "realtimeApp")
ssc = StreamingContext(sc,10)
files = ssc.textFileStream("hdfs://RealTimeInputFolder/")

######## Let's get the data from the db which is relevant for streaming ###

driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
dataurl = "jdbc:sqlserver://myserver:1433"
db = "mydb"
table = "stream_helper"
credential = "my_credentials"

######## basic data for evaluation purposes ########



files_count = files.flatMap(lambda file: file.split())

pattern = r'(TranAmount=Decimal.{2})(.[0-9]*.[0-9]*)(\S+ )(TranDescription=u.)([a-zA-Z\s]+)([\S\s]+ )(dSc=u.)([A-Z]{2}.[0-9]+)'


tranfiles = "wasb://myserver.blob.core.windows.net/RealTimeInputFolder01/"

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']


def pre_parse(logline):
    """
    to read files as rows of sql in pyspark streaming using the pattern . for use of logging 
    added 0,1 in case there is any failure in processing by this pattern

    """
    match = re.search(pattern,logline)
    if match is None:
        return(line,0)
    else:
        return(
        Row(
        customer_id = match.group(8)
        trantype = match.group(5)
        amount = float(match.group(2))
        ),1)


def parse():
    """
    actual processing is happening  here 
    """
    parsed_tran = ssc.textFileStream(tranfiles).map(preparse)
    success = parsed_tran.filter(lambda s: s[1] == 1).map(lambda x:x[0])
    fail = parsed_tran.filter(lambda s:s[1] == 0).map(lambda x:x[0])
    if fail.count() > 0:
        print "no of non parsed file : %d", % fail.count()

    return success,fail

success, fail = parse()

Now I want to evaluate it against the DataFrame that I get from the historical data:

base_data = sqlContext.read.format("jdbc").options(driver=driver,url=dataurl,database=db,user=credential,password=credential,dbtable=table).load()

Now, since this is returned as a DataFrame, how do I use it for my purpose? The streaming programming guide says:
"You have to create a SQLContext using the SparkContext that the StreamingContext is using."

This makes me even more confused about how to use the existing DataFrame with the streaming object. Any help is highly appreciated.

1
There are multiple ways to go about this. I would suggest looking at the foreachRDD method for DStreams, in which you can use your existing DataFrame as a broadcast variable. You could also convert your DataFrame to an RDD and join it with every DStream RDD using transformations. If you need the SparkContext from the StreamingContext inside foreachRDD to create the SQLContext, you can simply use ssc.sparkContext. – Saif Charaniya
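
A minimal sketch of the transform-and-join approach mentioned in the comment above, reusing base_data and success from the question; the customer_id column on the historical table, and keying both sides on it, are assumptions for illustration:

# Key the historical DataFrame's underlying RDD by customer id once and cache it.
# (customer_id is an assumed column name, matching the Row built in pre_parse.)
hist_by_cust = base_data.rdd.map(lambda r: (r.customer_id, r)).cache()

# Key every streamed Row the same way, then join each micro-batch RDD against history.
keyed_stream = success.map(lambda row: (row.customer_id, row))
joined = keyed_stream.transform(lambda rdd: rdd.join(hist_by_cust))

# joined now carries (customer_id, (streamed_row, historical_row)) pairs per batch.
joined.pprint()

If the historical table is small, collecting it and broadcasting it with sc.broadcast(), as the comment suggests, avoids the shuffle that this join triggers on every batch.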

1 Answer

0 votes

To manipulate DataFrames, you always need a SQLContext, which you can instantiate like this:

sc = SparkContext("local[2]", "realtimeApp")
sqlc = SQLContext(sc)
ssc = StreamingContext(sc, 10)

These two contexts (SQLContext and StreamingContext) will coexist in the same job because they are associated with the same SparkContext. But keep in mind that you can't instantiate two different SparkContexts in the same job.

Once you have created a DataFrame from your DStream, you can join your historical DataFrame with the DataFrame created from the stream. To do that, I would do something like this:

yourDStream.foreachRDD(lambda rdd: sqlContext
    .createDataFrame(rdd)
    .join(historicalDF, ...)
    ...
)

Think about the amount of streamed data you need to use for your join when you manipulate streams; you may be interested in the window operations.
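
For instance, a windowed stream restricts the join to only the most recently received data. A minimal sketch, reusing yourDStream, sqlContext and historicalDF from above; the 60-second window, 20-second slide, and the customer_id join column are illustrative assumptions:

# Sketch: join only the data received in a sliding window.
# Window length and slide interval must be multiples of the batch interval (10s here).
windowed = yourDStream.window(60, 20)

windowed.foreachRDD(lambda rdd: sqlContext
    .createDataFrame(rdd)
    .join(historicalDF, "customer_id")  # assumed join column
    .show()
)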