
This problem is easier to explain inline with the code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

// I have a file (fileToProcess) in HDFS that contains the name of another HDFS file:
val fs = FileSystem.get(new Configuration())
val fileToProcess = "hdfs://production/user/robin/fileToProcess"
// The contents of fileToProcess are just the name of another file.  In this case:
// hdfs://production/user/robin/people
// I want to read hdfs://production/user/robin/people and use it in the data frame as coded below.
// However, if I do this by reading the HDFS file (fileToProcess) to get the file name like this:
val file = fs.open(new Path(fileToProcess)).readLine().toString

// I will get a Task not serializable exception on the last line of the script

// If I hardcode the file name like this:
val file2 = "hdfs://production/user/robin/people"
// It works great; however, I can't do this as I don't know the file I need to read in reality
// file2 and file are both Strings and seem equal in every way, so I am really perplexed!
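// Sanity check (illustrative addition; assumes fileToProcess holds exactly the path
// with no trailing whitespace): the two strings really do compare equal, so the
// difference is not in the string values themselves.
println(file == file2)   // expected to print true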

// Here is what I am doing with the file to get the exception

// The contents of people:
// { "person" : "John"}
// { "person" : "Sue"}
// { "person" : "Jean"}
// { "person" : "Jane"}
// { "person" : "John"}

val df = sqlcontext.read.json(file)
val peopleList = df.map(r => r(0).toString).distinct.collect
val anotherList = sc.parallelize(Array("Jean", "Sue", "Bill"))

val peopleListBroadcast = sc.broadcast(peopleList)

// everything works great up to this point

val filteredPeople = anotherList.filter(x=> peopleListBroadcast.value contains x)
// Here I get a Task not serializable exception if I use the file name read from the HDFS file,
// but it works fine if I hardcode the name (as with file2).

I have been stuck on this problem for days now and can't seem to find a workaround either. Are there differences I can't see in the string? How can two strings that are equal behave so differently? Please help me, as I am going nuts trying to figure this out!

The specific exception I am getting is:

Caused by: java.io.NotSerializableException: org.apache.hadoop.hdfs.DistributedFileSystem
Serialization stack:
    - object not serializable (class: org.apache.hadoop.hdfs.DistributedFileSystem, value: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1011603383_1, ugi=robin (auth:SIMPLE)]])
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: fs, type: class org.apache.hadoop.fs.FileSystem)

Btw, I am using Spark 1.6.1 and Scala 2.10.5. Anyone should be able to recreate this, I think, by creating the two files in HDFS and pasting the code above into spark-shell.

Thanks, Robin


2 Answers

1 vote

It has nothing to do with strings. You have put an instance of org.apache.hadoop.fs.FileSystem, which is not Serializable, in scope. In spark-shell every top-level val becomes a field of the generated line object (the $iwC wrappers in your stack trace), so when Spark serializes the filter closure it drags that object, and with it fs, along. Marking it as transient should resolve this particular issue:

@transient val fs = FileSystem.get(new Configuration())
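
Another way to avoid the capture (a minimal sketch reusing the names from the question) is to confine the FileSystem to a local block, so it never becomes a field of the spark-shell line object and cannot be pulled into the closure:

val file = {
  val fs = FileSystem.get(new Configuration())   // local to this block only, never a field
  val in = fs.open(new Path(fileToProcess))
  try in.readLine() finally in.close()           // read the single line holding the path
}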
0 votes

Another option is to read the file name with Spark itself instead of the Hadoop FileSystem API:

val file = sc.textFile(fileToProcess, 1).first()

This will give you the file name (the first line of fileToProcess) without ever creating a FileSystem instance on the driver.
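
With no FileSystem on the driver there is nothing non-serializable left to capture, so the rest of the script should run unchanged (a minimal end-to-end sketch using the same names as the question):

val file = sc.textFile(fileToProcess, 1).first()    // the single line containing the path
val df = sqlcontext.read.json(file)
val peopleList = df.map(r => r(0).toString).distinct.collect
val peopleListBroadcast = sc.broadcast(peopleList)
val anotherList = sc.parallelize(Array("Jean", "Sue", "Bill"))
val filteredPeople = anotherList.filter(x => peopleListBroadcast.value contains x)   // should no longer throw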