1
votes

I have Spark 1.5.1 (built for Hadoop 2.6) running in standalone mode on my local machine. I am trying to run a Hive query from a sample Java application, with spark.master pointing to the Spark master running on my local machine (spark://impetus-i0248u:7077). Here is the Java code:

    SparkConf sparkconf = new SparkConf()
            .set("spark.master", "spark://impetus-i0248u:7077")
            .set("spark.app.name", "sparkhivesqltest")
            .set("spark.cores.max", "2")
            .set("spark.executor.memory", "2g")
            .set("worker_max_heapsize", "2g")
            .set("spark.driver.memory", "2g");

    SparkContext sc = new SparkContext(sparkconf);

    HiveContext sqlContext = new HiveContext(sc);
    DataFrame jdbcDF = sqlContext.sql("select * from bm.rutest");

    List<Row> employeeFullNameRows = jdbcDF.collectAsList();

The HiveContext is initialized properly, since it is able to establish a connection with the Hive metastore. The exception is thrown at jdbcDF.collectAsList().

Here is the output when Spark tries to submit the job:

15/12/10 20:00:42 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at collectAsList at HiveJdbcTest.java:30)
15/12/10 20:00:42 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/12/10 20:00:42 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 172.26.52.54, ANY, 2181 bytes)
15/12/10 20:00:42 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 172.26.52.54, ANY, 2181 bytes)

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "sparkDriver-akka.remote.default-remote-dispatcher-5"
Exception in thread "shuffle-server-1"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "shuffle-server-1"
Exception in thread "threadDeathWatcher-2-1"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "threadDeathWatcher-2-1"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "sparkDriver-akka.remote.default-remote-dispatcher-6"
Exception in thread "qtp1003369013-56"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "qtp1003369013-56"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "sparkDriver-akka.remote.default-remote-dispatcher-21"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "sparkDriver-akka.actor.default-dispatcher-17"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "sparkDriver-akka.remote.default-remote-dispatcher-23"
Exception in thread "shuffle-server-2"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "shuffle-server-2"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "sparkDriver-akka.actor.default-dispatcher-2"

Below is the configuration I added to spark-env.sh:

SPARK_EXECUTOR_CORES=2
SPARK_EXECUTOR_MEMORY=3G
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=2G
SPARK_EXECUTOR_INSTANCES=2
SPARK_WORKER_INSTANCES=1

If I set spark.master to local[*], it works fine, but when I point it to the master running on my machine, I get the exception shown above. If I connect to a MySQL database with the same configuration, it also works fine.

PS: The table has only a single row.

Please help..!

1
The collect() method gathers all the data from all the workers and stores it on the driver, using the driver's memory. If you are running on a large dataset, it is only logical that you will get an OOM. Consider whether collect() is the action you are really looking for. - Avihoo Mamka
I want the full set of records for the given query, and in this case my table has a single row. - Reena Upadhyay
Why do you want it back on your driver? For what purpose? - Avihoo Mamka
I was able to solve this OOM error by using this piece of code instead of collectAsList(): Iterator<Row> iter = jdbcDF.javaRDD().toLocalIterator(); while (iter.hasNext()) { Row row = iter.next(); } Thanks :) - Reena Upadhyay
But I still do not understand why collectAsList() was throwing an OOM error for a single record. Even with driver memory configured to 2 GB, I faced the same error. Can you please elaborate on it? I am new to Spark. - Reena Upadhyay

1 Answer

2
votes

Here are explanations of the concepts in your question:

  1. local[*] = Execution is multi-threaded, not distributed. It is good for development, when jobs are tested on a single machine. It works in your case because no data is shuffled or moved from executors to the driver; everything runs locally in a single JVM.
  2. collectAsList() = Collects all the data from the executors onto the driver node. Every partition's rows must be serialized, sent over the network, and held in driver memory at the same time, which makes it a memory-intensive operation.
  3. javaRDD().toLocalIterator() = Produces the same results as collect() but fetches the partitions sequentially, so the driver holds only one partition at a time, and it does not involve shuffling. Use it only when the order of partitions in the RDD and the order of items within each partition are well defined.
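To make the difference between points 2 and 3 concrete, here is a minimal plain-Java sketch. It does not use Spark at all: the nested lists stand in for RDD partitions, and the class name PartitionFetchSketch, the methods collectAll and localIterator, and the peakRowsHeld counter are all hypothetical names invented for this illustration. It only models the idea that one approach holds every row at once while the other holds a single partition at a time; it is not how Spark implements either call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java model only: the nested lists stand in for RDD partitions on executors.
final class PartitionFetchSketch {

    // Hypothetical bookkeeping: the most rows the "driver" was handed at one time.
    static int peakRowsHeld = 0;

    // Models collect()/collectAsList(): every partition is copied into one list,
    // so the full result is held at the same time.
    static <T> List<T> collectAll(List<List<T>> partitions) {
        List<T> all = new ArrayList<>();
        for (List<T> partition : partitions) {
            all.addAll(partition);
            peakRowsHeld = Math.max(peakRowsHeld, all.size());
        }
        return all;
    }

    // Models toLocalIterator(): partitions are fetched lazily, one at a time and
    // in order, so only the current partition is held.
    static <T> Iterator<T> localIterator(final List<List<T>> partitions) {
        return new Iterator<T>() {
            private int nextPartition = 0;
            private Iterator<T> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Move past exhausted or empty partitions, fetching the next one on demand.
                while (!current.hasNext() && nextPartition < partitions.size()) {
                    List<T> fetched = partitions.get(nextPartition++);
                    peakRowsHeld = Math.max(peakRowsHeld, fetched.size());
                    current = fetched.iterator();
                }
                return current.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"),
                Arrays.asList("d", "e", "f"));

        peakRowsHeld = 0;
        System.out.println(collectAll(partitions) + " peak=" + peakRowsHeld);

        peakRowsHeld = 0;
        Iterator<String> it = localIterator(partitions);
        StringBuilder seen = new StringBuilder();
        while (it.hasNext()) {
            seen.append(it.next());
        }
        System.out.println(seen + " peak=" + peakRowsHeld);
    }
}
```

In this model both approaches visit the same rows in the same order; only the peak number of rows held at once differs. With a real RDD the saving depends on how the data is partitioned, so treat this as an intuition aid rather than a measurement.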

So, considering the above, and since you are running on a local box, it is quite possible that local[*] or toLocalIterator() will not give any OOM while collectAsList() produces memory exceptions.