Importing a text file into a table

This example shows how to use Spark to import a text file, stored locally or in the Cassandra File System (CFS), into an existing table. You use the saveToCassandra method, which the Spark Cassandra Connector adds to Spark RDDs, to save an arbitrary RDD to the database.
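
Because saveToCassandra comes from the Spark Cassandra Connector rather than core Spark, the connector's implicits must be in scope. The DSE Spark shell typically preloads this import; a minimal sketch for other environments:

    scala> import com.datastax.spark.connector._
    import com.datastax.spark.connector._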

  1. Create a keyspace and a table in the database. For example, use cqlsh:

    CREATE KEYSPACE int_ks WITH replication =
      {'class': 'NetworkTopologyStrategy', 'Analytics': 1};
    USE int_ks;
    CREATE TABLE int_compound ( pkey int, ckey1 int, data1 int, PRIMARY KEY (pkey, ckey1));
  2. Insert data into the table:

    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 1, 2, 3 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 2, 3, 4 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 3, 4, 5 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 4, 5, 1 );
    INSERT INTO int_compound ( pkey, ckey1, data1 ) VALUES ( 5, 1, 2 );
  3. Create a text file named normalfill.csv that contains this data:

    6,7,8
    7,8,6
    8,6,7
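
    For example, assuming the mypath directory used in step 4, a shell heredoc creates the file:

    $ cat > mypath/normalfill.csv <<EOF
    6,7,8
    7,8,6
    8,6,7
    EOF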
  4. Put the CSV file into CFS. For example, on Linux:

    $ bin/dse hadoop fs -put mypath/normalfill.csv /
  5. Start the Spark shell.
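
    On DSE, for example, start the shell with the dse command from the installation directory:

    $ bin/dse spark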

  6. Verify that Spark can access the int_ks keyspace:

    scala> :showSchema int_ks
    ========================================
     Keyspace: int_ks
    ========================================
     Table: int_compound
    ----------------------------------------
     - pkey  : Int (partition key column)
     - ckey1 : Int (clustering column)
     - data1 : Int

    The output confirms that Spark can see the int_ks keyspace and the schema of its int_compound table.

  7. Read the file from CFS, splitting each line on the comma delimiter and converting each field to an Int:

    scala> val normalfill = sc.textFile("/normalfill.csv").map(line => line.split(",").map(_.toInt));
    normalfill: org.apache.spark.rdd.RDD[Array[Int]] = MappedRDD[2] at map at <console>:22

    Alternatively, read the file from the local file system:

    scala> val file = sc.textFile("file:///local-path/normalfill.csv")
    file: org.apache.spark.rdd.RDD[String] = MappedRDD[4] at textFile at <console>:22
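
    If you read from the local file system, apply the same split-and-convert step before saving. A sketch, where normalfillLocal is a hypothetical name for the result:

    scala> val normalfillLocal = file.map(line => line.split(",").map(_.toInt))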
  8. Check that Spark can find and read the CSV file:

    scala> normalfill.take(1);
    res2: Array[Array[Int]] = Array(Array(6, 7, 8))
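
    Optionally, count the rows as a further check; the file created in step 3 has three lines, so the count should be 3:

    scala> normalfill.count()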
  9. Save the new data to the database:

    scala> normalfill.map(line => (line(0), line(1), line(2))).saveToCassandra(
     "int_ks", "int_compound", Seq("pkey", "ckey1", "data1"))
    
    scala>

    This step produces no output.
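
    You can also verify the save from within the Spark shell by reading the table back with the connector's cassandraTable method. A sketch (output omitted):

    scala> sc.cassandraTable("int_ks", "int_compound").collect.foreach(println)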

  10. In cqlsh, check that the data was saved:

    SELECT * FROM int_ks.int_compound;
    
     pkey | ckey1 | data1
    ------+-------+-------
        5 |     1 |     2
        1 |     2 |     3
        8 |     6 |     7
        2 |     3 |     4
        4 |     5 |     1
        7 |     8 |     6
        6 |     7 |     8
        3 |     4 |     5
    
    (8 rows)
