2
votes

I'm trying to run some basic data exploration using Spark on a Hive table (hosted on CFS via DataStax 4.6). My dataset is about 3.1 GB and I run the spark-shell with dse spark --executor-memory 16g (yes, I do have 16 GB available on my executors). So basically I would type the following into the spark-shell:

val dataset = hc.sql("SELECT * FROM my_hive_table")
val data_sample = dataset.sample(false, 0.01, 0)
data_sample.cache

and then I would run a count to actually trigger the caching:

data_sample.count

but when I check the Spark web UI I see no RDD persisted, and if I run the count again my whole dataset is read from CFS again.

So I tried accessing my dataset directly through CFS as a text file, like this:

val textFile = sc.textFile("cfs:/user/hive/warehouse/my_hive_table/aaaammjj=20150526")

and adapted the previous code to count the number of lines after caching the RDD. This time the RDD is indeed cached, using 7 GB across two workers! From the web UI:

cfs:/user/hive/warehouse/my_hive_table/aaaammjj=20150526 Memory Deserialized 1x Replicated
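
For reference, the adapted code was roughly the following (a sketch from memory; the exact statements typed in the shell may have differed slightly):

val textFile = sc.textFile("cfs:/user/hive/warehouse/my_hive_table/aaaammjj=20150526")
textFile.cache   // mark the plain-text RDD for in-memory storage
textFile.count   // first action reads from CFS and fills the cache
textFile.count   // a second count is served from memory, no CFS scan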

Is there any reason why my schemaRDD is not cached when it comes from Hive? That would be much more practical, since a schemaRDD provides ... well, the schema.

Thanks for any help.

Set a name for your cache and check the UI again! – eliasah
Do you mean something like val rdd_in_cache = data_sample.cache? I tried this also with no success. – Manu
I mean data_sample.cache.setName("Data Sample") – eliasah
No success either ... with .cache and then .cache.setName("") or .cache.setName("") alone. – Manu
Please add a screenshot of your Spark web UI Storage tab! – eliasah

2 Answers

1
votes

So, based on my discussion with eliasah, I could eventually cache the table with:

val dataset = hc.sql("SELECT * FROM autori_daily_import")
dataset.registerTempTable("data")
hc.cacheTable("data")
hc.sql("select count(*) from data")
res22: Array[org.apache.spark.sql.Row] = Array([6409331])
hc.sql("select sens,count(*) from data group by sens").collect().foreach(println)
[A,3672249]
[E,2737082]

And there is indeed an RDD in cache, with the name "RDD Storage Info for HiveTableScan ...".

What is a little fuzzy to me is why I need to register a temporary table when I have a schemaRDD, which already has a .cache() method. If I run queries against the schemaRDD (with .select('sens).countByValue()) then Spark scans the Hive table again and doesn't use the temporary in-memory table.
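
To make the contrast explicit, a sketch of the two query paths (dataset, the temp table data and the sens column are the names used above; the symbol-based select assumes the usual Spark SQL DSL imports in the shell):

// Goes through the cached temp table "data": served from the in-memory columnar cache
hc.sql("select sens, count(*) from data group by sens").collect()

// Goes through the original schemaRDD: Spark plans a new HiveTableScan and reads from CFS again
dataset.select('sens).countByValue()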

0
votes

According to the official documentation of Spark 1.2, Spark SQL can cache tables using an in-memory columnar format by calling sqlContext.cacheTable("tableName").

Then Spark SQL will scan only required columns and will automatically tune compression to minimize memory usage and GC pressure. You can call sqlContext.uncacheTable("tableName") to remove the table from memory.

Note that if you call schemaRDD.cache() rather than sqlContext.cacheTable(...), tables will not be cached using the in-memory columnar format, and therefore sqlContext.cacheTable(...) is strongly recommended for this use case.

Configuration of in-memory caching can be done using the setConf method on SQLContext or by running SET key=value commands using SQL.

So you'll actually need to register your data_sample RDD as a table and cache it with sqlContext.cacheTable("the name you registered for your table").
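
For example, a minimal sketch (assuming the question's data_sample schemaRDD, registered under the hypothetical name "data_sample"; in the question's shell sqlContext would be the HiveContext hc):

data_sample.registerTempTable("data_sample")                  // expose the schemaRDD to SQL under a table name
sqlContext.cacheTable("data_sample")                          // cache it in the in-memory columnar format
sqlContext.sql("SELECT COUNT(*) FROM data_sample").collect()  // first action materialises the cache
sqlContext.sql("SELECT COUNT(*) FROM data_sample").collect()  // now served from memory
sqlContext.uncacheTable("data_sample")                        // remove the table from memory when done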