0
votes

We use Cassandra 3.5 with Spark 1.6.1 in 2-node cluster (8 cores and 16G memory per node).

There is the following Cassandra table

CREATE TABLE schema.trade (
symbol text,
date int,
trade_time timestamp,
reporting_venue text,
trade_id bigint,
ref_trade_id bigint,
action_type text,
price double,
quantity int,
condition_code text,
PRIMARY KEY ((symbol, date), trade_time, trade_id)
) WITH compaction = {'class' : 'DateTieredCompactionStrategy', 'base_time_seconds' : '3600', 'max_sstable_age_days' : '93'};

And I want to calculate percentage of volume: sum of all volume from trades in the relevant security during the time period groupped by exchange and time bar (1 or 5 minutes). I've created an example:

void getPercentageOfVolume(String symbol, Integer date, Timestamp timeFrom, Timestamp timeTill, Integer barWidth) {
    char MILLISECOND_TO_MINUTE_MULTIPLIER = 60_000;
    LOG.info("start");
    JavaPairRDD<Tuple2, Integer> counts = javaFunctions(sparkContext).cassandraTable("schema", "trade")
            .filter(row ->
                        row.getString("symbol").equals(symbol) && row.getInt("date").equals(date) &&
                        row.getDateTime("trade_time").getMillis() >= timeFrom.getTime() &&
                        row.getDateTime("trade_time").getMillis() < timeTill.getTime())
            .mapToPair(row ->
                new Tuple2<>(
                    new Tuple2(
                            new Timestamp(
                                    (row.getDateTime("trade_time").getMillis() / (barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER)) * barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER
                            ),
                            row.getString("reporting_venue")),
                    row.getInt("quantity")
                )
            ).reduceByKey((a, b) -> a + b);
    LOG.info(counts.collect().toString());
    LOG.info("finish");
}

