I have a query, run through the Thrift server, that takes a very long time. It runs against a single partition of a table that holds about 500k rows.
The query looks like this:
select col0 from <table> where partition=<partition> and <col1>=<val>
I chose <val> so that no row satisfies col1 = <val>, meaning the query returns 0 rows.
This query takes about 30 seconds (a minute if I use select * instead).
When I run the exact same query but with select count(col0) instead, it takes about 2 seconds.
What could cause a query to take this long with select col but not with select count(col)?
Here are the explained plans for both queries:
explain select col0 from table where `partition` = partition and col1=val;
*Project [col0#607]
+- *Filter (isnotnull(col1#607) && (col1#607 = aaaa))
+- *FileScan parquet
table[col1#607,partition#611]
Batched: true,
Format: Parquet,
Location: PrunedInMemoryFileIndex[...,
PartitionCount: 23,
PartitionFilters: [isnotnull(partition#611),
(cast(partition#611 as int) = partition_name)],
PushedFilters: [IsNotNull(col1),
EqualTo(col1,aaaa)],
ReadSchema: struct
explain select count(col0) from table where `partition` = partition and col1=val;
*HashAggregate(keys=[], functions=[count(col0#625)])
+- Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_count(col0#625)])
+- *Project [col0#625]
+- *Filter (isnotnull(col1#625) && (col1#625 = aaaa))
+- *FileScan parquet
table[col1#625,partition#629] Batched: true,
Format: Parquet,
Location: PrunedInMemoryFileIndex[...,
PartitionCount: 23,
PartitionFilters: [isnotnull(partition#629),
(cast(partition#629 as int) = partition_name)],
PushedFilters: [IsNotNull(col1),
EqualTo(col1,aaaa)],
ReadSchema: struct
As far as I can tell, the process is exactly the same; if anything, the count
query has more steps. So how can it be 15x faster?
Edit:
I found this interesting nugget in the logs.
With count:
18/06/28 11:42:55 INFO TaskSetManager: Starting task 0.0 in stage 2509.0 (TID 8092, ip-123456, executor 36, partition 0, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 1.0 in stage 2509.0 (TID 8093, ip-123456, executor 35, partition 1, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 2.0 in stage 2509.0 (TID 8094, ip-123456, executor 36, partition 2, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 3.0 in stage 2509.0 (TID 8095, ip-123456, executor 35, partition 3, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 4.0 in stage 2509.0 (TID 8096, ip-123456, executor 36, partition 4, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 5.0 in stage 2509.0 (TID 8097, ip-123456, executor 35, partition 5, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 6.0 in stage 2509.0 (TID 8098, ip-123456, executor 36, partition 6, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 7.0 in stage 2509.0 (TID 8099, ip-123456, executor 35, partition 7, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 8.0 in stage 2509.0 (TID 8100, ip-123456, executor 36, partition 8, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 9.0 in stage 2509.0 (TID 8101, ip-123456, executor 35, partition 9, RACK_LOCAL, 5521 bytes)
Without count:
18/06/28 11:45:32 INFO TaskSetManager: Starting task 0.0 in stage 2512.0 (TID 8136, ip-10-117-49-97.eu-west-1.compute.internal, executor 37, partition 1, RACK_LOCAL, 5532 bytes)
18/06/28 11:45:32 INFO BlockManagerInfo: Added broadcast_2352_piece0 in memory on ip-10-117-49-97.eu-west-1.compute.internal:40489 (size: 12.6 KB, free: 11.6 GB)
18/06/28 11:45:32 INFO TaskSetManager: Finished task 0.0 in stage 2512.0 (TID 8136) in 667 ms on ip-10-117-49-97.eu-west-1.compute.internal (executor 37) (1/1)
18/06/28 11:45:32 INFO YarnScheduler: Removed TaskSet 2512.0, whose tasks have all completed, from pool
18/06/28 11:45:32 INFO DAGScheduler: ResultStage 2512 (getNextRowSet at OperationManager.java:220) finished in 0.668 s
18/06/28 11:45:32 INFO DAGScheduler: Job 2293 finished: getNextRowSet at OperationManager.java:220, took 0.671740 s
18/06/28 11:45:32 INFO SparkContext: Starting job: getNextRowSet at OperationManager.java:220
18/06/28 11:45:32 INFO DAGScheduler: Got job 2294 (getNextRowSet at OperationManager.java:220) with 1 output partitions
18/06/28 11:45:32 INFO DAGScheduler: Final stage: ResultStage 2513 (getNextRowSet at OperationManager.java:220)
18/06/28 11:45:32 INFO DAGScheduler: Parents of final stage: List()
18/06/28 11:45:32 INFO DAGScheduler: Missing parents: List()
18/06/28 11:45:32 INFO DAGScheduler: Submitting ResultStage 2513 (MapPartitionsRDD[312] at run at AccessController.java:0), which has no missing parents
18/06/28 11:45:32 INFO MemoryStore: Block broadcast_2353 stored as values in memory (estimated size 66.6 KB, free 12.1 GB)
18/06/28 11:45:32 INFO MemoryStore: Block broadcast_2353_piece0 stored as bytes in memory (estimated size 12.6 KB, free 12.1 GB)
18/06/28 11:45:32 INFO BlockManagerInfo: Added broadcast_2353_piece0 in memory on 10.117.48.68:41493 (size: 12.6 KB, free: 12.1 GB)
18/06/28 11:45:32 INFO SparkContext: Created broadcast 2353 from broadcast at DAGScheduler.scala:1047
18/06/28 11:45:32 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2513 (MapPartitionsRDD[312] at run at AccessController.java:0) (first 15 tasks are for partitions
Vector(2)) 18/06/28 11:45:32 INFO YarnScheduler: Adding task set 2513.0 with 1 tasks
18/06/28 11:45:32 INFO TaskSetManager: Starting task 0.0 in stage 2513.0 (TID 8137, ip-10-117-49-97.eu-west-1.compute.internal, executor 37, partition 2, RACK_LOCAL, 5532 bytes)
18/06/28 11:45:33 INFO BlockManagerInfo: Added broadcast_2353_piece0 in memory on ip-10-117-49-97.eu-west-1.compute.internal:40489 (size: 12.6 KB, free: 11.6 GB)
18/06/28 11:45:38 INFO TaskSetManager: Finished task 0.0 in stage 2513.0 (TID 8137) in 5238 ms on ip-10-117-49-97.eu-west-1.compute.internal (executor 37) (1/1)
18/06/28 11:45:38 INFO YarnScheduler: Removed TaskSet 2513.0, whose tasks have all completed, from pool
18/06/28 11:45:38 INFO DAGScheduler: ResultStage 2513 (getNextRowSet at OperationManager.java:220) finished in 5.238 s
18/06/28 11:45:38 INFO DAGScheduler: Job 2294 finished: getNextRowSet at OperationManager.java:220, took 5.242084 s
18/06/28 11:45:38 INFO SparkContext: Starting job: getNextRowSet at OperationManager.java:220
18/06/28 11:45:38 INFO DAGScheduler: Got job 2295 (getNextRowSet at OperationManager.java:220) with 1 output partitions
18/06/28 11:45:38 INFO DAGScheduler: Final stage: ResultStage 2514 (getNextRowSet at OperationManager.java:220)
18/06/28 11:45:38 INFO DAGScheduler: Parents of final stage: List()
18/06/28 11:45:38 INFO DAGScheduler: Missing parents: List()
18/06/28 11:45:38 INFO DAGScheduler: Submitting ResultStage 2514 (MapPartitionsRDD[312] at run at AccessController.java:0), which has no missing parents
18/06/28 11:45:38 INFO MemoryStore: Block broadcast_2354 stored as values in memory (estimated size 66.6 KB, free 12.1 GB)
18/06/28 11:45:38 INFO MemoryStore: Block broadcast_2354_piece0 stored as bytes in memory (estimated size 12.6 KB, free 12.1 GB)
18/06/28 11:45:38 INFO BlockManagerInfo: Added broadcast_2354_piece0 in memory on 10.117.48.68:41493 (size: 12.6 KB, free: 12.1 GB)
18/06/28 11:45:38 INFO SparkContext: Created broadcast 2354 from broadcast at DAGScheduler.scala:1047
18/06/28 11:45:38 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2514 (MapPartitionsRDD[312] at run at AccessController.java:0) (first 15 tasks are for partitions Vector(3))
(i.e. it repeats this block; it looks like the tasks run sequentially, not in parallel as in the count case)
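That one-partition-per-getNextRowSet pattern looks like what the Thrift server's incremental-collect mode produces, where it collects a single result partition at a time instead of the whole result in one parallel job. It may be worth checking how this flag is set in your deployment (a spark-defaults.conf sketch; whether EMR enables it by default is an assumption worth verifying):

```
# spark-defaults.conf sketch -- spark.sql.thriftServer.incrementalCollect
# trades memory for latency: when true, the Thrift server runs one job per
# result partition (sequential tasks, low driver memory); when false, it
# collects the full result at once (parallel tasks, more driver memory).
spark.sql.thriftServer.incrementalCollect  false
```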
I also tried adding an order by, and it actually made the query run 2x faster.
Running the same query on the same data through Spark directly instead of through Thrift was much faster.
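For a like-for-like timing outside the Thrift server, something like the following spark-shell sketch can be used (it needs a live Spark session; the table name and literal values are placeholders for the anonymized names above, and spark.time simply prints the wall-clock time of the wrapped expression):

```scala
// spark-shell sketch: assumes an active SparkSession bound to `spark`,
// and that `my_table` / '23' / 'aaaa' stand in for the real identifiers.
spark.time(
  spark.sql("select col0 from my_table where `partition` = '23' and col1 = 'aaaa'").collect()
)
spark.time(
  spark.sql("select count(col0) from my_table where `partition` = '23' and col1 = 'aaaa'").collect()
)
```

Comparing the two timings here against the Thrift numbers isolates whether the overhead is in the query itself or in how the Thrift server fetches rows.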
I run the Thrift server on AWS emr-5.11.1 with:
Hive 2.3.2
Spark 2.2.1
Thrift 0.11.0