This problem is easier to explain inline with the code:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
// I have a file (fileToProcess) in HDFS that contains the name of another HDFS file:
val fs = FileSystem.get(new Configuration())
val fileToProcess = "hdfs://production/user/robin/fileToProcess"
// The content of fileToProcess is just the name of another file, in this case:
// hdfs://production/user/robin/people
// I want to read hdfs://production/user/robin/people and use it in the DataFrame code below.
// However, if I get that file name by reading the HDFS file (fileToProcess), like this:
val file = fs.open(new Path(fileToProcess)).readLine().toString
// I will get a Task not serializable exception on the last line of the script
// If I hardcode the file name like this:
val file2 = "hdfs://production/user/robin/people"
// it works great; however, I can't hardcode the name because in reality I don't know in advance which file I need to read
// file2 and file are both Strings and seem equal in every way, so I am really perplexed!
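// as far as I can tell they really are identical; a quick sanity check in the shell (just a sketch
// of the kind of comparison I mean) shows no visible difference:
println(file == file2)                        // prints true
println(file.length + " vs " + file2.length)  // same length, so no hidden whitespace or newline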
// Here is what I am doing with the file to get the exception
// The contents of people:
// { "person" : "John"}
// { "person" : "Sue"}
// { "person" : "Jean"}
// { "person" : "Jane"}
// { "person" : "John"}
val df = sqlcontext.read.json(file)
val peopleList = df.map(r => r(0).toString).distinct.collect
val anotherList = sc.parallelize(Array("Jean", "Sue", "Bill"))
val peopleListBroadcast = sc.broadcast(peopleList)
// everything works great up to this point
val filteredPeople = anotherList.filter(x => peopleListBroadcast.value contains x)
// here I get a Task not serializable exception when I use the file name read from the HDFS file (file), but it works fine with the hardcoded name (file2)
I have been stuck on this problem for days now and can't seem to find a workaround. Are there differences I can't see in the strings? How can two strings that are equal behave so differently? Please help me, as I am going nuts trying to figure this out!
The specific exception I am getting is:
Caused by: java.io.NotSerializableException: org.apache.hadoop.hdfs.DistributedFileSystem
Serialization stack:
- object not serializable (class: org.apache.hadoop.hdfs.DistributedFileSystem, value: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1011603383_1, ugi=robin (auth:SIMPLE)]])
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: fs, type: class org.apache.hadoop.fs.FileSystem)
Btw, I am using Spark 1.6.1 and Scala 2.10.5. Anyone should be able to reproduce this, I think, by creating the two files in HDFS and running the code above in spark-shell.
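In case it helps with reproducing, here is one way the two test files can be written from the same shell session (just a sketch reusing the fs handle from above; the exact setup doesn't matter, any way of creating those two files should do):
// write the pointer file and the people file (create() overwrites them if they already exist)
val pointerOut = fs.create(new Path("hdfs://production/user/robin/fileToProcess"))
pointerOut.write("hdfs://production/user/robin/people\n".getBytes("UTF-8"))
pointerOut.close()
val peopleOut = fs.create(new Path("hdfs://production/user/robin/people"))
peopleOut.write("{ \"person\" : \"John\"}\n{ \"person\" : \"Sue\"}\n{ \"person\" : \"Jean\"}\n{ \"person\" : \"Jane\"}\n{ \"person\" : \"John\"}\n".getBytes("UTF-8"))
peopleOut.close()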
Thanks, Robin