0 votes

After days of thinking about it, I'm still stuck on this problem: I have one table where "timestamp" is the partition key. This table contains billions of rows.

I also have "timeseries" tables that contain timestamps related to specific measurement processes.

With Spark I want to analyze the content of the big table. A full table scan is of course not efficient; with a fairly fast lookup in the timeseries table I should be able to target only, say, 10k partitions.

What is the most efficient way to achieve this?

Is Spark SQL smart enough to optimize something like this:

  sqlContext.sql("""
  SELECT timeseries.timestamp, bigtable.value1 FROM timeseries 
  JOIN bigtable ON bigtable.timestamp = timeseries.timestamp
  WHERE timeseries.parameter = 'xyz'
""")

Ideally I would expect Cassandra to fetch the timestamps from the timeseries table and then use that to query only that subset of partitions from bigtable.
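Something like the following two-step approach is what I would otherwise write by hand. This is only a rough, untested sketch with simplified names, assuming the partition key is stored as a bigint:

  // 1. Cheap lookup of the ~10k relevant timestamps in the small table.
  val timestamps = sqlContext
    .sql("SELECT timestamp FROM timeseries WHERE parameter = 'xyz'")
    .collect()
    .map(_.getLong(0)) // assuming the key is a bigint / epoch millis

  // 2. Read only those partitions from bigtable instead of scanning all of it.
  val inList = timestamps.mkString(",")
  val subset = sqlContext.sql(
    s"SELECT timestamp, value1 FROM bigtable WHERE timestamp IN ($inList)")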


2 Answers

2 votes

If you add an EXPLAIN call to your query (or call explain() on the resulting DataFrame), you'll see what the Catalyst planner will do with it, but I can tell you it will not do the optimization you want.
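For example, a minimal sketch assuming both tables are already registered as temporary tables in the sqlContext:

  val df = sqlContext.sql("""
    SELECT timeseries.timestamp, bigtable.value1 FROM timeseries
    JOIN bigtable ON bigtable.timestamp = timeseries.timestamp
    WHERE timeseries.parameter = 'xyz'
  """)

  // Print the parsed, analyzed, optimized and physical plans (Spark 1.x DataFrame API).
  df.explain(true)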

Currently Catalyst has no support for pushing down joins to DataSources, which means the structure of your query is most likely going to look like this:

1. Read data from the timeseries table with the predicate parameter = 'xyz'
2. Read data from the bigtable table
3. Join these two results
4. Filter on bigtable.timestamp == timeseries.timestamp

The Spark Cassandra Connector will be given the predicate from the timeseries table read and will be able to push it down if it is on a partition key or clustering column. See the Spark Cassandra Connector docs. If it doesn't fit into one of those pushdown categories, it will require a full table scan followed by a filter in Spark.
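At the RDD level, this is roughly what a pushed-down predicate looks like; a minimal sketch assuming a hypothetical keyspace "ks" and that parameter is part of the primary key of timeseries:

  import com.datastax.spark.connector._

  // The .where() clause is sent to Cassandra as part of the CQL query,
  // so only matching rows are read instead of the whole table.
  val tsRdd = sc.cassandraTable("ks", "timeseries")
    .where("parameter = ?", "xyz")
    .select("timestamp")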

Since the read of bigtable has no restriction on it, Spark will instruct the connector to read the entire table (a full table scan).
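What you can do instead, outside of Spark SQL, is use the connector's joinWithCassandraTable, which fetches only the partitions whose keys appear in the left-hand RDD. A minimal sketch, assuming a hypothetical keyspace "ks", that bigtable's partition key column is named timestamp, and that it is stored as a bigint:

  import com.datastax.spark.connector._

  // Read only the relevant timestamps (predicate pushed down as above) ...
  val keys = sc.cassandraTable("ks", "timeseries")
    .where("parameter = ?", "xyz")
    .select("timestamp")
    .map(row => Tuple1(row.getLong("timestamp")))

  // ... then fetch just those partitions from bigtable instead of scanning it.
  // The result is an RDD of (key, bigtable_row) pairs.
  val subset = keys.joinWithCassandraTable("ks", "bigtable")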

1 vote

I can only guess at the optimizations done by the driver, but I would certainly expect a query like that to restrict the JOIN using the WHERE clause, which means your simple query should get optimized.

I will also point you in the general direction of optimizing Spark SQL: have a look at Catalyst, Spark SQL's query optimizer, which optimizes queries all the way down to the physical plan.

Here is a breakdown of how it works: Deep Dive into Spark SQL Catalyst Optimizer

And here is the link to the Git repo: Catalyst repo