テーブルへのテキスト・ファイルのインポート
この例では、Sparkを使用して、ローカルまたはCFS(Cassandraファイル・システム)ベースのテキスト・ファイルを既存のテーブルにインポートする方法を示します。
この例では、Sparkを使用して、ローカルまたはCFS(Cassandraファイル・システム)ベースのテキスト・ファイルを既存のテーブルにインポートする方法を示します。任意のRDDのデータベースへの保存には、Spark RDDに含まれているsaveToCassandraメソッドを使用します。
手順
- 
                キースペースとテーブルをデータベースで作成します。たとえば、cqlshを使用します。CREATE KEYSPACE int_ks WITH replication = {'class': 'NetworkTopologyStrategy', 'Analytics':1}; USE int_ks; CREATE TABLE int_compound ( pkey int, ckey1 int, data1 int , PRIMARY KEY (pkey,ckey1));
- 
                テーブルにデータを挿入します。
                INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 1, 2, 3 ); INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 2, 3, 4 ); INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 3, 4, 5 ); INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 4, 5, 1 ); INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 5, 1, 2 );
- 
                以下のデータを含んでいる、normalfill.csvという名前のテキスト・ファイルを作成します。
                6,7,8 7,8,6 8,6,7
- 
                CSVファイルをCFSに配置します。たとえば、Linuxでは以下のようにします。
                bin/dse hadoop fs -put mypath/normalfill.csv / 
- Sparkシェルを起動します。
- 
                Sparkがint_ksキースペースにアクセスできることを確認します。scala> :showSchema int_ks ======================================== Keyspace: int_ks ======================================== Table: int_compound ---------------------------------------- - pkey : Int (partition key column) - ckey1 : Int (clustering column) - data1 : Intint_ksがキースペースのリストに表示されます。
- 
                CFSからファイルを読み込んで、コンマ・デリミターで分割します。各要素を整数に変換します。
                scala> val normalfill = sc.textFile("/normalfill.csv").map(line => line.split(",").map(_.toInt)); normalfill: org.apache.spark.rdd.RDD[Array[Int]] = MappedRDD[2] at map at console:22または、ローカル・ファイル・システムからファイルを読み込みます。scala> val file = sc.textFile("file:///local-path/normalfill.csv") file: org.apache.spark.rdd.RDD[String] = MappedRDD[4] at textFile at console:22
- 
                SparkがCSVファイルを見つけて読み取れることを確認します。
                scala> normalfill.take(1); res2: Array[Array[Int]] = Array(Array(6, 7, 8))
- 
                新しいデータをデータベースに保存します。
                scala> normalfill.map(line => (line(0), line(1), line(2))).saveToCassandra( "int_ks", "int_compound", Seq("pkey", "ckey1", "data1")) scala>このステップでは出力は生成されません。
- 
                cqlshを使用してデータが保存されたことを確認します。SELECT * FROM int_ks.int_compound; pkey | ckey1 | data1 ------+-------+------- 5 | 1 | 2 1 | 2 | 3 8 | 6 | 7 2 | 3 | 4 4 | 5 | 1 7 | 8 | 6 6 | 7 | 8 3 | 4 | 5 (8 rows)
