Using Spark predicate push down in Spark SQL queries

Spark predicate push down to database allows for better optimized Spark SQL queries.

A predicate is a condition on a query that returns true or false, typically located in the WHERE clause. A predicate push down filters the data in the database query, reducing the number of entries retrieved from the database and improving query performance. By default, the Spark Dataset API automatically pushes down valid WHERE clauses to the database.

You can also use predicate push down on DSE Search indices within SearchAnalytics data centers.

Restrictions on column filters

Partition key columns can be pushed down as long as:

  • All partition key columns are included in the filter.
  • No more than one equivalence predicate per column.

Use an IN clause to specify multiple restrictions for a particular column:

val primaryColors = List("red", "yellow", "blue")

val df = spark.read.cassandraFormat("cars", "inventory").load
df.filter(df("car_color").isin(primaryColors: _*))

Clustering key columns can be pushed down with the following rules:

  • Only the last predicate in the filter can be a non-equivalence predicate.
  • If there is more than one predicate for a column, none of those predicates can be an equivalence predicate.
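The restrictions above can be sketched as a small, self-contained Scala model. This is only an illustration of the rules as stated on this page, not the connector's actual implementation. The names Pred, Equals, InSet, Range, and PushdownRules are hypothetical, and the sketch assumes that = and IN both count as equivalence predicates and that "last predicate" means the last restricted clustering column in key order.

```scala
// Simplified model of the column-filter rules listed above.
// All names are hypothetical; this is not the connector's implementation.
sealed trait Pred { def column: String }
final case class Equals(column: String) extends Pred // column = value
final case class InSet(column: String) extends Pred  // column IN (...)
final case class Range(column: String) extends Pred  // <, <=, >, >=

object PushdownRules {
  // Assumption of this sketch: = and IN are the equivalence predicates.
  private def isEquivalence(p: Pred): Boolean = p match {
    case _: Equals | _: InSet => true
    case _: Range             => false
  }

  // Partition key: every partition key column must be restricted,
  // each by exactly one equivalence predicate.
  def partitionKeyOk(partitionKey: Seq[String], preds: Seq[Pred]): Boolean =
    partitionKey.forall { c =>
      val onColumn = preds.filter(_.column == c)
      onColumn.size == 1 && isEquivalence(onColumn.head)
    }

  // Clustering columns: only the last restricted column (in key order) may
  // carry non-equivalence predicates, and when a column has several
  // predicates, none of them may be an equivalence predicate.
  def clusteringOk(clusteringKey: Seq[String], preds: Seq[Pred]): Boolean = {
    val perColumn =
      clusteringKey.map(c => preds.filter(_.column == c)).filter(_.nonEmpty)
    val multiOk =
      perColumn.forall(ps => ps.size == 1 || ps.forall(p => !isEquivalence(p)))
    val orderOk =
      perColumn.dropRight(1).forall(ps => ps.forall(isEquivalence))
    multiOk && orderOk
  }
}
```

For example, under these assumptions a filter with two range predicates on the first clustering column passes clusteringOk, while a range predicate on the first clustering column followed by an equality on the second does not.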

When predicate push down occurs

When a Dataset has no push down filters, all requests on the Dataset do a full unfiltered table scan. Adding predicate filters on the Dataset for eligible database columns modifies the underlying query to narrow its scope.

Determining if predicate push down is being used in queries

Use the explain method on a Dataset (or EXPLAIN in Spark SQL) to see whether the predicates in a query are pushed down and whether they need to be cast to the correct data type. For example, create the following CQL table:

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
USE test;
CREATE TABLE words (
    user  TEXT,
    word  TEXT,
    count INT,
    PRIMARY KEY (user, word));

INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'dino', 10 );
INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'fad', 5 );
INSERT INTO words (user, word, count ) VALUES ( 'Sam', 'alpha', 3 );
INSERT INTO words (user, word, count ) VALUES ( 'Zebra', 'zed', 100 );

Then create a Spark Dataset in the Spark console using that table and look for PushedFilters in the output after calling the explain method:

val df = spark.read.cassandraFormat("words", "test").load
df.explain
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user#0,word#1,count#2] ReadSchema: struct<user:string,word:string,count:int>

Because this query applies no filters on columns that can be pushed down, there are no PushedFilters in the physical plan.

Adding a filter, however, does change the physical plan to include PushedFilters:

val dfWithPushdown = df.filter(df("word") > "ham")
dfWithPushdown.explain
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user#0,word#1,count#2] PushedFilters: [*GreaterThan(word,ham)], ReadSchema: struct<user:string,word:string,count:int>

The PushedFilters section of the physical plan includes the GreaterThan push down filter. The asterisk indicates that the push down filter will be handled only at the datasource level.
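When checking many queries, it can help to extract the PushedFilters section from the plan text rather than reading it by eye. The following is a minimal sketch in plain Scala that works on a plan string formatted like the output above. PlanInspector and pushedFilters are hypothetical names, not part of Spark or the connector, and the pattern assumes the PushedFilters list contains no nested square brackets.

```scala
object PlanInspector {
  // Matches the "PushedFilters: [ ... ]" section as printed in the plans above.
  // Limitation: stops at the first closing bracket, so nested brackets
  // inside a filter description would be truncated.
  private val Pushed = """PushedFilters: \[(.*?)\]""".r

  /** Returns the text of the PushedFilters list, or None if the plan
    * has no PushedFilters section or the list is empty. */
  def pushedFilters(plan: String): Option[String] =
    Pushed.findFirstMatchIn(plan).map(_.group(1)).filter(_.nonEmpty)
}
```

In the Spark console, the plan text could be obtained from the Dataset with something like dfWithPushdown.queryExecution.executedPlan.toString and passed to PlanInspector.pushedFilters. The exact plan layout can vary between Spark versions, so treat the pattern as a starting point.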

Troubleshooting predicate push down

When creating Spark SQL queries that use comparison operators, making sure the predicates are pushed down to the database correctly is critical to retrieving the correct data with the best performance.

For example, given a CQL table with the following schema:

CREATE TABLE test.common (
    year int,
    birthday timestamp,
    userid uuid,
    likes text,
    name text,
    PRIMARY KEY (year, birthday, userid)
);

Suppose you want to write a query that selects all entries where the birthday is earlier than a given date:

SELECT * FROM test.common WHERE birthday < '2001-1-1';

Use the EXPLAIN command to see the query plan:

EXPLAIN SELECT * FROM test.common WHERE birthday < '2001-1-1';
== Physical Plan ==
*Filter (cast(birthday#1 as string) < 2001-1-1)
+- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [year#0,birthday#1,userid#2,likes#3,name#4] ReadSchema: struct<year:int,birthday:timestamp,userid:string,likes:string,name:string>
Time taken: 0.72 seconds, Fetched 1 row(s)

Note that the Filter directive is treating the birthday column, a CQL TIMESTAMP, as a string. The query optimizer looks at this comparison and needs to make the types match before generating a predicate. In this case the optimizer decides to cast the birthday column as a string to match the string '2001-1-1', but cast functions cannot be pushed down. The predicate isn't pushed down, and it doesn't appear in PushedFilters. A full table scan will be performed at the database layer, with the results returned to Spark for further processing.

To push down the correct predicate for this query, use the cast function to specify that the predicate compares the birthday column to a TIMESTAMP, so the types match and the optimizer can generate the correct predicate:

EXPLAIN SELECT * FROM test.common WHERE birthday < cast('2001-1-1' as TIMESTAMP);
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [year#0,birthday#1,userid#2,likes#3,name#4] 
PushedFilters: [*LessThan(birthday,2001-01-01 00:00:00.0)], 
ReadSchema: struct<year:int,birthday:timestamp,userid:string,likes:string,name:string>
Time taken: 0.034 seconds, Fetched 1 row(s)

Note the PushedFilters entry, which indicates that the LessThan predicate will be pushed down for the birthday column. This should speed up the query because a full table scan is avoided.
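The same idea can be applied when building the query with the Dataset API in the Spark console. The sketch below assumes the test.common table shown earlier and a Spark console where spark is already defined. It compares birthday against a typed java.sql.Timestamp value instead of a string literal, so no cast on the column should be needed. The names common and cutoff are illustrative. Run explain to confirm that PushedFilters appears in your environment.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.cassandra._ // provides cassandraFormat
import org.apache.spark.sql.functions.col

// Assumes the test.common table defined earlier on this page.
val common = spark.read.cassandraFormat("common", "test").load

// Compare against a Timestamp value rather than the string '2001-1-1',
// so the optimizer does not need to cast the birthday column.
val cutoff = Timestamp.valueOf("2001-01-01 00:00:00")
common.filter(col("birthday") < cutoff).explain
```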