1 vote

I am trying to run a Structured Streaming application that writes its output as Parquet files to Google Cloud Storage. I don't see any errors, but it does not write the files to the GCS location; I can see only the spark-metadata folder. Any idea how I can debug this?

String windowDuration = "60 minutes";
String slideDuration = "10 minutes";

Dataset<Row> data_2 = complete_data;
// Convert the epoch-millis event timestamp into a proper timestamp column
data_2 = data_2.withColumn("creationDt",
        functions.to_timestamp(functions.from_unixtime(col(topics + "." + event_timestamp).divide(1000.0))));
data_2 = data_2
        .withWatermark("creationDt", "1 minute")
        .groupBy(col(topics + "." + keyField),
                functions.window(col("creationDt"), windowDuration, slideDuration),
                col(topics + "." + aggregateByField))
        .count();

StreamingQuery query_2 = data_2
        .withColumn("startwindow", col("window.start"))
        .withColumn("endwindow", col("window.end"))
        .withColumn("endwindow_date", col("window.end").cast(DataTypes.DateType))
        .writeStream()
        .format("parquet")
        .partitionBy("endwindow_date")
        .option("path", dataFile_2)
        .option("truncate", "false")
        .outputMode("append")
        .option("checkpointLocation", checkpointFile_2)
        .start();

query_2.awaitTermination();
What's the Spark version? What's in web UI's SQL tab? What's under spark-metadata folder? What's the source(s)? Any aggregations? More, more, more... - Jacek Laskowski
I am using Spark version 2.2 provided by Google dataproc. Spark metadata folder contains many files named with numbers like 220,221 etc. All these files have "v1" as content. I don't see any valuable contents. - passionate
What about the web UI? Anything? The "invaluable" contents are actually very valuable, as they show that the query is running. - Jacek Laskowski
I am just changing the firewall so that I can access the UI. I will update in a while. Your help is much appreciated. I have really tried a lot and I am not able to figure it out. - passionate
The UI shows that all the tasks succeed every time a batch runs. I have three streaming queries in the same program. Will that be a problem? Updated the question with code also. I have three similar queries with a 30 min window, a 60 min window and a 24 hour window. - passionate
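On the three-queries point: start() is asynchronous, so several queries can run in one program, but if the application blocks on only one of them, a failure in the others never surfaces. A minimal sketch, assuming the application's SparkSession is named spark and all queries have already been started, is to wait on the shared StreamingQueryManager instead of a single query:

```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class MultiQueryAwait {
    // Blocks until ANY active query on this session terminates, so a
    // failure in any of the three queries surfaces here instead of being
    // silently lost while the program waits on a different query.
    static void awaitAll(SparkSession spark) throws StreamingQueryException {
        spark.streams().awaitAnyTermination();
    }
}
```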

1 Answer

0
votes

I believe the issue is with the .outputMode("append") line. GCS is an object store rather than a real file system, and it does not support appending to files.

I am guessing that this line blows up and the exception is simply swallowed somewhere: https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemBase.java#L1175
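One way to check that guess is to register a StreamingQueryListener, which reports per-batch progress and, on termination, any exception the query died with. A sketch, assuming the SparkSession is named spark and the listener is registered before the queries start:

```java
import org.apache.spark.sql.streaming.StreamingQueryListener;

// Sketch: surface a possibly swallowed streaming failure.
// Assumes a SparkSession named `spark` is in scope.
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        System.out.println("Query started: " + event.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        // Per-batch progress: numInputRows, sink description, etc.
        System.out.println(event.progress().prettyJson());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        // If the query died with an exception, it is reported here.
        System.out.println("Query terminated: " + event.id()
                + ", exception: " + event.exception());
    }
});
```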