My team is just stepping into Structured Streaming, and I'm relatively new to it. I have a requirement with:

- Source: CSV
- Sink: JSON

Env details:

- Cluster: Spark 2.2.1
- Programming language: Scala
- Build tool: Gradle

Scope:

I have implemented this simple code:
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Array(
  StructField("customer_id", StringType),
  StructField("name", StringType),
  StructField("product_id", StringType),
  StructField("product_name", StringType)))

val fileData = spark.readStream
  .option("header", "true")
  .schema(schema)
  .csv(args(0))
Then I apply a simple aggregation:
// The actual business logic is more complex than this
val customerCount = fileData.groupBy("customer_id").count()
Finally, I write to a JSON sink:
import org.apache.spark.sql.streaming.Trigger

val query = customerCount
  .writeStream
  .format("json")
  .option("path", "src/main/resources/output/myjson_dir")
  .option("checkpointLocation", "src/main/resources/chkpoint_dir")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
Questions:
- This works as expected when I use .format("console"), but throws an exception when I use .format("json"):

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [customer_id#0], [customer_id#0, count(1) AS count#18L]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4b56b031,csv,List(),Some(StructType(StructField(customer_id,StringType,true), StructField(name,StringType,true), StructField(product_id,StringType,true), StructField(product_name,StringType,true))),List(),None,Map(header -> true, path -> /Users/Underwood/Documents/workspace/Spark_Streaming_Examples/src/main/resources/input),None), FileSource[/Users/Underwood/Documents/workspace/Spark_Streaming_Examples/src/main/resources/input], [customer_id#0, name#1, product_id#2, product_name#3, date#4]
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
- I have also tried outputMode("update") and outputMode("complete"), but these throw errors as well.
Why is this so? Is this expected behavior? How do I write the output to a JSON sink?
The above exception talks about using watermarks. AFAIK, watermarks are used with a timestamp field, but I do not have a timestamp or date field in my input data. Please let me know if I'm wrong here. How would adding a watermark make a difference?
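From the docs, I gather the exception is hinting at something like the sketch below, where I would have to synthesize a timestamp myself. The processing_time column and the 10-minute durations are my own inventions, and I'm not sure this is the intended usage:

import org.apache.spark.sql.functions.{col, current_timestamp, window}

// Sketch only: tag each row with an ingestion timestamp, watermark on it,
// and group by a time window so append mode can finalize closed windows.
// "processing_time" and the 10-minute durations are arbitrary choices.
val stamped = fileData.withColumn("processing_time", current_timestamp())

val customerCount = stamped
  .withWatermark("processing_time", "10 minutes")
  .groupBy(window(col("processing_time"), "10 minutes"), col("customer_id"))
  .count()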
My next attempt was to write a custom foreach sink (a ForeachWriter). I referred to this post, but it did not help me either. The problem there was that I got 200 directories with a 0-byte file in each.
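For reference, my writer was shaped roughly like this (a simplified sketch, not my exact code; the println body is a placeholder for the real JSON-writing logic):

import org.apache.spark.sql.{ForeachWriter, Row}

// Simplified sketch of the ForeachWriter I tried. open()/close() run once
// per partition per trigger; process() receives each result row.
val writer = new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(row: Row): Unit = {
    // placeholder: serialize the row as JSON and write it out
    println(row.mkString(","))
  }
  override def close(errorCause: Throwable): Unit = ()
}

val query = customerCount.writeStream
  .outputMode("complete")
  .foreach(writer)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()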
How do I select non-group-by columns in the final output? In simple batch processing, I usually achieve this by joining the aggregated DF back to the original DF and selecting the required columns, but Structured Streaming does not seem to like this approach. Here's my sample snippet:
val customerCount = fileData.groupBy("customer_id").count()

val finalDF = fileData.join(customerCount, Seq("customer_id"))
  .select("customer_id", "count", "product_name")
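The only streaming-friendly alternative I can think of is to fold the extra columns into the aggregation itself, something like the untested sketch below (first(...) keeps an arbitrary product_name per customer; collect_list(...) would keep them all). Is that the right approach?

import org.apache.spark.sql.functions.{count, first, lit}

// Untested sketch: carry non-group-by columns through the aggregation
// instead of joining the aggregate back to the streaming source.
val finalDF = fileData
  .groupBy("customer_id")
  .agg(count(lit(1)).as("count"),
    first("product_name").as("product_name"))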
Please let me know if I have missed out any details.