Spark SQLクエリーでのSpark述語プッシュ・ダウンの使用
データベースにSpark述語プッシュ・ダウンを使用すると、Spark SQLクエリーをさらに最適化できます。
データベースにSpark述語プッシュ・ダウンを使用すると、Sparkクエリーをさらに最適化できます。述語は、通常はWHERE句にあるtrueまたはfalseを返すクエリーの条件です。述語プッシュ・ダウンは、データベース・クエリー内のデータをフィルター処理し、データベースから取得するエントリーの数を減らすことで、クエリーのパフォーマンスを向上させます。デフォルトでは、Spark Dataset APIは有効なWHERE句をデータベースに自動的にプッシュダウンします。
また、SearchAnalyticsデータ・センター内のDSE検索インデックスで述語プッシュ・ダウンを使用することもできます。
列フィルターに対する制限
パーティション・キー列は、次の条件が当てはまる限り、プッシュ・ダウンすることができます。
- すべてのパーティション・キー列がフィルターに含まれている。
- 等価述語が列ごとに1つしかない。
特定の列に対して複数の制限を指定するには、IN句を使用します。
val primaryColors = List("red", "yellow", "blue")
val df = spark.read.cassandraFormat("cars", "inventory").load
df.filter(df("car_color").isin(primaryColors: _*))
      クラスター化キー列は、次のルールに従ってプッシュ・ダウンすることができます。
- フィルター内の最後の述部のみを非等価述部にすることができます。
- 1つの列に複数の述部がある場合、述部を等価述部にすることはできません。
述語プッシュ・ダウンが行われた場合
データセットにプッシュ・ダウン フィルターがない場合、データセット上のすべての要求によって、フィルター処理されていないテーブル全体のスキャンが実行されます。適用可能なデータベース列のデータセットに述部フィルターを追加すると、ベースとなるクエリーが変更され、その範囲が絞り込まれます。
述語プッシュ・ダウンがクエリーで使用されているかどうかを確認する
データセット(またはSpark SQLのEXPLAIN)でexplainメソッドを使用すると、クエリーを分析して、述語を適切なデータ型にキャストする必要があるかどうかを確認できます。たとえば、以下のCQLテーブルを作成します。
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
USE test;
CREATE table words (
    user  TEXT, 
    word  TEXT, 
    count INT, 
    PRIMARY KEY (user, word));
INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'dino', 10 );
INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'fad', 5 );
INSERT INTO words (user, word, count ) VALUES ( 'Sam', 'alpha', 3 );
INSERT INTO words (user, word, count ) VALUES ( 'Zebra', 'zed', 100 );
      次に、そのテーブルを使用してSparkコンソールにSpark Datasetを作成し、EXPLAINコマンドを発行した後、出力でPushedFiltersを探します。
val df = spark.read.cassandraFormat("words", "test").load
df.explain
      == Physical Plan == *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user#0,word#1,count#2] ReadSchema: struct<user:string,word:string,count:int>
このクエリーでは、プッシュ・ダウンできる列にフィルターを適用しないため、物理プランにはPushedFiltersは含まれていません。
ただし、フィルターを追加すると、PushedFiltersを含めるように物理プランが変更されます。
val dfWithPushdown = df.filter(df("word") > "ham")
dfWithPushdown.explain
      == Physical Plan == *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user#0,word#1,count#2] PushedFilters: [*GreaterThan(word,ham)], ReadSchema: struct<user:string,word:string,count:int>
物理プランのPushedFiltersセクションに、GreaterThanプッシュ・ダウン・フィルターが含まれています。アスタリスクは、プッシュ・ダウン・フィルターがデータソース・レベルでのみ処理されることを示します。
述語プッシュ・ダウンのトラブルシューティング
比較演算子を使用するSpark SQLクエリーを作成する場合、述語がデータベースに正常にプッシュ・ダウンされることを確認するのは、最適なパフォーマンスで適切なデータを取得するために重要です。
たとえば、次のスキーマを持つCQLテーブルがあるとします。
CREATE TABLE test.common (
    year int,
    birthday timestamp,
    userid uuid,
    likes text,
    name text,
    PRIMARY KEY (year, birthday, userid)
)
      誕生日が指定日より前のエントリーをすべて選択するクエリーを作成するとします。
SELECT * FROM test.common WHERE birthday < '2001-1-1';
      EXPLAINコマンドを使用して、クエリー・プランを表示します。
EXPLAIN SELECT * FROM test.common WHERE birthday < '2001-1-1';== Physical Plan == *Filter (cast(birthday#1 as string) < 2001-1-1) +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [year#0,birthday#1,userid#2,likes#3,name#4] ReadSchema: struct<year:int,birthday:timestamp,userid:string,likes:string,name:string> Time taken: 0.72 seconds, Fetched 1 row(s)
Filterディレクティブは、birthday列であるCQL TIMESTAMPを文字列として扱うことに注意してください。クエリー最適化ツールは、この比較を調べて、述語を生成する前に型を一致させる必要があります。この場合、最適化ツールは文字列'2001-1-1'と一致するようにbirthday列を文字列としてキャストすることを決定しますが、キャスト関数はプッシュ・ダウンできません。述語はプッシュ・ダウンされず、PushedFiltersには表示されません。データベース層でテーブル全体のスキャンが行われ、結果はSparkに返されて、さらに処理が行われます。
このクエリーに適切な述部をプッシュ・ダウンするには、castメソッドを使用して、述語がbirthday列とTIMESTAMPを比較するように指定します。したがって、型が一致し、最適化ツールによって適切な述部を生成できます。
EXPLAIN SELECT * FROM test.common WHERE birthday < cast('2001-1-1' as TIMESTAMP);== Physical Plan == *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [year#0,birthday#1,userid#2,likes#3,name#4] PushedFilters: [*LessThan(birthday,2001-01-01 00:00:00.0)], ReadSchema: struct<year:int,birthday:timestamp,userid:string,likes:string,name:string> Time taken: 0.034 seconds, Fetched 1 row(s)
PushedFiltersが、birthdayの列データに対してLessThan述部がプッシュ・ダウンされることを示していることに注意してください。これにより、テーブル全体のスキャンが回避されるため、クエリーの処理が迅速に行われます。
