Sparkを使用した外部HDFSデータのCassandraへの読み込み

Hadoop HDFSデータは、Sparkを使用してDataStax Enterprise Analyticsノードからアクセスし、Cassandraテーブルに保存できます。

以下のタスクは、DSE AnalyticsノードでSparkを使用してHadoopデータにアクセスし、それをCassandraに保存する方法を示します。

Hadoopデータへのアクセスを簡素化するために、Hadoopクラスターと対話するためのRESTベースのサーバーであるWebHDFSが使用されます。WebHDFSはデータ・ノードへのリダイレクト要求を処理するため、すべてのDSE AnalyticsノードはHadoopノードのホスト名を使用してすべてのHDFSノードにルーティングできるようになっている必要があります。

以下の手順では気象データの例を使用していますが、その原理はCassandraに格納可能なあらゆる種類のHadoopデータに適用できます。

始める前に

必要なもの:

  • 有効なHDFSとWebHDFSを実行しているHadoopの実稼働環境のインストール。Hadoopを実行しているマシンのホスト名が必要で、クラスターはDataStax Enterpriseクラスター内のDSE Analyticsノードからアクセスできるようになっている必要があります。
  • 有効なDSE Analyticsノードを含む実稼働環境のDataStax Enterpriseクラスター。
  • DSE Analyticsノードにインストールされているgit。

手順

  1. テスト・データを含むGitHubリポジトリを複製します。
    git clone https://github.com/brianmhess/DSE-Spark-HDFS.git
  2. WebHDFSを使用して、最高気温テスト・データをHadoopクラスターに読み込みます。

    この例では、Hadoopノードのホスト名はhadoopNode.example.comです。これをHadoopクラスター内のノードのホスト名に置換します。

    dse hadoop fs -mkdir webhdfs://hadoopNode.example.com:50070/user/guest/data
    $ dse hadoop fs -copyFromLocal data/sftmax.csv webhdfs://hadoopNode:50070/user/guest/data/sftmax.csv
  3. cqlshを使用してキースペースとテーブルを作成し、最低気温テスト・データを読み込みます。
    cqlsh -e "CREATE KEYSPACE IF NOT EXISTS spark_ex2 WITH REPLICATION = { 'class':'SimpleStrategy', 'replication_factor':1}"
    $ cqlsh -e "DROP TABLE IF EXISTS spark_ex2.sftmin"
    $ cqlsh -e "CREATE TABLE IF NOT EXISTS spark_ex2.sftmin(location TEXT, year INT, month INT, day INT, tmin DOUBLE, datestring TEXT, PRIMARY KEY ((location), year, month, day)) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC)"
    $ cqlsh -e "COPY spark_ex2.sftmin(location, year, month, day, tmin, datestring) FROM 'data/sftmin.csv'"
    
  4. dse hadoop fsを使用してデータと対話することで、HDFSデータにアクセスできることを確認します。

    次のコマンドは、HDFSデータの行数をカウントします。

    dse hadoop fs -cat webhdfs://hadoopNode.example.com:50070/user/guest/data/sftmax.csv | wc -l
    

    出力は以下のようになります。

    16/05/10 11:21:51 INFO snitch.Workload:Setting my workload to Cassandra
    3606
  5. Sparkコンソールを起動し、DataStax Enterpriseクラスターに接続します。
    dse spark

    Spark Cassandraコネクターをインポートしてセッションを作成します。

    import com.datastax.spark.connector.cql.CassandraConnector
    val connector = CassandraConnector(csc.conf)
    val session = connector.openSession()
  6. 最高気温データを格納するためのテーブルを作成します。
    session.execute(s"DROP TABLE IF EXISTS spark_ex2.sftmax")
    session.execute(s"CREATE TABLE IF NOT EXISTS spark_ex2.sftmax(location TEXT, year INT, month INT, day INT, tmax DOUBLE, datestring TEXT, PRIMARY KEY ((location), year, month, day)) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC)")
    
  7. Spark RDDをHDFS最高気温データから作成し、Cassandraテーブルに保存します。

    まず、最高気温センサー・データを表すcaseクラスを作成します。

    case class Tmax(location:String, year:Int, month:Int, day:Int, tmax:Double, datestring:String)

    データをRDDに読み込みます。

    val tmax_raw = sc.textFile("webhdfs://sandbox.hortonworks.com:50070/user/guest/data/sftmax.csv")

    データを変換し、RDD内の各レコードがTmax caseクラスのインスタンスになるようにします。

    val tmax_c10 = tmax_raw.map(x=>x.split(",")).map(x => Tmax(x(0), x(1).toInt, x(2).toInt, x(3).toInt, x(4).toDouble, x(5)))

    caseクラスのインスタンスをカウントし、レコードの数に一致することを確認します。

    tmax_c10.count
    res11:Long = 3606

    caseクラスのインスタンスをCassandraに保存します。

    tmax_c10.saveToCassandra("spark_ex2", "sftmax")
  8. CQLを使用して行をカウントすることで、レコードが一致することを確認します。
    session.execute("SELECT COUNT(*) FROM spark_ex2.sftmax").all.get(0).getLong(0)
    res23:Long = 3606
  9. 最高気温データと最低気温データをCassandraの新しいテーブルに結合します。

    最低気温センサー・データを格納するためのTmin caseクラスを作成します。

    case class Tmin(location:String, year:Int, month:Int, day:Int, tmin:Double, datestring:String)
    val tmin_raw = sc.cassandraTable("spark_ex2", "sftmin")
    val tmin_c10 = tmin_raw.map(x => Tmin(x.getString("location"), x.getInt("year"), x.getInt("month"), x.getInt("day"), x.getDouble("tmin"), x.getString("datestring")))
    

    RDDを結合するには、これらがPairRDDであり、ペアの最初の要素が結合キーである必要があります。

    val tmin_pair = tmin_c10.map(x=>(x.datestring,x))
    val tmax_pair = tmax_c10.map(x=>(x.datestring,x))

    最高気温と最低気温の差を格納するためのTHiLoDelta caseクラスを作成します。

    case class THiLoDelta(location:String, year:Int, month:Int, day:Int, hi:Double, low:Double, delta:Double, datestring:String)

    PairRDDjoin操作を使用してデータを結合し、結合したデータをTHiLoDelta caseクラスに変換します。

    val tdelta_join1 = tmax_pair1.join(tmin_pair1)
    val tdelta_c10 = tdelta_join1.map(x => THiLoDelta(x._2._1._1, x._2._1._2, x._2._1._3, x._2._1._4, x._2._1._5, x._2._2._5, x._2._1._5 - x._2._2._5, x._1))
    

    CQLを使用して、温度差データを格納するための新しいテーブルをSpark内に作成します。

    session.execute(s"DROP TABLE IF EXISTS spark_ex2.sftdelta")
    session.execute(s"CREATE TABLE IF NOT EXISTS spark_ex2.sftdelta(location TEXT, year INT, month INT, day INT, hi DOUBLE, low DOUBLE, delta DOUBLE, datestring TEXT, PRIMARY KEY ((location), year, month, day)) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC)")
    

    温度差データをCassandraに保存します。

    tdelta_c10.saveToCassandra("spark_ex2", "sftdelta")