Using the DataFrames API
The Spark DataFrames API represents data from data sources, including DataStax Enterprise data, as distributed collections organized into named columns.
The Spark Cassandra Connector provides an integrated DataSource that simplifies creating DataFrames. For more technical details, see the Spark Cassandra Connector documentation maintained by DataStax and the Cassandra and PySpark DataFrames post.
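For example, the following minimal sketch (run in the Scala shell started with dse spark, and assuming the keyspace ks and table kv used in the example below) reads a Cassandra table into a DataFrame and prints the schema inferred from the table metadata:
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "kv", "keyspace" -> "ks"))
  .load()
// The DataFrame schema (column names and types) is derived from the Cassandra table definition
df.printSchema()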
Examples of using the DataFrames API
This Python example shows how to use the DataFrames API to read from the table ks.kv and insert into a different table, ks.othertable.
dse pyspark
table1 = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table="kv", keyspace="ks") \
    .load()
table1.write.format("org.apache.spark.sql.cassandra") \
    .options(table="othertable", keyspace="ks") \
    .save(mode="append")
Using the DSE Spark console, the following Scala example shows how to create a DataFrame object from one table and save it to another.
dse spark
val table1 = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()
// Create the target table with the specified primary key, then write the DataFrame to it
table1.createCassandraTable("test", "otherwords", partitionKeyColumns = Some(Seq("word")), clusteringKeyColumns = Some(Seq("count")))
table1.write.cassandraFormat("otherwords", "test").save()
The write operation uses one of the helper methods, cassandraFormat, included in the Spark Cassandra Connector. It is a simplified way of setting the format and options for a standard DataFrame operation. The following command is equivalent to the write operation using cassandraFormat:
table1.write.format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "othertable", "keyspace" -> "test"))
  .save()
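cassandraFormat works the same way on the read side. For example, the explicit read at the start of this example can be shortened to:
val table1 = spark.read.cassandraFormat("words", "test").load()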
Reading and writing TTL and write time properties
Time to live (TTL) and writetime values can be read and written using DataFrames.
There are two ways of reading TTL and writetime values: DataFrame options and DataFrame functions.
To read the TTL value of a column, set the ttl.<column_name> option to the name of a new column that will hold the TTL in the schema.
INSERT INTO test.words (word, count) VALUES ('yikes', 30) USING TTL 99999;
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("ttl.count", "ttlOfCount") // adds a ttlOfCount column to the schema
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()
df.show()
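Because ttlOfCount is added as an ordinary column in the DataFrame schema, it can be selected and filtered like any other column. A small sketch, using the words table from the earlier example:
// Show each word along with the remaining TTL of its count column
df.select("word", "ttlOfCount").show()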
Similarly, the writetime.<column_name> option is used to read the time the data was last modified.
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("writetime.count", "writetimeOfCount") // adds a writetimeOfCount column to the schema
  .options(Map("table" -> "words", "keyspace" -> "test"))
  .load()
df.show()
There are ttl and writeTime helper functions that can be loaded with:
import org.apache.spark.sql.cassandra._
Starting the Spark shell with dse spark automatically adds the import. Use the functions with a select:
val df = spark.read.cassandraFormat("words", "test").load()
df.select(ttl("count")).show()
df.select(writeTime("count")).show()
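Because ttl and writeTime return ordinary Column expressions, they can be combined and aliased in a single select. A brief sketch (the alias names here are arbitrary):
df.select(df("word"), ttl("count").as("count_ttl"), writeTime("count").as("count_writetime")).show()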
The same read functions can be used within a Spark SQL query, and are loaded automatically, requiring no import statements.
spark.sql("SELECT ttl(count)) FROM test.words").show()
spark.sql("SELECT writeTime(count) FROM test.words").show()
TTL and writetime values can be written using either options or helper functions. They are set per write operation and apply to the full row being written, not to individual columns.
This example uses the writetime and ttl options:
import java.time.Instant
import java.time.temporal.ChronoUnit

val df = spark.read.cassandraFormat("words", "test").load()
// Cassandra writetime values are microseconds since the Unix epoch
val ts = ChronoUnit.MICROS.between(Instant.EPOCH, Instant.now())
df.write.cassandraFormat("words", "test").option("writetime", ts).mode("APPEND").save()
df.select(writeTime("count")).show()
df.write.cassandraFormat("words", "test").option("ttl", 999999).mode("APPEND").save()
df.select(ttl("count")).show()
If you added the import described earlier, use the helper write functions withTTL and withWriteTime:
import java.time.Instant
import java.time.temporal.ChronoUnit

val df = spark.read.cassandraFormat("words", "test").load()
// Cassandra writetime values are microseconds since the Unix epoch
val ts = ChronoUnit.MICROS.between(Instant.EPOCH, Instant.now())
df.write.cassandraFormat("words", "test").withWriteTime(ts).mode("APPEND").save()
df.select(writeTime("count")).show()
df.write.cassandraFormat("words", "test").withTTL(999999).mode("APPEND").save()
df.select(ttl("count")).show()
Write functions are not supported in Spark SQL.
See Using TTL and writetime in DseGraphFrame for information on reading and writing TTL and write time values with DataStax Graph.