I'm trying to write a DataFrame to a .csv file on HDFS using Databricks' spark-csv_2.10 dependency. The dependency seems to work fine, as I'm able to read a .csv file into a DataFrame. But when I perform a write, I get the following error. The exception comes after the header is written to the file.
18/06/21 21:41:58 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1281)
    at java.lang.Thread.join(Thread.java:1355)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
This is a simplified version of the code I use:
// Read the pipe-delimited CSV from HDFS into a DataFrame
DataFrame df = sqlContext.read().format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .load("/user/abc/data.csv");

df.registerTempTable("empTable");

// Aggregate, then write the result back to HDFS as CSV
DataFrame result = sqlContext.sql("SELECT department, avg(salary) as avgSalary FROM empTable GROUP BY department").cache();
result.write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("/user/abc/csv/"); // InterruptedException comes here
// The directory "/user/abc/csv/" gets created and it even has temp files.
The write works fine when I change the query to SELECT department, salary FROM empTable.
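For comparison, this is what the working variant looks like; it is the same write with only the query changed (the variable name and output path here are just placeholders, not my real ones):

// Same pipeline without the aggregation -- this version writes without error
DataFrame plain = sqlContext.sql("SELECT department, salary FROM empTable");
plain.write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("/user/abc/csv-plain/"); // placeholder output directory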
Can anyone help me with this?
Edit: As requested by Chandan, here is the result of result.show():
+----------+---------+
|department|avgSalary|
+----------+---------+
| Finance| 5000.0|
| Travel| 5000.0|
+----------+---------+
I use Spark 1.6.0 and spark-csv_2.10:1.5.0.
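For completeness, the SQLContext in the snippets above is created the standard way for Spark 1.6; a minimal sketch of my setup (the app name is a placeholder):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;

SparkConf conf = new SparkConf().setAppName("CsvWriteExample"); // placeholder app name
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc); // the sqlContext used above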