Using Spark predicate push down in Spark SQL queries
Spark predicate push down to database allows for better optimized Spark queries.
A predicate is a condition on a query that returns true or false, typically located in the WHERE
clause.
A predicate push down filters the data in the database query, reducing the number of entries retrieved from the database and improving query performance.
By default the Spark Dataset API will automatically push down valid WHERE
clauses to the database.
You can also use predicate push down on DSE Search indices within SearchAnalytics data centers.
Restrictions on column filters
Partition key columns can be pushed down as long as:
-
All partition key columns are included in the filter.
-
No more than one equivalence predicate per column.
Use an IN
clause to specify multiple restrictions for a particular column:
val primaryColors = List("red", "yellow", "blue")
val df = spark.read.cassandraFormat("cars", "inventory").load
df.filter(df("car_color").isin(primaryColors: _*))
Clustering key columns can be pushed down with the following rules:
-
Only the last predicate in the filter can be a non equivalence predicate.
-
If there is more than one predicate for a column, the predicates cannot be equivalence predicates.
When predicate push down occurs
When a Dataset has no push down filters, all requests on the Dataset do a full unfiltered table scan. Adding predicate filters on the Dataset for eligible database columns modifies the underlying query to narrow its scope.
Determining if predicate push down is being used in queries
By using the explain
method on a Dataset (or EXPLAIN
in Spark SQL), queries can be analyzed to see if the predicates need to be cast to the correct data type.
For example, create the following CQL table:
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
USE test;
CREATE table words (
user TEXT,
word TEXT,
count INT,
PRIMARY KEY (user, word));
INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'dino', 10 );
INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'fad', 5 );
INSERT INTO words (user, word, count ) VALUES ( 'Sam', 'alpha', 3 );
INSERT INTO words (user, word, count ) VALUES ( 'Zebra', 'zed', 100 );
Then create a Spark Dataset in the Spark console using that table and look for PushedFilters
in the output after issuing the EXPLAIN
command:
val df = spark.read.cassandraFormat("words", "test").load
df.explain
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user#0,word#1,count#2] ReadSchema: struct<user:string,word:string,count:int>
Because this query doesn’t filter on columns capable of being pushed down, there are no PushedFilters
in the physical plan.
Adding a filter, however, does change the physical plan to include PushedFilters
:
val dfWithPushdown = df.filter(df("word") > "ham")
dfWithPushdown.explain
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user#0,word#1,count#2] PushedFilters: [*GreaterThan(word,ham)], ReadSchema: struct<user:string,word:string,count:int>
The PushedFilters
section of the physical plan includes the GreaterThan
push down filter.
The asterisk indicates that push down filter will be handled only at the datasource level.
Troubleshooting predicate push down
When creating Spark SQL queries that use comparison operators, making sure the predicates are pushed down to the database correctly is critical to retrieving the correct data with the best performance.
For example, given a CQL table with the following schema:
CREATE TABLE test.common (
year int,
birthday timestamp,
userid uuid,
likes text,
name text,
PRIMARY KEY (year, birthday, userid)
)
Suppose you want to write a query that selects all entries where the birthday is earlier than a given date:
SELECT * FROM test.common WHERE birthday < '2001-1-1';
Use the EXPLAIN
command to see the query plan:
EXPLAIN SELECT * FROM test.common WHERE birthday < '2001-1-1';
== Physical Plan ==
*Filter (cast(birthday#1 as string) < 2001-1-1)
+- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [year#0,birthday#1,userid#2,likes#3,name#4] ReadSchema: struct<year:int,birthday:timestamp,userid:string,likes:string,name:string>
Time taken: 0.72 seconds, Fetched 1 row(s)
Note that the Filter
directive is treating the birthday
column, a CQL TIMESTAMP
, as a string.
The query optimizer looks at this comparison and needs to make the types match before generating a predicate.
In this case the optimizer decides to cast the birthday
column as a string to match the string '2001-1-1'
, but cast functions cannot be pushed down.
The predicate isn’t pushed down, and it doesn’t appear in PushedFilters
.
A full table scan will be performed at the database layer, with the results returned to Spark for further processing.
To push down the correct predicate for this query, use the cast
method to specify that the predicate is comparing the birthday
column to a TIMESTAMP
, so the types match and the optimizer can generate the correct predicate.
EXPLAIN SELECT * FROM test.common WHERE birthday < cast('2001-1-1' as TIMESTAMP);
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [year#0,birthday#1,userid#2,likes#3,name#4]
PushedFilters: [*LessThan(birthday,2001-01-01 00:00:00.0)],
ReadSchema: struct<year:int,birthday:timestamp,userid:string,likes:string,name:string>
Time taken: 0.034 seconds, Fetched 1 row(s)
Note the PushedFilters
indicating that the LessThan
predicate will be pushed down for the column data in birthday
.
This should speed up the query as a full table scan will be avoided.