I have the following logic using Spark Structured Streaming 2.3, where I join two streams on id and then output the joined stream data. I am not using watermarking. This is a simple socket stream setup:
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{IntegerType, TimestampType}
import scala.concurrent.duration._
import spark.implicits._

// Stream 1: lines of the form "name,id,ts" arriving on port 5431
val df1 = spark.readStream.format("socket").option("host","localhost").option("port","5431").load()
val df2 = df1.as[String].map(x=>x.split(","))
val df3 = df2.select($"value"(0).as("name"),$"value"(1).cast(IntegerType).as("id"),$"value"(2).cast(TimestampType).as("ts"))
// Stream 2: same format, arriving on port 5430
val df1_1 = spark.readStream.format("socket").option("host","localhost").option("port","5430").load()
val df2_1 = df1_1.as[String].map(x=>x.split(","))
val df3_1 = df2_1.select($"value"(0).as("name"),$"value"(1).cast(IntegerType).as("id"),$"value"(2).cast(TimestampType).as("ts"))
// Stream-stream inner join on id (null-safe equality), no watermark
val joindf = df3.join(df3_1,df3("id") <=> df3_1("id"))
val res = joindf.writeStream.outputMode("append").trigger(Trigger.ProcessingTime(15.seconds))
  .format("console").option("truncate","false").start()
res.awaitTermination()
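Each socket line goes through a split on commas followed by two casts before it reaches the join. The plain-Scala sketch below approximates that per-line transformation without Spark. `LineParser` and `Parsed` are hypothetical names, and `None` stands in for the `null` that Spark produces when an array index is missing or a cast fails. Spark's own string-to-int and string-to-timestamp casts accept inputs that `toInt` and `java.sql.Timestamp.valueOf` reject (for example a date with no time part), so treat this only as an approximation for the "name,id,yyyy-MM-dd HH:mm:ss" lines used here.

```scala
import java.sql.Timestamp
import scala.util.Try

object LineParser {
  // Hypothetical record type; Option fields stand in for Spark's nullable columns.
  final case class Parsed(name: String, id: Option[Int], ts: Option[Timestamp])

  def parseLine(line: String): Parsed = {
    // Same first step as df2: x.split(",")
    val fields = line.split(",")
    // Like $"value"(i) on a too-short array, a missing index yields no value instead of an error.
    def field(i: Int): Option[String] =
      if (i < fields.length) Some(fields(i)) else None
    Parsed(
      name = field(0).orNull,
      // Approximates .cast(IntegerType): unparseable text becomes None (null in Spark).
      id = field(1).flatMap(s => Try(s.toInt).toOption),
      // Approximates .cast(TimestampType) for the "yyyy-MM-dd HH:mm:ss" form.
      ts = field(2).flatMap(s => Try(Timestamp.valueOf(s)).toOption)
    )
  }

  def main(args: Array[String]): Unit = {
    println(parseLine("vinyas,1,2018-03-17 09:04:21"))
    println(parseLine("x,abc"))
  }
}
```

A well-formed line yields all three fields, while a malformed one such as `x,abc` keeps its name but has no id or timestamp, which in the streaming query would surface as `null` columns that an inner join on id would never match.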
Say the first trigger receives the following data on the two streams:
df3:
vinyas,1,2018-03-17 09:04:21
namratha,2,2018-03-17 09:04:23
varsha,3,2018-03-17 09:04:33
df3_1:
vinyas,1,2018-03-17 09:04:21
shetty,2,2018-03-17 09:04:23
varsha,3,2018-03-17 09:04:33
I got the expected result:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+---+-------------------+------+---+-------------------+
|name    |id |ts                 |name  |id |ts                 |
+--------+---+-------------------+------+---+-------------------+
|vinyas  |1  |2018-03-17 09:04:21|vinyas|1  |2018-03-17 09:04:21|
|varsha  |3  |2018-03-17 09:04:33|varsha|3  |2018-03-17 09:04:33|
|namratha|2  |2018-03-17 09:04:23|shetty|2  |2018-03-17 09:04:23|
+--------+---+-------------------+------+---+-------------------+
Next, I stream the following data:
df3:
shrinivas,2,2018-03-17 09:04:23
df3_1:
vidya,2,2018-03-17 09:04:23
I get this output:
-------------------------------------------
Batch: 1
-------------------------------------------
+---------+---+-------------------+------+---+-------------------+
|name     |id |ts                 |name  |id |ts                 |
+---------+---+-------------------+------+---+-------------------+
|shrinivas|2  |2018-03-17 09:04:23|shetty|2  |2018-03-17 09:04:23|
|namratha |2  |2018-03-17 09:04:23|vidya |2  |2018-03-17 09:04:23|
|shrinivas|2  |2018-03-17 09:04:23|vidya |2  |2018-03-17 09:04:23|
+---------+---+-------------------+------+---+-------------------+
Can someone explain how I am getting the Batch 1 result? Shouldn't it have 4 records instead of 3?
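One way to check which pairs a micro-batch should contain is to model the join by hand. The sketch below is a simplified plain-Scala model, not Spark's implementation, and rests on an assumption: an append-mode stream-stream inner join without a watermark keeps every row it has seen from both sides in state and, in each micro-batch, emits only the pairs that involve at least one row that is new in that batch. `Rec`, `JoinState`, and `replay` are hypothetical names.

```scala
// Simplified model (assumption, not Spark internals) of an append-mode
// stream-stream inner join with no watermark: state on both sides grows forever,
// and each batch emits only pairs containing at least one new row.
object IncrementalJoinModel {
  final case class Rec(name: String, id: Int, ts: String)

  final class JoinState {
    private var left = Vector.empty[Rec]  // rows buffered so far from df3
    private var right = Vector.empty[Rec] // rows buffered so far from df3_1

    def runBatch(newLeft: Seq[Rec], newRight: Seq[Rec]): Seq[(Rec, Rec)] = {
      // New left rows are matched against old AND new right rows.
      val newLeftMatches = for {
        l <- newLeft
        r <- right ++ newRight
        if l.id == r.id // ids are never null here, so == behaves like <=>
      } yield (l, r)
      // Old left rows are matched only against new right rows
      // (new x new pairs were already produced above).
      val oldLeftMatches = for {
        l <- left
        r <- newRight
        if l.id == r.id
      } yield (l, r)
      // Buffer this batch's rows so later batches can match against them.
      left = left ++ newLeft
      right = right ++ newRight
      newLeftMatches ++ oldLeftMatches
    }
  }

  // Replays the two triggers from the question and returns each batch's pairs.
  def replay(): (Seq[(Rec, Rec)], Seq[(Rec, Rec)]) = {
    val state = new JoinState
    val batch0 = state.runBatch(
      newLeft = Seq(Rec("vinyas", 1, "2018-03-17 09:04:21"),
        Rec("namratha", 2, "2018-03-17 09:04:23"),
        Rec("varsha", 3, "2018-03-17 09:04:33")),
      newRight = Seq(Rec("vinyas", 1, "2018-03-17 09:04:21"),
        Rec("shetty", 2, "2018-03-17 09:04:23"),
        Rec("varsha", 3, "2018-03-17 09:04:33")))
    val batch1 = state.runBatch(
      newLeft = Seq(Rec("shrinivas", 2, "2018-03-17 09:04:23")),
      newRight = Seq(Rec("vidya", 2, "2018-03-17 09:04:23")))
    (batch0, batch1)
  }

  private def show(label: String, pairs: Seq[(Rec, Rec)]): Unit = {
    println(label)
    pairs.foreach { case (l, r) => println(s"  ${l.name} (id ${l.id}) <-> ${r.name} (id ${r.id})") }
  }

  def main(args: Array[String]): Unit = {
    val (batch0, batch1) = replay()
    show("Batch 0:", batch0)
    show("Batch 1:", batch1)
  }
}
```

Under this model, Batch 1 contains three pairs: shrinivas with shetty, shrinivas with vidya, and namratha with vidya. The namratha/shetty pair does not appear in it because both of those rows arrived in Batch 0, where that pair was already emitted. The model says nothing about row order, which the console sink does not guarantee either.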