
I have a query that takes a very long time when I run it through the Thrift server. I run it on a single partition of a table that has 500k rows.

The query looks like this:

select col0 from <table> where partition=<partition> and <col1>=<val>

I picked val so that no row has col1 = val, so the query returns 0 rows.

This query takes about 30 seconds (a minute if I use select *).

When I run the exact same query but with select count(col0) it takes 2 seconds.

What could cause queries to take a long time with select col but not with select count(col)?

Here are the explain plans for both queries:

explain select col0 from table where `partition` = partition and col=val;

*Project [col0#607]
+- *Filter (isnotnull(col1#607) && (col1#607 = aaaa))
   +- *FileScan parquet table[col1#607,partition#611]
        Batched: true,
        Format: Parquet,
        Location: PrunedInMemoryFileIndex[...,
        PartitionCount: 23,
        PartitionFilters: [isnotnull(partition#611), (cast(partition#611 as int) = partition_name)],
        PushedFilters: [IsNotNull(col1), EqualTo(col1,aaaa)],
        ReadSchema: struct

explain select count(col0) from table where `partition` = partition and col=val;

*HashAggregate(keys=[], functions=[count(col0#625)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(col0#625)])
      +- *Project [col0#625]
         +- *Filter (isnotnull(col1#625) && (col1#625 = aaaa))
            +- *FileScan parquet table[col1#625,partition#629]
                 Batched: true,
                 Format: Parquet,
                 Location: PrunedInMemoryFileIndex[...,
                 PartitionCount: 23,
                 PartitionFilters: [isnotnull(partition#629), (cast(partition#629 as int) = partition_name)],
                 PushedFilters: [IsNotNull(col1), EqualTo(col1,aaaa)],
                 ReadSchema: struct

As far as I can tell, the process is exactly the same, only the count query has more steps. So how come it's 15x faster?

Edit:

I found this interesting nugget in the logs:

with count:

18/06/28 11:42:55 INFO TaskSetManager: Starting task 0.0 in stage 2509.0 (TID 8092, ip-123456, executor 36, partition 0, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 1.0 in stage 2509.0 (TID 8093, ip-123456, executor 35, partition 1, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 2.0 in stage 2509.0 (TID 8094, ip-123456, executor 36, partition 2, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 3.0 in stage 2509.0 (TID 8095, ip-123456, executor 35, partition 3, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 4.0 in stage 2509.0 (TID 8096, ip-123456, executor 36, partition 4, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 5.0 in stage 2509.0 (TID 8097, ip-123456, executor 35, partition 5, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 6.0 in stage 2509.0 (TID 8098, ip-123456, executor 36, partition 6, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 7.0 in stage 2509.0 (TID 8099, ip-123456, executor 35, partition 7, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 8.0 in stage 2509.0 (TID 8100, ip-123456, executor 36, partition 8, RACK_LOCAL, 5521 bytes)
18/06/28 11:42:55 INFO TaskSetManager: Starting task 9.0 in stage 2509.0 (TID 8101, ip-123456, executor 35, partition 9, RACK_LOCAL, 5521 bytes)

without count:

18/06/28 11:45:32 INFO TaskSetManager: Starting task 0.0 in stage 2512.0 (TID 8136, ip-10-117-49-97.eu-west-1.compute.internal, executor 37, partition 1, RACK_LOCAL, 5532 bytes)
18/06/28 11:45:32 INFO BlockManagerInfo: Added broadcast_2352_piece0 in memory on ip-10-117-49-97.eu-west-1.compute.internal:40489 (size: 12.6 KB, free: 11.6 GB)
18/06/28 11:45:32 INFO TaskSetManager: Finished task 0.0 in stage 2512.0 (TID 8136) in 667 ms on ip-10-117-49-97.eu-west-1.compute.internal (executor 37) (1/1)
18/06/28 11:45:32 INFO YarnScheduler: Removed TaskSet 2512.0, whose tasks have all completed, from pool
18/06/28 11:45:32 INFO DAGScheduler: ResultStage 2512 (getNextRowSet at OperationManager.java:220) finished in 0.668 s
18/06/28 11:45:32 INFO DAGScheduler: Job 2293 finished: getNextRowSet at OperationManager.java:220, took 0.671740 s
18/06/28 11:45:32 INFO SparkContext: Starting job: getNextRowSet at OperationManager.java:220
18/06/28 11:45:32 INFO DAGScheduler: Got job 2294 (getNextRowSet at OperationManager.java:220) with 1 output partitions
18/06/28 11:45:32 INFO DAGScheduler: Final stage: ResultStage 2513 (getNextRowSet at OperationManager.java:220)
18/06/28 11:45:32 INFO DAGScheduler: Parents of final stage: List()
18/06/28 11:45:32 INFO DAGScheduler: Missing parents: List()
18/06/28 11:45:32 INFO DAGScheduler: Submitting ResultStage 2513 (MapPartitionsRDD[312] at run at AccessController.java:0), which has no missing parents
18/06/28 11:45:32 INFO MemoryStore: Block broadcast_2353 stored as values in memory (estimated size 66.6 KB, free 12.1 GB)
18/06/28 11:45:32 INFO MemoryStore: Block broadcast_2353_piece0 stored as bytes in memory (estimated size 12.6 KB, free 12.1 GB)
18/06/28 11:45:32 INFO BlockManagerInfo: Added broadcast_2353_piece0 in memory on 10.117.48.68:41493 (size: 12.6 KB, free: 12.1 GB)
18/06/28 11:45:32 INFO SparkContext: Created broadcast 2353 from broadcast at DAGScheduler.scala:1047
18/06/28 11:45:32 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2513 (MapPartitionsRDD[312] at run at AccessController.java:0) (first 15 tasks are for partitions Vector(2))
18/06/28 11:45:32 INFO YarnScheduler: Adding task set 2513.0 with 1 tasks
18/06/28 11:45:32 INFO TaskSetManager: Starting task 0.0 in stage 2513.0 (TID 8137, ip-10-117-49-97.eu-west-1.compute.internal, executor 37, partition 2, RACK_LOCAL, 5532 bytes)
18/06/28 11:45:33 INFO BlockManagerInfo: Added broadcast_2353_piece0 in memory on ip-10-117-49-97.eu-west-1.compute.internal:40489 (size: 12.6 KB, free: 11.6 GB)
18/06/28 11:45:38 INFO TaskSetManager: Finished task 0.0 in stage 2513.0 (TID 8137) in 5238 ms on ip-10-117-49-97.eu-west-1.compute.internal (executor 37) (1/1)
18/06/28 11:45:38 INFO YarnScheduler: Removed TaskSet 2513.0, whose tasks have all completed, from pool
18/06/28 11:45:38 INFO DAGScheduler: ResultStage 2513 (getNextRowSet at OperationManager.java:220) finished in 5.238 s
18/06/28 11:45:38 INFO DAGScheduler: Job 2294 finished: getNextRowSet at OperationManager.java:220, took 5.242084 s
18/06/28 11:45:38 INFO SparkContext: Starting job: getNextRowSet at OperationManager.java:220
18/06/28 11:45:38 INFO DAGScheduler: Got job 2295 (getNextRowSet at OperationManager.java:220) with 1 output partitions
18/06/28 11:45:38 INFO DAGScheduler: Final stage: ResultStage 2514 (getNextRowSet at OperationManager.java:220)
18/06/28 11:45:38 INFO DAGScheduler: Parents of final stage: List()
18/06/28 11:45:38 INFO DAGScheduler: Missing parents: List()
18/06/28 11:45:38 INFO DAGScheduler: Submitting ResultStage 2514 (MapPartitionsRDD[312] at run at AccessController.java:0), which has no missing parents
18/06/28 11:45:38 INFO MemoryStore: Block broadcast_2354 stored as values in memory (estimated size 66.6 KB, free 12.1 GB)
18/06/28 11:45:38 INFO MemoryStore: Block broadcast_2354_piece0 stored as bytes in memory (estimated size 12.6 KB, free 12.1 GB)
18/06/28 11:45:38 INFO BlockManagerInfo: Added broadcast_2354_piece0 in memory on 10.117.48.68:41493 (size: 12.6 KB, free: 12.1 GB)
18/06/28 11:45:38 INFO SparkContext: Created broadcast 2354 from broadcast at DAGScheduler.scala:1047
18/06/28 11:45:38 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2514 (MapPartitionsRDD[312] at run at AccessController.java:0) (first 15 tasks are for partitions Vector(3))

(i.e. it repeats this block; it looks like it runs the tasks sequentially and not in parallel as in the count case)

I also tried adding "order by", and it actually made the query run 2x faster.
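
One way to check that the jobs really run back to back is to pull the per-job durations out of the driver log and add them up. The snippet below is only a minimal Python sketch: it assumes the log lines have the same "DAGScheduler: Job N finished: ..., took X s" shape as the excerpt above, and the function name job_durations is made up for illustration.

```python
import re

# Matches lines such as:
# "... INFO DAGScheduler: Job 2293 finished: getNextRowSet at OperationManager.java:220, took 0.671740 s"
JOB_FINISHED = re.compile(r"DAGScheduler: Job (\d+) finished: .*, took ([0-9.]+) s")


def job_durations(log_lines):
    """Return {job_id: seconds} for every 'Job N finished' line found."""
    durations = {}
    for line in log_lines:
        match = JOB_FINISHED.search(line)
        if match:
            durations[int(match.group(1))] = float(match.group(2))
    return durations


# Two lines copied from the log excerpt above.
sample = [
    "18/06/28 11:45:32 INFO DAGScheduler: Job 2293 finished: "
    "getNextRowSet at OperationManager.java:220, took 0.671740 s",
    "18/06/28 11:45:38 INFO DAGScheduler: Job 2294 finished: "
    "getNextRowSet at OperationManager.java:220, took 5.242084 s",
]

durations = job_durations(sample)
for job_id, seconds in sorted(durations.items()):
    print(f"job {job_id}: {seconds:.3f} s")
print(f"{len(durations)} jobs, {sum(durations.values()):.3f} s in total")
```

If the number of jobs is close to the number of result partitions and the summed durations are close to the wall-clock time of the query, the jobs are most likely running one after another rather than in parallel.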


Running the same query on the same data directly in Spark instead of through Thrift was much faster.

I run the Thrift server on AWS emr-5.11.1

Hive 2.3.2

Spark 2.2.1

thrift 0.11.0

Thrift does not run queries. It is an RPC protocol used by Facebook, also used by Hive (developed at Facebook) for its Metastore API and for its JDBC wire protocol. Hive runs queries. - Samson Scharfrichter
Spark also uses Thrift for its JDBC wire protocol (and uses the same JDBC driver). Same for Impala. - Samson Scharfrichter
Hive is a batch query engine. It has to spawn a YARN job, allocate containers, start the Mapper and Reducer phases (with lots of I/O in between), and wait for job completion. Spark is also a batch query engine, but it pre-allocates containers (or even does not use containers) and runs its query in-memory. All that is common knowledge for anyone even remotely interested in Data Processing and Big Data. - Samson Scharfrichter
For faster Hive execution: install the TEZ batch framework and use it instead of MapReduce => queries will run 2-6x faster. Or install the LLAP service (developed by HortonWorks, may not be available in EMR) with its persistent containers and memory cache grid, and it may blow out Spark. - Samson Scharfrichter

1 Answer


Found the problem. I had this flag

spark.sql.thriftServer.incrementalCollect=true

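in the Thrift server. With it set, the output is collected one partition at a time, with each partition fetched by its own job only after the previous one has finished, which is what creates this massive overhead. Removing the flag fixed the issue. I guess "count" is not affected because its result always ends up in a single partition (the Exchange SinglePartition step in the plan), so there is almost nothing to fetch sequentially.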
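
The effect described above can be imitated with a toy simulation. The sketch below is plain Python, not Spark code: the partition list, the 50 ms per-task overhead and the function names are all assumptions chosen for illustration. It only shows why paying a fixed per-task cost once per partition, one after another, is much slower than paying it for all partitions in parallel, even when no rows come back.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a result split into 10 partitions.
# The filter matches nothing, so every partition is empty.
PARTITIONS = [[] for _ in range(10)]
TASK_OVERHEAD_S = 0.05  # assumed fixed cost of scheduling and running one task


def scan(partition):
    """Pretend to scan one partition: pay the fixed overhead, return its rows."""
    time.sleep(TASK_OVERHEAD_S)
    return list(partition)


def collect_all(partitions):
    """One job: every partition is scanned in parallel, results gathered at once."""
    with ThreadPoolExecutor(max_workers=max(1, len(partitions))) as pool:
        return [row for rows in pool.map(scan, partitions) for row in rows]


def collect_incrementally(partitions):
    """One job per partition, each started only after the previous one finished."""
    rows = []
    for partition in partitions:
        rows.extend(scan(partition))
    return rows


for fetch in (collect_all, collect_incrementally):
    start = time.perf_counter()
    rows = fetch(PARTITIONS)
    elapsed = time.perf_counter() - start
    print(f"{fetch.__name__}: {len(rows)} rows in {elapsed:.2f} s")
```

Both functions return the same rows; only the elapsed time differs, and the gap grows with the number of partitions.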