Importing a text file into a table

This example shows how to use Spark to import a local or DSEFS-based text file into an existing table. It uses the saveToCassandra method, which the Spark Cassandra Connector adds to Spark RDDs, to save an arbitrary RDD to the database.
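For reference, the general shape of the call looks like the following sketch. The keyspace, table, and column names here are placeholders; in the DSE Spark shell the connector's implicits are already imported, so the explicit import is only needed elsewhere.

    import com.datastax.spark.connector._

    // Any RDD of tuples can be written to a table whose columns match.
    val rows = sc.parallelize(Seq((1, 2, 3), (2, 3, 4)))
    rows.saveToCassandra("my_ks", "my_table", Seq("pkey", "ckey1", "data1"))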

Procedure

  1. Create a keyspace and a table in the database. For example, using cqlsh:
    CREATE KEYSPACE int_ks WITH replication = 
      {'class': 'NetworkTopologyStrategy', 'Analytics':1};
    USE int_ks;
    CREATE TABLE int_compound ( pkey int, ckey1 int, data1 int, PRIMARY KEY (pkey, ckey1));
  2. Insert data into the table:
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 1, 2, 3 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 2, 3, 4 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 3, 4, 5 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 4, 5, 1 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 5, 1, 2 );
  3. Create a text file named normalfill.csv that contains this data:
    6,7,8
    7,8,6
    8,6,7
  4. Put the CSV file into DSEFS. For example:
    dse hadoop fs -put mypath/normalfill.csv /
  5. Start the Spark shell.
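    For example, on a node where Spark is enabled:
    dse spark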
  6. Verify that Spark can access the int_ks keyspace:
    :showSchema int_ks
    
    ========================================
     Keyspace: int_ks
    ========================================
     Table: int_compound
    ----------------------------------------
     - pkey  : Int (partition key column)
     - ckey1 : Int (clustering column)
     - data1 : Int
    The int_ks keyspace and its table appear in the output.
  7. Read in the file, splitting it on the comma delimiter. Transform each element into an Integer.
    val normalfill = sc.textFile("/normalfill.csv").map(line => line.split(",").map(_.toInt))
    normalfill: org.apache.spark.rdd.RDD[Array[Int]] = MappedRDD[2] at map at <console>:22
    Alternatively, read in the file from the local file system.
    val file = sc.textFile("file:///local-path/normalfill.csv")
    
    file: org.apache.spark.rdd.RDD[String] = MappedRDD[4] at textFile at <console>:22
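
    If the file might contain blank or malformed lines, a guarded variant of the same transformation can skip them. This is an optional sketch, not required for this example's data:
    val guarded = sc.textFile("/normalfill.csv")
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.split(","))
      .filter(fields => fields.length == 3 && fields.forall(_.matches("-?\\d+")))
      .map(_.map(_.toInt))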
  8. Check that Spark can find and read the CSV file.
    normalfill.take(1)
    
    res2: Array[Array[Int]] = Array(Array(6, 7, 8))
  9. Save the new data to the database.
    normalfill.map(line => (line(0), line(1), line(2))).saveToCassandra(
     "int_ks", "int_compound", Seq("pkey", "ckey1", "data1"))
    
    The step produces no output.
  10. Using cqlsh, check that the data was saved:
    SELECT * FROM int_ks.int_compound;
         
     pkey | ckey1 | data1
    ------+-------+-------
        5 |     1 |     2
        1 |     2 |     3
        8 |     6 |     7
        2 |     3 |     4
        4 |     5 |     1
        7 |     8 |     6
        6 |     7 |     8
        3 |     4 |     5
    
      (8 rows)
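
As an alternative to cqlsh, the saved rows can also be read back from within the Spark shell using the connector's cassandraTable method. A sketch: the DSE Spark shell preloads the connector implicits; elsewhere, import com.datastax.spark.connector._ first.

    sc.cassandraTable("int_ks", "int_compound").collect.foreach(println)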