Loading external HDFS data into the database using Spark

This task demonstrates how to access Hadoop data and save it to the database using Spark on DSE Analytics nodes.

To simplify accessing the Hadoop data, it uses WebHDFS, a REST-based server for interacting with a Hadoop cluster. WebHDFS handles redirect requests to the data nodes, so every DSE Analytics node needs to be able to route to every HDFS node using the Hadoop node’s hostname.

These instructions use example weather data, but the principles can be applied to any kind of Hadoop data that can be stored in the database.

You will need:

  • A working Hadoop installation with HDFS and WebHDFS enabled and running. You will need the hostname of the machine on which Hadoop is running, and the cluster must be accessible from the DSE Analytics nodes in your DataStax Enterprise cluster.

  • A running DataStax Enterprise cluster with DSE Analytics nodes enabled.

  • Git installed on a DSE Analytics node.

    1. Clone the GitHub repository containing the test data.

      $ git clone https://github.com/brianmhess/DSE-Spark-HDFS.git
    2. Load the maximum temperature test data into the Hadoop cluster using WebHDFS.

      In this example, the Hadoop node has a hostname of hadoopNode.example.com. Replace it with the hostname of a node in your Hadoop cluster.

      $ dse hadoop fs -mkdir webhdfs://hadoopNode.example.com:50070/user/guest/data
      $ dse hadoop fs -copyFromLocal data/sftmax.csv webhdfs://hadoopNode:50070/user/guest/data/sftmax.csv
    3. Create the keyspace and table and load the minimum temperature test data using cqlsh.

      $ cqlsh -e "CREATE KEYSPACE IF NOT EXISTS spark_ex2 WITH REPLICATION = { 'class':'SimpleStrategy', 'replication_factor':1}"
      $ cqlsh -e "DROP TABLE IF EXISTS spark_ex2.sftmin"
      $ cqlsh -e "CREATE TABLE IF NOT EXISTS spark_ex2.sftmin(location TEXT, year INT, month INT, day INT, tmin DOUBLE, datestring TEXT, PRIMARY KEY ((location), year, month, day)) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC)"
      $ cqlsh -e "COPY spark_ex2.sftmin(location, year, month, day, tmin, datestring) FROM 'data/sftmin.csv'"
    4. Ensure that we can access the HDFS data by interacting with the data using dse hadoop fs.

      The following command counts the number of lines of HDFS data.

      $ dse hadoop fs -cat webhdfs://hadoopNode.example.com:50070/user/guest/data/sftmax.csv | wc -l

      You should see output similar to the following:

      $ 16/05/10 11:21:51 INFO snitch.Workload: Setting my workload to Cassandra 3606
    5. Start the Spark console and connect to the DataStax Enterprise cluster.

      $ dse spark

      Import the Spark Cassandra connector and create the session.

      import com.datastax.spark.connector.cql.CassandraConnector
      val connector = CassandraConnector(csc.conf)
      val session = connector.openSession()
    6. Create the table to store the maximum temperature data.

      session.execute(s"DROP TABLE IF EXISTS spark_ex2.sftmax")
      session.execute(s"CREATE TABLE IF NOT EXISTS spark_ex2.sftmax(location TEXT, year INT, month INT, day INT, tmax DOUBLE, datestring TEXT, PRIMARY KEY ((location), year, month, day)) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC)")
    7. Create a Spark RDD from the HDFS maximum temperature data and save it to the table.

      First create a case class representing the maximum temperature sensor data:

      case class Tmax(location: String, year: Int, month: Int, day: Int, tmax: Double, datestring: String)

      Read the data into an RDD.

      val tmax_raw = sc.textFile("webhdfs://sandbox.hortonworks.com:50070/user/guest/data/sftmax.csv")

      Transform the data so each record in the RDD is an instance of the Tmax case class.

      val tmax_c10 = tmax_raw.map(x=>x.split(",")).map(x => Tmax(x(0), x(1).toInt, x(2).toInt, x(3).toInt, x(4).toDouble, x(5)))

      Count the case class instances to make sure it matches the number of records.

      res11: Long = 3606

      Save the case class instances to the database.

      tmax_c10.saveToCassandra("spark_ex2", "sftmax")
    8. Verify the records match by counting the rows using CQL.

      session.execute("SELECT COUNT(*) FROM spark_ex2.sftmax").all.get(0).getLong(0)
      res23: Long = 3606
    9. Join the maximum and minimum data into a new table.

      Create a Tmin case class to store the minimum temperature sensor data.

      case class Tmin(location: String, year: Int, month: Int, day: Int, tmin: Double, datestring: String)
      val tmin_raw = sc.cassandraTable("spark_ex2", "sftmin")
      val tmin_c10 = tmin_raw.map(x => Tmin(x.getString("location"), x.getInt("year"), x.getInt("month"), x.getInt("day"), x.getDouble("tmin"), x.getString("datestring")))

      In order to join RDDs, they need to be PairRDDs, with the first element in the pair being the join key.

      val tmin_pair = tmin_c10.map(x=>(x.datestring,x))
      val tmax_pair = tmax_c10.map(x=>(x.datestring,x))

      Create a THiLoDelta case class to store the difference between the maximum and minimum temperatures.

      case class `THiLoDelta`(location: `String`, year: `Int`, month: `Int`, day: `Int`, hi: `Double`, low: `Double`, delta: `Double`, datestring: `String`)

      Join the data using the join operation on the PairRDDs. Convert the joined data to the THiLoDelta case class.

      val tdelta_join1 = tmax_pair1.join(tmin_pair1)
      val tdelta_c10 = tdelta_join1.map(x => THiLoDelta(x._2._1._1, x._2._1._2, x._2._1._3, x._2._1._4, x._2._1._5, x._2._2._5, x._2._1._5 - x._2._2._5, x._1))

      Create a new table within Spark using CQL to store the temperature difference data.

      session.execute(s"DROP TABLE IF EXISTS spark_ex2.sftdelta")
      session.execute(s"CREATE TABLE IF NOT EXISTS spark_ex2.sftdelta(location TEXT, year INT, month INT, day INT, hi DOUBLE, low DOUBLE, delta DOUBLE, datestring TEXT, PRIMARY KEY ((location), year, month, day)) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC)")

      Save the temperature difference data to the table.

      tdelta_c10.saveToCassandra("spark_ex2", "sftdelta")

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com