2 votes

I am reading data from a Kafka source using Spark Streaming, and from it I create a DataFrame with the columns wsid, year, month, day, and oneHourPrecip:

val df = spark.readStream
    .format("kafka")
    .option("subscribe", "raw_weather")
    .option("kafka.bootstrap.servers", "<host1:port1,host2:port2>...")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism" , "PLAIN")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some password>" + "\";")
    .option("kafka.ssl.protocol", "TLSv1.2")
    .option("kafka.ssl.enabled.protocols", "TLSv1.2")
    .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
    .load()
    .selectExpr("CAST(value as STRING)")
    .as[String]
    .withColumn("_tmp", split(col("value"), "\\,"))
    .select(
        $"_tmp".getItem(0).as("wsid"),
        $"_tmp".getItem(1).as("year").cast("int"),
        $"_tmp".getItem(2).as("month").cast("int"),
        $"_tmp".getItem(3).as("day").cast("int"),
        $"_tmp".getItem(11).as("oneHourPrecip").cast("double")
    )
    .drop("_tmp")

I then perform a groupBy and try to write this streamed data into a table using JDBC. This is my code for that:

val query= df.writeStream
    .outputMode(OutputMode.Append())
    .foreachBatch((df: DataFrame , id: Long) => {
        println(df.count())
        df.groupBy($"wsid" , $"year" , $"month" , $"day")
            .agg(sum($"oneHourPrecip").as("precipitation"))
            .write
            .mode(SaveMode.Append)
            .jdbc(url , s"$schema.$table" , getProperties)
    })
    .trigger(Trigger.ProcessingTime(1))
    .start()

The problem comes with the batching. With Spark Streaming, we cannot predict how many rows arrive in each batch's DataFrame. So quite often I get data that is split across batches (i.e., for a common key (wsid, year, month, day), some rows appear in one batch while others appear in another).

Then, when I group by and try to write the result using JDBC, this is the error I get:

com.ibm.db2.jcc.am.BatchUpdateException: [jcc][t4][102][10040][4.25.13] Batch failure.  The batch was submitted, but at least one exception occurred on an individual member of the batch.
Use getNextException() to retrieve the exceptions for specific batched elements. ERRORCODE=-4229, SQLSTATE=null
    at com.ibm.db2.jcc.am.b6.a(b6.java:502)
    at com.ibm.db2.jcc.am.Agent.endBatchedReadChain(Agent.java:434)
    at com.ibm.db2.jcc.am.k4.a(k4.java:5452)
    at com.ibm.db2.jcc.am.k4.c(k4.java:5026)
    at com.ibm.db2.jcc.am.k4.executeBatch(k4.java:3058)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: com.ibm.db2.jcc.am.SqlIntegrityConstraintViolationException: Error for batch element #1: DB2 SQL Error: SQLCODE=-803, SQLSTATE=23505, SQLERRMC=1;SPARK.DAILY_PRECIPITATION_DATA, DRIVER=4.25.13
        at com.ibm.db2.jcc.am.b6.a(b6.java:806)
        at com.ibm.db2.jcc.am.b6.a(b6.java:66)
        at com.ibm.db2.jcc.am.b6.a(b6.java:140)
        at com.ibm.db2.jcc.t4.ab.a(ab.java:1283)
        at com.ibm.db2.jcc.t4.ab.a(ab.java:128)
        at com.ibm.db2.jcc.t4.p.a(p.java:57)
        at com.ibm.db2.jcc.t4.aw.a(aw.java:225)
        at com.ibm.db2.jcc.am.k4.a(k4.java:3605)
        at com.ibm.db2.jcc.am.k4.d(k4.java:6020)
        at com.ibm.db2.jcc.am.k4.a(k4.java:5372)
        ... 17 more

As is evident from the SqlIntegrityConstraintViolationException above, this happens because after one batch writes its grouped values over JDBC, the insertion of the next set of values fails due to the primary key (wsid, year, month, day).

Given that there will be a fixed number of oneHourPrecip values (24) for a given (wsid, year, month, day) from the source, how do we ensure that groupBy works properly for all the data streamed from the source, so that inserting into the database is not a problem?


2 Answers

2 votes

SaveMode.Upsert is not available :-) This has nothing to do with groupBy; groupBy just groups the values. The integrity violation (com.ibm.db2.jcc.am.SqlIntegrityConstraintViolationException) has to be taken care of at the SQL level.

Option 1:

You can do an insert/update (upsert) to avoid the integrity violation.

For this you need to use something like the pseudo code below:

dataframe.foreachPartition {
    UPDATE TABLE_NAME SET FIELD_NAME = xxxxx WHERE MyID = XXX;

    INSERT INTO TABLE_NAME VALUES (colid, col1, col2)
    WHERE NOT EXISTS (SELECT 1 FROM TABLE_NAME WHERE colid = xxxx);
}
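
A rough Scala sketch of this insert/update idea, applied to the grouped DataFrame inside foreachBatch (untested; it assumes the same url, schema, table and getProperties values from the question, DB2's SYSIBM.SYSDUMMY1 dummy table, matching column names in the target table, and that each micro-batch carries a partial sum that should be added to what is already stored):

    import java.sql.DriverManager
    import org.apache.spark.sql.Row

    df.foreachPartition { rows: Iterator[Row] =>
        val conn = DriverManager.getConnection(url, getProperties)
        // Add this batch's partial sum to an existing row for the key, if any.
        val update = conn.prepareStatement(
            s"UPDATE $schema.$table SET precipitation = precipitation + ? " +
             "WHERE wsid = ? AND year = ? AND month = ? AND day = ?")
        // Insert the row only when the key does not exist yet.
        val insert = conn.prepareStatement(
            s"INSERT INTO $schema.$table (wsid, year, month, day, precipitation) " +
             "SELECT ?, ?, ?, ?, ? FROM SYSIBM.SYSDUMMY1 " +
            s"WHERE NOT EXISTS (SELECT 1 FROM $schema.$table " +
             "WHERE wsid = ? AND year = ? AND month = ? AND day = ?)")
        try {
            rows.foreach { r =>
                val wsid   = r.getAs[String]("wsid")
                val year   = r.getAs[Int]("year")
                val month  = r.getAs[Int]("month")
                val day    = r.getAs[Int]("day")
                val precip = r.getAs[Double]("precipitation")

                update.setDouble(1, precip)
                update.setString(2, wsid); update.setInt(3, year)
                update.setInt(4, month);   update.setInt(5, day)
                // If no row was updated, the key is new, so insert it.
                if (update.executeUpdate() == 0) {
                    insert.setString(1, wsid); insert.setInt(2, year)
                    insert.setInt(3, month);   insert.setInt(4, day)
                    insert.setDouble(5, precip)
                    insert.setString(6, wsid); insert.setInt(7, year)
                    insert.setInt(8, month);   insert.setInt(9, day)
                    insert.executeUpdate()
                }
            }
        } finally {
            update.close(); insert.close(); conn.close()
        }
    }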

Option 2: alternatively, check the MERGE statement in DB2.

One way is to create an empty temp table (without any constraints) that has the same schema, populate it, and at the end execute a script that merges it into the target table.
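
A sketch of that staging-table approach inside foreachBatch (again untested; the staging table name, the column names and the additive merge are assumptions, and url, schema, table, getProperties are the values from the question):

    import java.sql.DriverManager

    // Hypothetical staging table with the same schema but no constraints.
    val stagingTable = s"$schema.${table}_STAGE"

    // 1. Write this batch's grouped values into the staging table.
    df.groupBy($"wsid", $"year", $"month", $"day")
        .agg(sum($"oneHourPrecip").as("precipitation"))
        .write
        .mode(SaveMode.Overwrite)   // Spark drops and recreates the staging table each batch
        .jdbc(url, stagingTable, getProperties)

    // 2. Merge the staging table into the target table.
    val conn = DriverManager.getConnection(url, getProperties)
    try {
        conn.createStatement().executeUpdate(
            s"""MERGE INTO $schema.$table t
               |USING $stagingTable s
               |ON t.wsid = s.wsid AND t.year = s.year AND t.month = s.month AND t.day = s.day
               |WHEN MATCHED THEN UPDATE SET t.precipitation = t.precipitation + s.precipitation
               |WHEN NOT MATCHED THEN INSERT (wsid, year, month, day, precipitation)
               |    VALUES (s.wsid, s.year, s.month, s.day, s.precipitation)""".stripMargin)
    } finally {
        conn.close()
    }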

0 votes

I did figure something out, but it may have some performance concerns. Anyway, it worked for me, so I am posting the answer:

I figured out that in order to store grouped data in a DB2 table, we have to wait until all the data has been retrieved from the source. For that, I use OutputMode.Complete().

Then I realized that if I wrote to DB2 after grouping with the current approach, it would still throw the same error. For that, I had to use SaveMode.Overwrite inside foreachBatch.

I tried running my program with this approach, but it threw this error:

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets

So I decided to do the groupBy and aggregation in the readStream part itself. Thus my code looks like this:

readStream part:

val df = spark.readStream
    .format("kafka")
    .option("subscribe", "raw_weather")
    .option("kafka.bootstrap.servers", "<host1:port1,host2:port2>...")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism" , "PLAIN")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"token\" password=\"" + "<some password>" + "\";")
    .option("kafka.ssl.protocol", "TLSv1.2")
    .option("kafka.ssl.enabled.protocols", "TLSv1.2")
    .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
    .load()
    .selectExpr("CAST(value as STRING)")
    .as[String]
    .withColumn("_tmp", split(col("value"), "\\,"))
    .select(
        $"_tmp".getItem(0).as("wsid"),
        $"_tmp".getItem(1).as("year").cast("int"),
        $"_tmp".getItem(2).as("month").cast("int"),
        $"_tmp".getItem(3).as("day").cast("int"),
        $"_tmp".getItem(11).as("oneHourPrecip").cast("double")
    )
    .drop("_tmp")
    .groupBy($"wsid" , $"year" , $"month" , $"day")
    .agg(sum($"oneHourPrecip").as("precipitation"))

writeStream part:

val query= df.writeStream
    .outputMode(OutputMode.Complete())
    .foreachBatch((df: DataFrame , id: Long) => {
        println(df.count())
        df.write
            .mode(SaveMode.Overwrite)
            .jdbc(url , s"$schema.$table" , getProperties)
    })
    .trigger(Trigger.ProcessingTime(1))
    .start()

query.awaitTermination()