Sparkを使用したデータベースへの外部HDFSデータの読み込み

Hadoop HDFSデータは、DataStax Enterprise Analyticsノードからアクセスして、Sparkを使用してデータベース・テーブルに保存できます。

このタスクでは、Hadoopデータにアクセスして、DSE AnalyticsノードでSparkを使用してデータベースに保存する方法を示します。

Hadoopデータへのアクセスを簡略化する目的で、Hadoopクラスターと対話するためのRESTベースのサーバーであるWebHDFSが使用されます。WebHDFSがデータ・ノードへのリダイレクト要求を処理するため、すべてのDSE Analyticsノードは、Hadoopノードのホスト名を使用してすべてのHDFSノードにルーティングできる必要があります。

これらの手順では例として気象データを使用しますが、データベースに格納されるあらゆる種類のHadoopデータにこの原則を適用できます。

始める前に

以下が必要です。

  • HDFSおよびWebHDFSが有効になっていて実行されている、動作中のHadoopのインストール。Hadoopを実行しているマシンのホスト名が必要になり、クラスターにはDataStax Enterpriseクラスター内のDSE Analyticsノードからアクセスできる必要があります。
  • DSE Analyticsノードが有効になっている実行中のDataStax Enterpriseクラスター。
  • DSE AnalyticsノードにインストールされたGit。

手順

  1. テスト・データが含まれているGitHubリポジトリを複製します。
    git clone https://github.com/brianmhess/DSE-Spark-HDFS.git
  2. WebHDFSを使用して最高気温のテスト・データをHadoopクラスターに読み込みます。

    この例では、HadoopノードにはhadoopNode.example.comというホスト名が含まれています。これをHadoopクラスター内のノードのホスト名と置き換えます。

    hadoop fs -mkdir webhdfs://hadoopNode.example.com:50070/user/guest/data && 
    hadoop fs -copyFromLocal data/sftmax.csv webhdfs://hadoopNode:50070/user/guest/data/sftmax.csv
  3. キースペースとテーブルを作成し、cqlshを使用して最高気温のテスト・データを読み込みます。
    CREATE KEYSPACE IF NOT EXISTS spark_ex2 WITH REPLICATION = { 'class':'SimpleStrategy', 'replication_factor':1}
    DROP TABLE IF EXISTS spark_ex2.sftmin
    CREATE TABLE IF NOT EXISTS spark_ex2.sftmin(location TEXT, year INT, month INT, day INT, tmin DOUBLE, datestring TEXT, PRIMARY KEY ((location), year, month, day)) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC)
    COPY spark_ex2.sftmin(location, year, month, day, tmin, datestring) FROM 'data/sftmin.csv'
    
  4. hadoop fsを使用してデータと対話することで、HDFSデータにアクセスできることを確認します。

    以下のコマンドを実行すると、HDFSデータの行数がカウントされます。

    hadoop fs -cat webhdfs://hadoopNode.example.com:50070/user/guest/data/sftmax.csv | wc -l
    出力は以下のようになります。
    16/05/10 11:21:51 INFO snitch.Workload: Setting my workload to Cassandra 3606
  5. Sparkコンソールを起動して、DataStax Enterpriseクラスターに接続します。
    dse spark

    Spark Cassandraコネクターをインポートして、セッションを作成します。

    import com.datastax.spark.connector.cql.CassandraConnector
    val connector = CassandraConnector(csc.conf)
    val session = connector.openSession()
  6. テーブルを作成して、最高気温データを格納します。
    session.execute(s"DROP TABLE IF EXISTS spark_ex2.sftmax")
    session.execute(s"CREATE TABLE IF NOT EXISTS spark_ex2.sftmax(location TEXT, year INT, month INT, day INT, tmax DOUBLE, datestring TEXT, PRIMARY KEY ((location), year, month, day)) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC)")
  7. HDFSの最高気温データからSpark RDDを作成し、これをテーブルに保存します。

    最初に、最高気温のセンサー・データを表すcaseクラスを作成します。

    case class Tmax(location: String, year: Int, month: Int, day: Int, tmax: Double, datestring: String)

    RDDにデータを読み出します。

    val tmax_raw = sc.textFile("webhdfs://sandbox.hortonworks.com:50070/user/guest/data/sftmax.csv")

    RDDの各レコードがTmax caseクラスのインスタンスとなるようにデータを変換します。

    val tmax_c10 = tmax_raw.map(x=>x.split(",")).map(x => Tmax(x(0), x(1).toInt, x(2).toInt, x(3).toInt, x(4).toDouble, x(5)))

    caseクラス・インスタンスをカウントして、レコード数と一致することを確認します。

    tmax_c10.count
    res11: Long = 3606

    caseクラス・インスタンスをデータベースに保存します。

    tmax_c10.saveToCassandra("spark_ex2", "sftmax")
  8. CQLを使用して行数をカウントして、レコード数の一致を検証します。
    session.execute("SELECT COUNT(*) FROM spark_ex2.sftmax").all.get(0).getLong(0)
    res23: Long = 3606
  9. 最大データと最小データを新しいテーブルに結合します。

    Tmin caseクラスを作成して、最低気温のセンサー・データを格納します。

    case class Tmin(location: String, year: Int, month: Int, day: Int, tmin: Double, datestring: String)
    val tmin_raw = sc.cassandraTable("spark_ex2", "sftmin")
    val tmin_c10 = tmin_raw.map(x => Tmin(x.getString("location"), x.getInt("year"), x.getInt("month"), x.getInt("day"), x.getDouble("tmin"), x.getString("datestring")))
    

    RDDを結合するには、RDDがPairRDDである必要があり、ペアの最初の要素が結合キーになります。

    val tmin_pair = tmin_c10.map(x=>(x.datestring,x))
    val tmax_pair = tmax_c10.map(x=>(x.datestring,x))

    THiLoDelta caseクラスを作成して、最高気温と最低気温の差を格納します。

    case class THiLoDelta(location: String, year: Int, month: Int, day: Int, hi: Double, low: Double, delta: Double, datestring: String)

    PairRDDjoin操作を使用して、データを結合します。結合したデータをTHiLoDelta caseクラスに変換します。

    val tdelta_join1 = tmax_pair1.join(tmin_pair1)
    val tdelta_c10 = tdelta_join1.map(x => THiLoDelta(x._2._1._1, x._2._1._2, x._2._1._3, x._2._1._4, x._2._1._5, x._2._2._5, x._2._1._5 - x._2._2._5, x._1))
    

    CQLを使用してSpark内で新しいテーブルを作成し、温度差データを格納します。

    session.execute(s"DROP TABLE IF EXISTS spark_ex2.sftdelta")
    session.execute(s"CREATE TABLE IF NOT EXISTS spark_ex2.sftdelta(location TEXT, year INT, month INT, day INT, hi DOUBLE, low DOUBLE, delta DOUBLE, datestring TEXT, PRIMARY KEY ((location), year, month, day)) WITH CLUSTERING ORDER BY (year DESC, month DESC, day DESC)")
    

    温度差データをテーブルに保存します。

    tdelta_c10.saveToCassandra("spark_ex2", "sftdelta")