My Spark job consists of 3 workers, co-located with the data they need to read. I submit an RDD containing metadata, and the job's tasks turn that metadata into real data. For instance, the metadata could name a file on the local worker filesystem, and the first stage of the Spark job would read that file into an RDD partition.
In my environment the data may not be present on all 3 workers, and it is far too expensive to read across workers (i.e. if the data is on worker1 then worker2 cannot reach out and fetch it). For this reason I have to force partitions onto the worker that holds the data they read. My current mechanism is to compare the actual worker against the expected worker in the metadata and fail the task with a descriptive error message if they don't match. With blacklisting enabled, the failed task gets rescheduled on a different node until the right one is found. This works fine, but as an optimization I wanted to use preferredLocations so that tasks get assigned to the right workers initially, without going through the try/reschedule process.
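For illustration, the guard looks roughly like this (a minimal sketch; the method name and the expectedHost parameter are illustrative, not my exact code):

import java.net.InetAddress

// Sketch of the locality guard described above.
// `expectedHost` comes from the partition's metadata.
def assertOnExpectedWorker(expectedHost: String): Unit = {
  val actualHost = InetAddress.getLocalHost.getHostName
  if (actualHost != expectedHost) {
    // Failing the task causes Spark to reschedule it; with blacklisting
    // enabled it eventually lands on the worker that holds the data.
    throw new IllegalStateException(
      s"Partition data lives on $expectedHost but the task ran on $actualHost")
  }
}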
I use makeRDD to create my initial RDD (of metadata) with the correct preferredLocations, as per the answer here: How to control preferred locations of RDD partitions? However, it is not exhibiting the behaviour I expect. The call to makeRDD is below:
sc.makeRDD(taskAssignments)
where taskAssignments takes the form:
val taskAssignments = mutable.ArrayBuffer[(String, Seq[String])]()
metadataMappings.foreach { case (k, v) =>
  taskAssignments += ((k + ":" + v.mkString(",")) -> Seq(idHostnameMappings(k)))
}
idHostnameMappings is just a map of id -> hostname, and I've verified that it contains the correct information.
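To make the shapes concrete, hypothetical inputs (values invented purely for illustration) would look like:

// Hypothetical example inputs, invented to illustrate the shapes.
val metadataMappings = Map(
  "1" -> Seq("/data/part-1.bin"),
  "2" -> Seq("/data/part-2.bin"),
  "3" -> Seq("/data/part-3.bin"))
val idHostnameMappings = Map("1" -> "worker1", "2" -> "worker2", "3" -> "worker3")
// taskAssignments then ends up holding e.g. ("1:/data/part-1.bin", Seq("worker1")),
// i.e. the (value, preferredLocations) pairs that this makeRDD overload accepts.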
Given that my test Spark cluster is completely clean, with no other jobs running on it, and that there is no skew in the input RDD (it has 3 partitions to match the 3 workers), I would have expected the tasks to be assigned to their preferredLocations. Instead I still see the error messages indicating that tasks are going through the fail/reschedule process.
Is my assumption that tasks would be scheduled at their preferredLocations on a clean cluster correct, and is there anything further I can do to force this?
Follow-up:
I was also able to create a much simpler test case. My 3 Spark workers are named worker1, worker2 and worker3, and I run the following:
import scala.collection.mutable
val someData = mutable.ArrayBuffer[(String, Seq[String])]()
someData += ("1" -> Seq("worker1"))
someData += ("2" -> Seq("worker2"))
someData += ("3" -> Seq("worker3"))
val someRdd = sc.makeRDD(someData)
someRdd.map(i => i + ":" + java.net.InetAddress.getLocalHost().getHostName()).collect().foreach(println)
I'd expect to see 1:worker1 etc., but in fact I see:
1:worker3
2:worker1
3:worker2
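For what it's worth, the location preferences that makeRDD actually recorded can be inspected on the driver with the standard RDD.preferredLocations method:

// Print the recorded preference(s) for each partition; if these show the
// expected hostnames, the RDD was built correctly and the issue is scheduling.
someRdd.partitions.foreach { p =>
  println(s"partition ${p.index}: " + someRdd.preferredLocations(p).mkString(","))
}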
Can anyone explain this behaviour?