0
votes

Ahoy, almighty devs. I'm running some basic analytics in Spark, querying a multi-node Cassandra cluster. The code I'm running, where I'm seeing something illogical, is:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._

object cassSpark {

  // Runs the block once and prints the elapsed wall-clock time in milliseconds.
  def time[A](f: => A): A = {
    val s = System.nanoTime
    val ret = f
    println("time: " + (System.nanoTime - s) / 1e6 + "ms")
    ret
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // A second .set() on the same key overwrites the first,
      // so both contact points go into one comma-separated value.
      .set("spark.cassandra.connection.host", "192.168.56.101,192.168.56.102")
      .set("spark.cassandra.connection.local_dc", "datacenter1")
      .setMaster("local[*]")
      .setAppName("cassSpark")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    time {
      val df = sqlContext
        .read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster"))
        .load()
        .cache()

      df.count()
    }
  }
}

The Spark version is 1.6.0, Cassandra is v3.0.10, and the connector is also 1.6.0. The keyspace has replication_factor: 1; the table has 5 columns and actually just 1 row in it. There are 2 nodes (virtual machines made in Oracle VM), as you can see.

My problem is that when I measure the time of the query from Spark to Cassandra, I get about 20 seconds, which does not seem normal to me because there is just one row in the table. Am I missing something, am I measuring something wrong, or is something off in my code? Can someone help me, or show me how to do this efficiently?

[EDIT]

As @Artem Aliev requested, this is the whole INFO log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/12/06 12:31:04 INFO SparkContext: Running Spark version 1.6.0
16/12/06 12:31:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/12/06 12:31:07 INFO SecurityManager: Changing view acls to: superbrainbug
16/12/06 12:31:07 INFO SecurityManager: Changing modify acls to: Ivan Majnaric
16/12/06 12:31:07 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(superbrainbug); users with modify permissions: Set(superbrainbug)
16/12/06 12:31:12 INFO Utils: Successfully started service 'sparkDriver' on port 62101.
16/12/06 12:31:14 INFO Slf4jLogger: Slf4jLogger started
16/12/06 12:31:14 INFO Remoting: Starting remoting
16/12/06 12:31:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:62114]
16/12/06 12:31:15 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 62114.
16/12/06 12:31:15 INFO SparkEnv: Registering MapOutputTracker
16/12/06 12:31:15 INFO SparkEnv: Registering BlockManagerMaster
16/12/06 12:31:15 INFO DiskBlockManager: Created local directory at C:\Users\superbrainbug\AppData\Local\Temp\blockmgr-8b664e71-ead1-4462-b171-bf542a5eb444
16/12/06 12:31:15 INFO MemoryStore: MemoryStore started with capacity 1124.6 MB
16/12/06 12:31:18 INFO SparkEnv: Registering OutputCommitCoordinator
16/12/06 12:31:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/12/06 12:31:20 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/12/06 12:31:20 INFO Executor: Starting executor ID driver on host localhost
16/12/06 12:31:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 62136.
16/12/06 12:31:20 INFO NettyBlockTransferService: Server created on 62136
16/12/06 12:31:20 INFO BlockManagerMaster: Trying to register BlockManager
16/12/06 12:31:21 INFO BlockManagerMasterEndpoint: Registering block manager localhost:62136 with 1124.6 MB RAM, BlockManagerId(driver, localhost, 62136)
16/12/06 12:31:21 INFO BlockManagerMaster: Registered BlockManager
16/12/06 12:31:23 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.
16/12/06 12:31:24 INFO Cluster: New Cassandra host /192.168.56.101:9042 added
16/12/06 12:31:24 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: []
16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: []
16/12/06 12:31:31 INFO SparkContext: Starting job: count at cassSpark.scala:69
16/12/06 12:31:31 INFO DAGScheduler: Registering RDD 7 (count at cassSpark.scala:69)
16/12/06 12:31:31 INFO DAGScheduler: Got job 0 (count at cassSpark.scala:69) with 1 output partitions
16/12/06 12:31:31 INFO DAGScheduler: Final stage: ResultStage 1 (count at cassSpark.scala:69)
16/12/06 12:31:31 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/12/06 12:31:31 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/12/06 12:31:31 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[7] at count at cassSpark.scala:69), which has no missing parents
16/12/06 12:31:35 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 20.0 KB, free 20.0 KB)
16/12/06 12:31:35 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.2 KB, free 29.2 KB)
16/12/06 12:31:35 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:62136 (size: 9.2 KB, free: 1124.6 MB)
16/12/06 12:31:35 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/12/06 12:31:35 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[7] at count at cassSpark.scala:69)
16/12/06 12:31:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/12/06 12:31:35 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 20008 bytes)
16/12/06 12:31:35 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/12/06 12:31:35 INFO CacheManager: Partition rdd_4_0 not found, computing it
16/12/06 12:31:36 INFO GenerateUnsafeProjection: Code generated in 395.192925 ms
16/12/06 12:31:39 INFO MemoryStore: Block rdd_4_0 stored as values in memory (estimated size 1168.0 B, free 30.3 KB)
16/12/06 12:31:39 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:62136 (size: 1168.0 B, free: 1124.6 MB)
16/12/06 12:31:39 INFO GeneratePredicate: Code generated in 35.193967 ms
16/12/06 12:31:39 INFO GenerateColumnAccessor: Code generated in 52.555004 ms
16/12/06 12:31:39 INFO GenerateMutableProjection: Code generated in 14.129896 ms
16/12/06 12:31:39 INFO GenerateUnsafeProjection: Code generated in 14.130749 ms
16/12/06 12:31:40 INFO GenerateMutableProjection: Code generated in 19.217034 ms
16/12/06 12:31:40 INFO GenerateUnsafeRowJoiner: Code generated in 72.736302 ms
16/12/06 12:31:40 INFO GenerateUnsafeProjection: Code generated in 133.407346 ms
16/12/06 12:31:41 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 3895 bytes result sent to driver
16/12/06 12:31:41 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 5452 ms on localhost (1/1)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/12/06 12:31:41 INFO DAGScheduler: ShuffleMapStage 0 (count at cassSpark.scala:69) finished in 5,542 s
16/12/06 12:31:41 INFO DAGScheduler: looking for newly runnable stages
16/12/06 12:31:41 INFO DAGScheduler: running: Set()
16/12/06 12:31:41 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/12/06 12:31:41 INFO DAGScheduler: failed: Set()
16/12/06 12:31:41 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at count at cassSpark.scala:69), which has no missing parents
16/12/06 12:31:41 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.3 KB, free 39.6 KB)
16/12/06 12:31:41 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.6 KB, free 44.1 KB)
16/12/06 12:31:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:62136 (size: 4.6 KB, free: 1124.6 MB)
16/12/06 12:31:41 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/12/06 12:31:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at count at cassSpark.scala:69)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/12/06 12:31:41 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1999 bytes)
16/12/06 12:31:41 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/12/06 12:31:41 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/12/06 12:31:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 15 ms
16/12/06 12:31:41 INFO GenerateMutableProjection: Code generated in 92.969655 ms
16/12/06 12:31:41 INFO GenerateMutableProjection: Code generated in 11.48414 ms
16/12/06 12:31:41 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1830 bytes result sent to driver
16/12/06 12:31:41 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 602 ms on localhost (1/1)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/12/06 12:31:41 INFO DAGScheduler: ResultStage 1 (count at cassSpark.scala:69) finished in 0,605 s
16/12/06 12:31:41 INFO DAGScheduler: Job 0 finished: count at cassSpark.scala:69, took 10,592173 s
time: 19242.858679ms
16/12/06 12:31:48 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/12/06 12:31:48 INFO SparkContext: Invoking stop() from shutdown hook
16/12/06 12:31:48 INFO SerialShutdownHooks: Successfully executed shutdown hook: Clearing session cache for C* connector
16/12/06 12:31:48 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/12/06 12:31:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/12/06 12:31:48 INFO MemoryStore: MemoryStore cleared
16/12/06 12:31:48 INFO BlockManager: BlockManager stopped
16/12/06 12:31:48 INFO BlockManagerMaster: BlockManagerMaster stopped
16/12/06 12:31:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/12/06 12:31:48 INFO SparkContext: Successfully stopped SparkContext
16/12/06 12:31:48 INFO ShutdownHookManager: Shutdown hook called
16/12/06 12:31:48 INFO ShutdownHookManager: Deleting directory C:\Users\superbrainbug\AppData\Local\Temp\spark-17606f05-6bb8-4144-acb5-015f15ea1ea9

Process finished with exit code 0

Also, for @Yves DARMAILLAC: if I remove .cache(), the runtime is 12 s.

2 Answers

0
votes

20 seconds looks somewhat longer than expected for your environment; it should be around 2 seconds. Could you enable INFO-level Spark logging and post the result here?

The main time consumers in your example are initializations; a second run will be much faster.

  1. Spark is a lazy framework, so a lot of initialization happens at the first query. You are using the "local" master, so it will be slower still when you run on a Spark cluster.

  2. The connector is optimized for bulk queries, so it does a lot of preparation up front to make the bulk query run faster: it queries the C* cluster metadata for table size and token range locations, creates splits, and establishes C* connections.
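One way to check the "second run is faster" claim is to time the same block several times and compare the runs. The following is a minimal, Spark-free sketch of that idea; `WarmupTiming`, `timed`, `timeRuns` and the `setup` stand-in are hypothetical names chosen for illustration, and the `Thread.sleep` merely simulates one-time initialization. With the code from the question, the timed block would be `df.count()`.

```scala
object WarmupTiming {
  // Runs `block` once and returns its result together with the elapsed milliseconds.
  def timed[A](block: => A): (A, Double) = {
    val start = System.nanoTime
    val result = block
    (result, (System.nanoTime - start) / 1e6)
  }

  // Runs `block` `runs` times and returns the elapsed milliseconds of each run, in order.
  def timeRuns[A](runs: Int)(block: => A): Seq[Double] =
    (1 to runs).map(_ => timed(block)._2)

  def main(args: Array[String]): Unit = {
    // Assumption: a stand-in for one-time setup (class loading, code generation,
    // opening connections). A lazy val is evaluated on first access only.
    lazy val setup: Int = { Thread.sleep(200); 1 }

    // Hypothetical workload; with Spark this would be `df.count()`.
    val timings = timeRuns(3) { setup + 1 }
    timings.zipWithIndex.foreach { case (ms, i) =>
      println(f"run ${i + 1}: $ms%.1f ms")
    }
  }
}
```

If the first run dominates and later runs are short, the measured time is mostly startup cost rather than the query itself.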

0
votes

Why are you persisting the result of load()? This is not necessary, as you do not reuse it. Is the timing still the same without caching?

Try this:

time {
  val df = sqlContext
    .read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster"))
    .load()

  df.count()
}