2 votes

Spark Version: 1.6.2.

I registered a temporary table whose data source is HDFS, and ran two queries on it.

Then the job failed with this error:

ERROR ApplicationMaster: User class threw exception:
java.io.IOException: Not a file: hdfs://my_server:8020/2017/01/01
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)

The tricky part is that the job succeeds if only one query is run.
Am I using Spark SQL in the wrong way, or is this intended behavior?

This is what my code looks like:

val rdd = sc.textFile("hdfs://my_server:8020/2017/*/*/*")
val table = sqlc.read.json(rdd).cache()

table.registerTempTable("my_table")

sql("""
    | SELECT contentsId,
    |   SUM(CASE WHEN gender = 'M' then 1 else 0 end)
    | FROM my_table
    | GROUP BY contentsId
  """.stripMargin)
  .write.format("com.databricks.spark.csv")
  .save("hdfs://my_server:8020/gender.csv")

sql("""
    | SELECT contentsId,
    |   SUM(CASE WHEN age > 0 AND age < 20 then 1 else 0 end),
    |   SUM(CASE WHEN age >= 20 AND age < 30 then 1 else 0 end)
    | FROM my_table
    | GROUP BY contentsId
  """.stripMargin)
  .write.format("com.databricks.spark.csv")
  .save("hdfs://my_server:8020/age.csv")

Thanks in advance!

Why are you trying to save two different dataframes into a single output file as gender.csv? – koiralo
The error indicates it's not able to read Not a file: hdfs://my_server:8020/2017/01/01 <-- is it a file or an empty directory? – Bhavesh
@ShankarKoirala My mistake. Saving them into two different files is what I need to do. I edited the query. – NaHeon
@Bhavesh hdfs://my_server:8020/2017/01/01 is a directory. Moreover, this exception only occurs when two queries are in one job. – NaHeon

1 Answer

1 vote

I think you can try keeping only files (and skipping directories) when building the RDD. Note that `java.io.File` checks the local filesystem, so for HDFS you need Hadoop's `FileSystem` API, something like this:

val fs = org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration)
val filePaths = fs.globStatus(new org.apache.hadoop.fs.Path("hdfs://my_server:8020/2017/*/*/*"))
  .filter(_.isFile)               // keep only files, skip directories
  .map(_.getPath.toString)

val rdd = sc.textFile(filePaths.mkString(","))

This will remove all the directories matched by the glob. Also, to save the second DataFrame into the same file, use append mode:

sql("""
    | SELECT contentsId,
    |   SUM(CASE WHEN age > 0 AND age < 20 then 1 else 0 end),
    |   SUM(CASE WHEN age >= 20 AND age < 30 then 1 else 0 end)
    | FROM my_table
    | GROUP BY contentsId
  """.stripMargin)
  .write.format("com.databricks.spark.csv")
  .mode("append")
  .save("hdfs://my_server:8020/gender.csv")

if you want both results in the same output. Otherwise, store the second DataFrame into a different file, as in your original code.
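As an alternative sketch (my own suggestion, not part of the answer above): in Hadoop 2.x, `FileInputFormat` can be told to recurse into subdirectories, which also avoids the `Not a file` error when the input path matches directories:

```scala
// Assumes the same SparkContext `sc` as in the question.
// Tell Hadoop's FileInputFormat to recurse into subdirectories
// instead of failing on them with "Not a file".
sc.hadoopConfiguration.set(
  "mapreduce.input.fileinputformat.input.dir.recursive", "true")

val rdd = sc.textFile("hdfs://my_server:8020/2017/*")
```

With this setting the glob no longer needs to enumerate every directory level (`/2017/*/*/*`); whether it is preferable to filtering paths up front depends on your directory layout.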