[2016-06-15 09:25:27.014] [INFO ] [main] [EquityTCAAnalytics] start
[2016-06-15 09:25:28.000] [INFO ] [main] [NettyUtil] Found Netty's native epoll transport in the classpath, using it
[2016-06-15 09:25:28.518] [INFO ] [main] [Cluster] New Cassandra host /node1:9042 added
[2016-06-15 09:25:28.519] [INFO ] [main] [LocalNodeFirstLoadBalancingPolicy] Added host node1 (datacenter1)
[2016-06-15 09:25:28.519] [INFO ] [main] [Cluster] New Cassandra host /node2:9042 added
[2016-06-15 09:25:28.520] [INFO ] [main] [CassandraConnector] Connected to Cassandra cluster: Cassandra
[2016-06-15 09:25:29.115] [INFO ] [main] [SparkContext] Starting job: collect at EquityTCAAnalytics.java:88
[2016-06-15 09:25:29.385] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Registering RDD 2 (mapToPair at EquityTCAAnalytics.java:78)
[2016-06-15 09:25:29.388] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Got job 0 (collect at EquityTCAAnalytics.java:88) with 5 output partitions
[2016-06-15 09:25:29.388] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Final stage: ResultStage 1 (collect at EquityTCAAnalytics.java:88)
[2016-06-15 09:25:29.389] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Parents of final stage: List(ShuffleMapStage 0)
[2016-06-15 09:25:29.391] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Missing parents: List(ShuffleMapStage 0)
[2016-06-15 09:25:29.400] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at EquityTCAAnalytics.java:78), which has no missing parents
[2016-06-15 09:25:29.594] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_0 stored as values in memory (estimated size 10.8 KB, free 10.8 KB)
[2016-06-15 09:25:29.642] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.4 KB, free 16.3 KB)
[2016-06-15 09:25:29.647] [INFO ] [dispatcher-event-loop-7] [BlockManagerInfo] Added broadcast_0_piece0 in memory on node2:44871 (size: 5.4 KB, free: 2.4 GB)
[2016-06-15 09:25:29.650] [INFO ] [dag-scheduler-event-loop] [SparkContext] Created broadcast 0 from broadcast at DAGScheduler.scala:1006
[2016-06-15 09:25:29.658] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting 5 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at EquityTCAAnalytics.java:78)
[2016-06-15 09:25:29.661] [INFO ] [dag-scheduler-event-loop] [TaskSchedulerImpl] Adding task set 0.0 with 5 tasks
[2016-06-15 09:25:30.006] [INFO ] [dispatcher-event-loop-7] [SparkDeploySchedulerBackend] Registered executor NettyRpcEndpointRef(null) (node1:41122) with ID 0
[2016-06-15 09:25:30.040] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 0.0 in stage 0.0 (TID 0, node1, partition 0,NODE_LOCAL, 11725 bytes)
[2016-06-15 09:25:30.051] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 1.0 in stage 0.0 (TID 1, node1, partition 1,NODE_LOCAL, 11317 bytes)
[2016-06-15 09:25:30.054] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 2.0 in stage 0.0 (TID 2, node1, partition 2,NODE_LOCAL, 11929 bytes)
[2016-06-15 09:25:30.057] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 3.0 in stage 0.0 (TID 3, node1, partition 3,NODE_LOCAL, 11249 bytes)
[2016-06-15 09:25:30.059] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 4.0 in stage 0.0 (TID 4, node1, partition 4,NODE_LOCAL, 11560 bytes)
[2016-06-15 09:25:30.077] [INFO ] [dispatcher-event-loop-7] [SparkDeploySchedulerBackend] Registered executor NettyRpcEndpointRef(null) (CassandraCH4.ehubprod.local:33668) with ID 1
[2016-06-15 09:25:30.111] [INFO ] [dispatcher-event-loop-4] [BlockManagerMasterEndpoint] Registering block manager node1:36512 with 511.1 MB RAM, BlockManagerId(0, node1, 36512)
[2016-06-15 09:25:30.168] [INFO ] [dispatcher-event-loop-3] [BlockManagerMasterEndpoint] Registering block manager CassandraCH4.ehubprod.local:33610 with 511.1 MB RAM, BlockManagerId(1, CassandraCH4.ehubprod.local, 33610)
[2016-06-15 09:25:30.818] [INFO ] [dispatcher-event-loop-2] [BlockManagerInfo] Added broadcast_0_piece0 in memory on node1:36512 (size: 5.4 KB, free: 511.1 MB)
[2016-06-15 09:25:36.764] [INFO ] [pool-21-thread-1] [CassandraConnector] Disconnected from Cassandra cluster: Cassandra
[2016-06-15 09:25:48.914] [INFO ] [task-result-getter-0] [TaskSetManager] Finished task 4.0 in stage 0.0 (TID 4) in 18854 ms on node1 (1/5)
[2016-06-15 09:25:55.541] [INFO ] [task-result-getter-1] [TaskSetManager] Finished task 2.0 in stage 0.0 (TID 2) in 25489 ms on node1 (2/5)
[2016-06-15 09:25:57.837] [INFO ] [task-result-getter-2] [TaskSetManager] Finished task 1.0 in stage 0.0 (TID 1) in 27795 ms on node1 (3/5)
[2016-06-15 09:25:57.931] [INFO ] [task-result-getter-3] [TaskSetManager] Finished task 0.0 in stage 0.0 (TID 0) in 27919 ms on node1 (4/5)
[2016-06-15 09:26:01.357] [INFO ] [task-result-getter-0] [TaskSetManager] Finished task 3.0 in stage 0.0 (TID 3) in 31302 ms on node1 (5/5)
[2016-06-15 09:26:01.358] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] ShuffleMapStage 0 (mapToPair at EquityTCAAnalytics.java:78) finished in 31.602 s
[2016-06-15 09:26:01.360] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] looking for newly runnable stages
[2016-06-15 09:26:01.360] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] running: Set()
[2016-06-15 09:26:01.360] [INFO ] [task-result-getter-0] [TaskSchedulerImpl] Removed TaskSet 0.0, whose tasks have all completed, from pool
[2016-06-15 09:26:01.362] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] waiting: Set(ResultStage 1)
[2016-06-15 09:26:01.362] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] failed: Set()
[2016-06-15 09:26:01.365] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting ResultStage 1 (ShuffledRDD[3] at reduceByKey at EquityTCAAnalytics.java:87), which has no missing parents
[2016-06-15 09:26:01.373] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_1 stored as values in memory (estimated size 3.6 KB, free 19.9 KB)
[2016-06-15 09:26:01.382] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.1 KB, free 21.9 KB)
[2016-06-15 09:26:01.383] [INFO ] [dispatcher-event-loop-1] [BlockManagerInfo] Added broadcast_1_piece0 in memory on node2:44871 (size: 2.1 KB, free: 2.4 GB)
[2016-06-15 09:26:01.384] [INFO ] [dag-scheduler-event-loop] [SparkContext] Created broadcast 1 from broadcast at DAGScheduler.scala:1006
[2016-06-15 09:26:01.385] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting 5 missing tasks from ResultStage 1 (ShuffledRDD[3] at reduceByKey at EquityTCAAnalytics.java:87)
[2016-06-15 09:26:01.386] [INFO ] [dag-scheduler-event-loop] [TaskSchedulerImpl] Adding task set 1.0 with 5 tasks
[2016-06-15 09:26:01.390] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 0.0 in stage 1.0 (TID 5, node1, partition 0,NODE_LOCAL, 2786 bytes)
[2016-06-15 09:26:01.390] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 1.0 in stage 1.0 (TID 6, node1, partition 1,NODE_LOCAL, 2786 bytes)
[2016-06-15 09:26:01.397] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 2.0 in stage 1.0 (TID 7, node1, partition 2,NODE_LOCAL, 2786 bytes)
[2016-06-15 09:26:01.398] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 3.0 in stage 1.0 (TID 8, node1, partition 3,NODE_LOCAL, 2786 bytes)
[2016-06-15 09:26:01.406] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 4.0 in stage 1.0 (TID 9, node1, partition 4,NODE_LOCAL, 2786 bytes)
[2016-06-15 09:26:01.429] [INFO ] [dispatcher-event-loop-4] [BlockManagerInfo] Added broadcast_1_piece0 in memory on node1:36512 (size: 2.1 KB, free: 511.1 MB)
[2016-06-15 09:26:01.452] [INFO ] [dispatcher-event-loop-6] [MapOutputTrackerMasterEndpoint] Asked to send map output locations for shuffle 0 to node1:41122
[2016-06-15 09:26:01.456] [INFO ] [dispatcher-event-loop-6] [MapOutputTrackerMaster] Size of output statuses for shuffle 0 is 161 bytes
[2016-06-15 09:26:01.526] [INFO ] [task-result-getter-1] [TaskSetManager] Finished task 4.0 in stage 1.0 (TID 9) in 128 ms on node1 (1/5)
[2016-06-15 09:26:01.575] [INFO ] [task-result-getter-3] [TaskSetManager] Finished task 2.0 in stage 1.0 (TID 7) in 184 ms on node1 (2/5)
[2016-06-15 09:26:01.580] [INFO ] [task-result-getter-2] [TaskSetManager] Finished task 0.0 in stage 1.0 (TID 5) in 193 ms on node1 (3/5)
[2016-06-15 09:26:01.589] [INFO ] [task-result-getter-3] [TaskSetManager] Finished task 1.0 in stage 1.0 (TID 6) in 199 ms on node1 (4/5)
[2016-06-15 09:26:01.599] [INFO ] [task-result-getter-2] [TaskSetManager] Finished task 3.0 in stage 1.0 (TID 8) in 200 ms on node1 (5/5)
[2016-06-15 09:26:01.599] [INFO ] [task-result-getter-2] [TaskSchedulerImpl] Removed TaskSet 1.0, whose tasks have all completed, from pool
[2016-06-15 09:26:01.599] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] ResultStage 1 (collect at EquityTCAAnalytics.java:88) finished in 0.202 s
[2016-06-15 09:26:01.612] [INFO ] [main] [DAGScheduler] Job 0 finished: collect at EquityTCAAnalytics.java:88, took 32.496470 s
[2016-06-15 09:26:01.634] [INFO ] [main] [EquityTCAAnalytics] [((2016-06-10 13:45:00.0,DA),6944), ((2016-06-10 14:25:00.0,B),5241), ..., ((2016-06-10 10:55:00.0,QD),109080), ((2016-06-10 14:55:00.0,A),1300)]
[2016-06-15 09:26:01.641] [INFO ] [main] [EquityTCAAnalytics] finish

Is 32.5 s normal?

1
Being normal is relative to the task you are trying to achieve and the amount of data you are dealing with. Also possible bottlenecks network IO. So actually your question isn't answerable with the given informations. - eliasah

1 Answers

0
votes

I'd say %% of CPU and/or memory usage would be a starting point. It your cores are underutilised that can mean your process is not parallel enough. Memory-size lots depend on your data, but generally it's to use more of RAM than go back to IO.