テーブルへのテキスト・ファイルのインポート

この例では、Sparkを使用して、ローカルまたはCFS(Cassandraファイル・システム)ベースのテキスト・ファイルを既存のテーブルにインポートする方法を示します。

この例では、Sparkを使用して、ローカルまたはCFS(Cassandraファイル・システム)ベースのテキスト・ファイルを既存のテーブルにインポートする方法を示します。任意のRDDのデータベースへの保存には、Spark RDDに含まれているsaveToCassandraメソッドを使用します。

手順

  1. キースペースとテーブルをデータベースで作成します。たとえば、cqlshを使用します。
    CREATE KEYSPACE int_ks WITH replication = 
      {'class': 'NetworkTopologyStrategy', 'Analytics':1};
    USE int_ks;
    CREATE TABLE int_compound ( pkey int, ckey1 int, data1 int , PRIMARY KEY (pkey,ckey1));
  2. テーブルにデータを挿入します。
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 1, 2, 3 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 2, 3, 4 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 3, 4, 5 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 4, 5, 1 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 5, 1, 2 );
  3. 以下のデータを含んでいる、normalfill.csvという名前のテキスト・ファイルを作成します。
    6,7,8
    7,8,6
    8,6,7
  4. CSVファイルをCFSに配置します。たとえば、Linuxでは以下のようにします。
    bin/dse hadoop fs -put mypath/normalfill.csv /
  5. Sparkシェルを起動します。
  6. Sparkがint_ksキースペースにアクセスできることを確認します。
    scala> :showSchema int_ks
    ========================================
     Keyspace: int_ks
    ========================================
     Table: int_compound
    ----------------------------------------
     - pkey  : Int (partition key column)
     - ckey1 : Int (clustering column)
     - data1 : Int
    int_ksがキースペースのリストに表示されます。
  7. CFSからファイルを読み込んで、コンマ・デリミターで分割します。各要素を整数に変換します。
    scala> val normalfill = sc.textFile("/normalfill.csv").map(line => line.split(",").map(_.toInt));
    normalfill: org.apache.spark.rdd.RDD[Array[Int]] = MappedRDD[2] at map at console:22
    または、ローカル・ファイル・システムからファイルを読み込みます。
    scala> val file = sc.textFile("file:///local-path/normalfill.csv")
    file: org.apache.spark.rdd.RDD[String] = MappedRDD[4] at textFile at console:22
  8. SparkがCSVファイルを見つけて読み取れることを確認します。
    scala> normalfill.take(1);
    res2: Array[Array[Int]] = Array(Array(6, 7, 8))
  9. 新しいデータをデータベースに保存します。
    scala> normalfill.map(line => (line(0), line(1), line(2))).saveToCassandra(
     "int_ks", "int_compound", Seq("pkey", "ckey1", "data1"))
    
    scala>
    このステップでは出力は生成されません。
  10. cqlshを使用してデータが保存されたことを確認します。
    SELECT * FROM int_ks.int_compound;
         
     pkey | ckey1 | data1
    ------+-------+-------
        5 |     1 |     2
        1 |     2 |     3
        8 |     6 |     7
        2 |     3 |     4
        4 |     5 |     1
        7 |     8 |     6
        6 |     7 |     8
        3 |     4 |     5
    
      (8 rows)