1 vote

I'm trying to write a DataFrame to a .csv file on HDFS using Databricks' spark-csv_2.10 dependency. The dependency seems to work fine, since I'm able to read a .csv file into a DataFrame. But when I perform a write, I get the following error. The exception comes after the header has been written to the file.

18/06/21 21:41:58 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)

This is a simplified version of the code I use:

DataFrame df = sqlContext.read().format("com.databricks.spark.csv")
                        .option("header", "true")
                        .option("inferSchema", "true")
                        .option("delimiter", "|")
                        .load("/user/abc/data.csv");
df.registerTempTable("empTable");
DataFrame result = sqlContext.sql("SELECT department, avg(salary) as avgSalary FROM empTable GROUP BY department").cache();
result.write()
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("/user/abc/csv/"); //InterruptedException comes here
//The directory "/user/abc/csv/" gets created and it even has temp files.

The write works fine when I change the query to SELECT department, salary FROM empTable.

Can anyone help me with this?

Edit: As requested by Chandan, here is the result of result.show():

+----------+---------+
|department|avgSalary|
+----------+---------+
|   Finance|   5000.0|
|    Travel|   5000.0|
+----------+---------+

I use Spark 1.6.0 and spark-csv_2.10:1.5.0

Which version of Spark are you using? – Shaido
Do you have the permissions to write to the HDFS directory used in the code? – wandermonk
Can you use save("/user/abc/csv/test"); instead of save("/user/abc/csv/");? – wandermonk
@PhaniKumarYadavilli Would that make a difference? Both /csv and /test would be directories, and the same path works fine when I change the query run by SQLContext.sql(). – Amber
Normally this code should work; when I try it locally it runs fine. Either you have a permission issue or the path is being treated as a malformed URI. – wandermonk

2 Answers

0 votes

Is it the Unix file system or HDFS you are writing to? I am able to execute the above code. Are you able to see the result DataFrame? Try result.show() and post the output here; that will help narrow down the issue. If possible, please post the complete log.

Try saving the result as Parquet and see if that works. If it does, there must be some issue with the CSV writer that we can investigate. The code works for me; I tried the query with no issues on Spark 2.2 and 1.6.3. Also try writing to your local Unix file system, as shown below. I suspect it might be an HDFS issue, since your code seems to be correct.
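
A minimal sketch of both checks, reusing the result DataFrame from the question (the output paths here are placeholders, not your actual paths):

// 1. Write as Parquet to HDFS; if this succeeds, suspect the CSV writer.
result.write().parquet("/user/abc/parquet/");

// 2. Write the CSV to the local file system; if this succeeds, suspect HDFS.
result.write()
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("file:///tmp/csv-out/");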

0 votes

You can ignore this warning; it's caused by a known bug in Hadoop.

There is a JIRA issue tracking it: https://issues.apache.org/jira/browse/HDFS-10429
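
If the noise bothers you, one option is to raise the log level of the DFSClient logger so the warning is suppressed. A sketch, assuming the log4j 1.x that ships with Spark 1.6:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// Silence the benign DataStreamer InterruptedException warning (HDFS-10429).
Logger.getLogger("org.apache.hadoop.hdfs.DFSClient").setLevel(Level.ERROR);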