Spark SQLクエリーでのSpark述語プッシュ・ダウンの使用

データベースにSpark述語プッシュ・ダウンを使用すると、Spark SQLクエリーをさらに最適化できます。

データベースにSpark述語プッシュ・ダウンを使用すると、Sparkクエリーをさらに最適化できます。述語は、通常はWHERE句にあるtrueまたはfalseを返すクエリーの条件です。述語プッシュ・ダウンは、データベース・クエリー内のデータをフィルター処理し、データベースから取得するエントリーの数を減らすことで、クエリーのパフォーマンスを向上させます。デフォルトでは、Spark Dataset APIは有効なWHERE句をデータベースに自動的にプッシュダウンします。

また、SearchAnalyticsデータ・センター内のDSE検索インデックスで述語プッシュ・ダウンを使用することもできます。

列フィルターに対する制限

パーティション・キー列は、次の条件が当てはまる限り、プッシュ・ダウンすることができます。

  • すべてのパーティション・キー列がフィルターに含まれている。
  • 等価述語が列ごとに1つしかない。

特定の列に対して複数の制限を指定するには、IN句を使用します。

val primaryColors = List("red", "yellow", "blue")

val df = spark.read.cassandraFormat("cars", "inventory").load
df.filter(df("car_color").isin(primaryColors: _*))

クラスター化キー列は、次のルールに従ってプッシュ・ダウンすることができます。

  • フィルター内の最後の述部のみを非等価述部にすることができます。
  • 1つの列に複数の述部がある場合、述部を等価述部にすることはできません。

述語プッシュ・ダウンが行われた場合

データセットにプッシュ・ダウン フィルターがない場合、データセット上のすべての要求によって、フィルター処理されていないテーブル全体のスキャンが実行されます。適用可能なデータベース列のデータセットに述部フィルターを追加すると、ベースとなるクエリーが変更され、その範囲が絞り込まれます。

述語プッシュ・ダウンがクエリーで使用されているかどうかを確認する

データセット(またはSpark SQLのEXPLAIN)でexplainメソッドを使用すると、クエリーを分析して、述語を適切なデータ型にキャストする必要があるかどうかを確認できます。たとえば、以下のCQLテーブルを作成します。

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
USE test;
CREATE table words (
    user  TEXT, 
    word  TEXT, 
    count INT, 
    PRIMARY KEY (user, word));

INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'dino', 10 );
INSERT INTO words (user, word, count ) VALUES ( 'Russ', 'fad', 5 );
INSERT INTO words (user, word, count ) VALUES ( 'Sam', 'alpha', 3 );
INSERT INTO words (user, word, count ) VALUES ( 'Zebra', 'zed', 100 );

次に、そのテーブルを使用してSparkコンソールにSpark Datasetを作成し、EXPLAINコマンドを発行した後、出力でPushedFiltersを探します。

val df = spark.read.cassandraFormat("words", "test").load
df.explain
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user#0,word#1,count#2] ReadSchema: struct<user:string,word:string,count:int>

このクエリーでは、プッシュ・ダウンできる列にフィルターを適用しないため、物理プランにはPushedFiltersは含まれていません。

ただし、フィルターを追加すると、PushedFiltersを含めるように物理プランが変更されます。

val dfWithPushdown = df.filter(df("word") > "ham")
dfWithPushdown.explain
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [user#0,word#1,count#2] PushedFilters: [*GreaterThan(word,ham)], ReadSchema: struct<user:string,word:string,count:int>

物理プランのPushedFiltersセクションに、GreaterThanプッシュ・ダウン・フィルターが含まれています。アスタリスクは、プッシュ・ダウン・フィルターがデータソース・レベルでのみ処理されることを示します。

述語プッシュ・ダウンのトラブルシューティング

比較演算子を使用するSpark SQLクエリーを作成する場合、述語がデータベースに正常にプッシュ・ダウンされることを確認するのは、最適なパフォーマンスで適切なデータを取得するために重要です。

たとえば、次のスキーマを持つCQLテーブルがあるとします。

CREATE TABLE test.common (
    year int,
    birthday timestamp,
    userid uuid,
    likes text,
    name text,
    PRIMARY KEY (year, birthday, userid)
)

誕生日が指定日より前のエントリーをすべて選択するクエリーを作成するとします。

SELECT * FROM test.common WHERE birthday < '2001-1-1';

EXPLAINコマンドを使用して、クエリー・プランを表示します。

EXPLAIN SELECT * FROM test.common WHERE birthday < '2001-1-1';
== Physical Plan ==
*Filter (cast(birthday#1 as string) < 2001-1-1)
+- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [year#0,birthday#1,userid#2,likes#3,name#4] ReadSchema: struct<year:int,birthday:timestamp,userid:string,likes:string,name:string>
Time taken: 0.72 seconds, Fetched 1 row(s)

Filterディレクティブは、birthday列であるCQL TIMESTAMPを文字列として扱うことに注意してください。クエリー最適化ツールは、この比較を調べて、述語を生成する前に型を一致させる必要があります。この場合、最適化ツールは文字列'2001-1-1'と一致するようにbirthday列を文字列としてキャストすることを決定しますが、キャスト関数はプッシュ・ダウンできません。述語はプッシュ・ダウンされず、PushedFiltersには表示されません。データベース層でテーブル全体のスキャンが行われ、結果はSparkに返されて、さらに処理が行われます。

このクエリーに適切な述部をプッシュ・ダウンするには、castメソッドを使用して、述語がbirthday列とTIMESTAMPを比較するように指定します。したがって、型が一致し、最適化ツールによって適切な述部を生成できます。

EXPLAIN SELECT * FROM test.common WHERE birthday < cast('2001-1-1' as TIMESTAMP);
== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [year#0,birthday#1,userid#2,likes#3,name#4] 
PushedFilters: [*LessThan(birthday,2001-01-01 00:00:00.0)], 
ReadSchema: struct<year:int,birthday:timestamp,userid:string,likes:string,name:string>
Time taken: 0.034 seconds, Fetched 1 row(s)

PushedFiltersが、birthdayの列データに対してLessThan述部がプッシュ・ダウンされることを示していることに注意してください。これにより、テーブル全体のスキャンが回避されるため、クエリーの処理が迅速に行われます。