Spark SQLクエリーでのSpark述語プッシュ・ダウンの使用
データベースにSpark述語プッシュ・ダウンを使用すると、Spark SQLクエリーをさらに最適化できます。
データベースにSpark述語プッシュ・ダウンを使用すると、Sparkクエリーをさらに最適化できます。述語は、通常はWHERE
句にあるtrueまたはfalseを返すクエリーの条件です。述語プッシュ・ダウンは、データベース・クエリー内のデータをフィルター処理し、データベースから取得するエントリーの数を減らすことで、クエリーのパフォーマンスを向上させます。デフォルトでは、Spark Dataset APIは有効なWHERE
句をデータベースに自動的にプッシュダウンします。
また、SearchAnalyticsデータ・センター内のDSE検索インデックスで述語プッシュ・ダウンを使用することもできます。
列フィルターに対する制限
パーティション・キー列は、次の条件が当てはまる限り、プッシュ・ダウンすることができます。
- すべてのパーティション・キー列がフィルターに含まれている。
- 等価述語が列ごとに1つしかない。
特定の列に対して複数の制限を指定するには、IN
句を使用します。
val primaryColors = List("red", "yellow", "blue") val df = spark.read.cassandraFormat("cars", "inventory").load df.filter(df("car_color").isin(primaryColors: _*))
クラスター化キー列は、次のルールに従ってプッシュ・ダウンすることができます。
- フィルター内の最後の述部のみを非等価述部にすることができます。
- 1つの列に複数の述部がある場合、述部を等価述部にすることはできません。
述語プッシュ・ダウンが行われた場合
データセットにプッシュ・ダウン フィルターがない場合、データセット上のすべての要求によって、フィルター処理されていないテーブル全体のスキャンが実行されます。適用可能なデータベース列のデータセットに述部フィルターを追加すると、ベースとなるクエリーが変更され、その範囲が絞り込まれます。
述語プッシュ・ダウンがクエリーで使用されているかどうかを確認する
データセット(またはSpark SQLのEXPLAIN
)でexplain
メソッドを使用すると、クエリーを分析して、述語を適切なデータ型にキャストする必要があるかどうかを確認できます。たとえば、以下のCQLテーブルを作成します。
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }; USE test; CREATE table words ( user TEXT, word TEXT, count INT, PRIMARY KEY (user, word)); INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'dino', 10 ); INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'fad', 5 ); INSERT INTO words (user, word, count ) VALUES ( 'Sam', 'alpha', 3 ); INSERT INTO words (user, word, count ) VALUES ( 'Zebra', 'zed', 100 );
次に、そのテーブルを使用してSparkコンソールにSpark Datasetを作成し、EXPLAIN
コマンドを発行した後、出力でPushedFilters
を探します。
val df = spark.read.cassandraFormat("words", "test").load df.explain
== Physical Plan == *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user#0,word#1,count#2] ReadSchema: struct<user:string,word:string,count:int>
このクエリーでは、プッシュ・ダウンできる列にフィルターを適用しないため、物理プランにはPushedFilters
は含まれていません。
ただし、フィルターを追加すると、PushedFilters
を含めるように物理プランが変更されます。
val dfWithPushdown = df.filter(df("word") > "ham") dfWithPushdown.explain
== Physical Plan == *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user#0,word#1,count#2] PushedFilters: [*GreaterThan(word,ham)], ReadSchema: struct<user:string,word:string,count:int>
物理プランのPushedFilters
セクションに、GreaterThan
プッシュ・ダウン・フィルターが含まれています。アスタリスクは、プッシュ・ダウン・フィルターがデータソース・レベルでのみ処理されることを示します。
述語プッシュ・ダウンのトラブルシューティング
比較演算子を使用するSpark SQLクエリーを作成する場合、述語がデータベースに正常にプッシュ・ダウンされることを確認するのは、最適なパフォーマンスで適切なデータを取得するために重要です。
たとえば、次のスキーマを持つCQLテーブルがあるとします。
CREATE TABLE test.common ( year int, birthday timestamp, userid uuid, likes text, name text, PRIMARY KEY (year, birthday, userid) )
誕生日が指定日より前のエントリーをすべて選択するクエリーを作成するとします。
SELECT * FROM test.common WHERE birthday < '2001-1-1';
EXPLAIN
コマンドを使用して、クエリー・プランを表示します。
EXPLAIN SELECT * FROM test.common WHERE birthday < '2001-1-1';
== Physical Plan == *Filter (cast(birthday#1 as string) < 2001-1-1) +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [year#0,birthday#1,userid#2,likes#3,name#4] ReadSchema: struct<year:int,birthday:timestamp,userid:string,likes:string,name:string> Time taken: 0.72 seconds, Fetched 1 row(s)
Filter
ディレクティブは、birthday
列であるCQL TIMESTAMP
を文字列として扱うことに注意してください。クエリー最適化ツールは、この比較を調べて、述語を生成する前に型を一致させる必要があります。この場合、最適化ツールは文字列'2001-1-1'
と一致するようにbirthday
列を文字列としてキャストすることを決定しますが、キャスト関数はプッシュ・ダウンできません。述語はプッシュ・ダウンされず、PushedFilters
には表示されません。データベース層でテーブル全体のスキャンが行われ、結果はSparkに返されて、さらに処理が行われます。
このクエリーに適切な述部をプッシュ・ダウンするには、cast
メソッドを使用して、述語がbirthday
列とTIMESTAMP
を比較するように指定します。したがって、型が一致し、最適化ツールによって適切な述部を生成できます。
EXPLAIN SELECT * FROM test.common WHERE birthday < cast('2001-1-1' as TIMESTAMP);
== Physical Plan == *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [year#0,birthday#1,userid#2,likes#3,name#4] PushedFilters: [*LessThan(birthday,2001-01-01 00:00:00.0)], ReadSchema: struct<year:int,birthday:timestamp,userid:string,likes:string,name:string> Time taken: 0.034 seconds, Fetched 1 row(s)
PushedFilters
が、birthday
の列データに対してLessThan
述部がプッシュ・ダウンされることを示していることに注意してください。これにより、テーブル全体のスキャンが回避されるため、クエリーの処理が迅速に行われます。