Getting started with PySpark
DataStax Enterprise supports the Spark Python API (PySpark) that exposes the Spark programming model to Python. You can use PySpark interactively from the command line.
Limitations
- Predicate pushdown and column selection are not supported in this release.
- Blobs cannot be used as set or map keys.
Data types
The following table lists CQL and corresponding Python data types.
CQL Type | Python Type | CQL Description |
---|---|---|
null | None | |
ascii | str/unicode | US-ASCII character string |
bigint | long | 64-bit signed long |
blob | bytearray | Arbitrary bytes (no validation), expressed as hexadecimal |
boolean | bool | true or false |
counter | long | Distributed counter value (64-bit long) |
decimal | decimal | Variable-precision decimal Java type |
double | float | 64-bit IEEE-754 floating point Java type |
float | float | 32-bit IEEE-754 floating point Java type |
inet | str/unicode | IP address string in IPv4 or IPv6 format, used by the python-cql driver and CQL native protocols |
int | int | 32-bit signed integer |
list | list | A collection of one or more ordered elements |
map | dict | A JSON-style array of literals: { literal : literal, literal : literal ... } |
set | set | A collection of one or more elements |
text | str/unicode | UTF-8 encoded string |
timestamp | datetime.datetime | Date plus time, encoded as 8 bytes since epoch |
timeuuid | str/unicode | Type 1 UUID only |
uuid | str/unicode | A UUID in standard UUID format |
varchar | str/unicode | UTF-8 encoded string |
varint | long | Arbitrary-precision integer Java type |
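These mappings can be exercised without a cluster. The sketch below builds a plain Python record using the Python-side types from the table; the column names and values are illustrative only:

```python
from datetime import datetime
from decimal import Decimal

# Python-side values corresponding to a few CQL types from the table above.
row = {
    "key": 1,                                # CQL int -> int
    "big": 2 ** 40,                          # CQL bigint -> long (int on Python 3)
    "price": Decimal("19.99"),               # CQL decimal -> decimal.Decimal
    "payload": bytearray(b"\x01\x02"),       # CQL blob -> bytearray
    "created": datetime(2015, 1, 1, 12, 0),  # CQL timestamp -> datetime.datetime
    "tags": set(["spark", "python"]),        # CQL set -> set
    "attrs": {"color": "red"},               # CQL map -> dict
}

for name in sorted(row):
    print("%s: %s" % (name, type(row[name]).__name__))
```

An RDD of such dictionaries can be passed to saveToCassandra, as shown later in this section.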
On the Python side, both str/unicode and uuid.UUID types that represent UUIDs are properly mapped to the Cassandra uuid or timeuuid type when saved as RDDs. However, Cassandra uuids are converted to str/unicode when read from Python.
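A standalone illustration of that round trip (no Spark needed): a uuid.UUID and its string form name the same value, and a value read back as a string can be rebuilt into a UUID object.

```python
import uuid

# Fixed example value; in practice this would come from a Cassandra column.
u = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c1")

# Either representation can be written; reads come back as str/unicode.
as_text = str(u)
rebuilt = uuid.UUID(as_text)

print(as_text)
assert rebuilt == u
```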
PySpark prerequisites
- Python 2.6 or 2.7
- Start a DataStax Enterprise node in Spark mode.
Insert Cassandra data
- Start cqlsh.
$ cqlsh
Connected to Test Cluster at 127.0.0.1:9160.
[cqlsh 4.1.1 | Cassandra 2.1.0.0 | DSE 4.7.0 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.
- In cqlsh, create a keyspace and two tables in Cassandra using the Analytics data center name.
CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'Analytics' : 1};
USE test;
CREATE TABLE kv (key int PRIMARY KEY, value text);
CREATE TABLE kv2 (key int PRIMARY KEY, value text);
- Insert data into the kv table only. The schema for both tables and the data for table kv exists in Cassandra before starting PySpark.
INSERT INTO kv (key, value) VALUES (1, 'abc');
INSERT INTO kv (key, value) VALUES (2, 'def');
Access the data using PySpark
- Start PySpark using one of the following commands:
- Installer-Services and Package installations:
dse pyspark
- Installer-No Services and Tarball installations:
install_location/bin/dse pyspark
Note: The directory in which you run the dse Spark commands must be writable by the current user. The Spark prompt appears.
- Call the cassandraTable method to obtain an RDD representing the Cassandra table test.kv.
rdd = sc.cassandraTable("test", "kv")
Cassandra rows are converted to Python objects of class pyspark.sql.Row, which allows dictionary-like lookup of column values as well as direct access to the columns as object fields.
- Query Cassandra.
rdd.first()
Row(key=2, value=u'def')
rdd.first().key
2
rdd.first().value
u'def'
rdd.first()[0]
2
rdd.first()[1]
u'def'
rdd.collect()
[Row(key=2, value=u'def'), Row(key=1, value=u'abc')]
rdd.filter(lambda row: row.key > 1).collect()
[Row(key=2, value=u'def')]
- Call saveToCassandra and pass a keyspace, table, and optionally a list of columns on any RDD of dictionary or pyspark.sql.Row objects. For example, save the key column from the kv table to the kv2 table.
rdd = sc.cassandraTable("test", "kv")
rdd.saveToCassandra("test", "kv2", ["key"])
- In cqlsh, confirm that the keys from kv were added to table kv2.
SELECT * FROM kv2;

 key | value
-----+-------
   1 |  null
   2 |  null

(2 rows)
- In PySpark, copy all columns from table kv to table kv2.
rdd.saveToCassandra("test", "kv2")
- In cqlsh, take a look at the kv2 table to confirm that kv2 now has both keys and values.
SELECT * FROM kv2;

 key | value
-----+-------
   1 |   abc
   2 |   def

(2 rows)
Insert new data into Cassandra from PySpark
- Distribute the current Python collection to form an RDD. The source RDD does not need to be in Cassandra.
otherRdd = sc.parallelize([{ "key": 3, "value": "foobar" }])
- Save otherRdd to the kv2 table in Cassandra.
otherRdd.saveToCassandra("test", "kv2")
These steps add the key 3 and the value foobar to table kv2.
- In cqlsh, select all the data from kv2 to confirm the addition of the data.
SELECT * FROM kv2;

 key | value
-----+--------
   1 |    abc
   2 |    def
   3 | foobar

(3 rows)
Run a Python script using dse spark-submit
You run a Python script using the spark-submit command. For example, create the following file and save it as standalone.py:
#standalone.py
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("Stand Alone Python Script")
sc = SparkContext(conf=conf)
x = sc.cassandraTable("test","kv").collect()
print x
DataStax Enterprise sets the cassandra.connection.host environment variable, eliminating the need to set the variable in the Python file. Assuming you set up the kv table in the last example, execute standalone.py. On Linux, for example, from the installation directory, execute standalone.py as follows:
$ bin/dse spark-submit /<path>/standalone.py
Run Python jobs independently
If you create test suites with nose or other tools that run raw Python code, you cannot explicitly call dse spark-submit. You can execute Python scripts independent of DataStax Enterprise using one of these methods:
- Set the PYTHONPATH environment variable to the path of the DataStax Enterprise integrated Python directory, as shown in the next section.
- Use an initialization file that you modify to describe your environment and then run, as shown in the example later.
These methods work with the DataStax Enterprise integrated Spark, not the open source version.
Configure PYTHONPATH to run jobs
This procedure uses the PYTHONPATH environment variable to find and use PySpark to execute a Python job.
Run a Python job using PYTHONPATH
- Set the PYTHONPATH environment variable to the location of the PySpark Python libraries in your DataStax Enterprise Spark environment. The default location depends on the type of installation:
  - Installer-Services and Package installations: /usr/share/dse/spark/python
  - Installer-No Services and Tarball installations: install_location/resources/spark/python
- Run the Python script. For example, run the standalone.py file that you created in "Run a Python script using dse spark-submit."
$ python /<path>/standalone.py
The output is:
[Row(key=1, value=u'abc'), Row(key=2, value=u'def')]
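Before running a script this way, it can help to verify from Python that the PySpark directory is actually visible. A small DSE-independent sketch; the path shown is the assumed package-installation default from the list above:

```python
import os
import sys

# Assumed default for Installer-Services and Package installations;
# use install_location/resources/spark/python for tarball installs.
PYSPARK_DIR = "/usr/share/dse/spark/python"

def pyspark_on_path(path=PYSPARK_DIR):
    # PYTHONPATH entries are folded into sys.path at interpreter startup,
    # so check both the environment variable and the live search path.
    entries = os.environ.get("PYTHONPATH", "").split(os.pathsep)
    return path in entries or path in sys.path

print(pyspark_on_path())
```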
Create an initialization file to run a Python job
You can create an initialization file to run Python jobs independent of DataStax Enterprise.
Follow the Python package convention and name the initialization file __init__.py.
import os
from os import getenv
from os.path import join
import sys
from subprocess import check_output
HOME = getenv("HOME")
DSE_HOME = getenv("DSE_HOME",join(HOME,"dse-4.7.0"))
SPARK_HOME = join(DSE_HOME,"resources","spark")
os.environ['SPARK_HOME']=SPARK_HOME
PYSPARK_DIR = join(DSE_HOME,"resources","spark","python")
ADD_PATH = [ PYSPARK_DIR ]
for PATH in ADD_PATH:
if PATH not in sys.path:
sys.path.insert(1,PATH)
- Use HOME or USERPROFILE, depending on the operating system.
- Change the DSE_HOME definition to match the location of the DataStax Enterprise installation.
- Modify SPARK_HOME to match the location of Spark resources:
  - Installer-Services and Package installations: SPARK_HOME = join(DSE_HOME,"resources","spark") matches /etc/dse/spark
  - Installer-No Services and Tarball installations: SPARK_HOME = join(DSE_HOME,"resources","spark") matches the install_location/resources/spark location.
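The path-insertion loop in the initialization file can be exercised on its own: once a directory is inserted into sys.path, modules in it become importable. This sketch uses a temporary directory as a stand-in for PYSPARK_DIR:

```python
import os
import sys
import tempfile

# Create a throwaway module in a temporary directory (stands in for PYSPARK_DIR).
tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, "fake_pyspark.py"), "w") as f:
    f.write("VERSION = 'sketch'\n")

# Same pattern as the initialization file: insert only if not already present.
for path in [tmp]:
    if path not in sys.path:
        sys.path.insert(1, path)

import fake_pyspark
print(fake_pyspark.VERSION)
```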
Use the initialization file to run jobs
This procedure uses the initialization file shown in the last section to set up a number of environment variables and execute a sample Python job.
- Create a directory and subdirectory for storing the Python scripts. For example, create a directory named example that has a subdirectory named connector.
- Create the example initialization file. For example, create the __init__.py file shown in the last section, and save it to the example/connector directory.
- Create scripts to configure Spark and query the kv table in Cassandra from Spark. For example, create a script named connector.py and save it to the example/connector directory.
#connector.py
from pyspark import SparkContext, SparkConf
def getSC():
    conf = SparkConf().setAppName("Stand Alone Python Script")
    sc = SparkContext(conf=conf)
    return sc
- Create another script named moduleexample.py, for example, to print the results. Save moduleexample.py to the example directory.
#!/usr/local/bin/python
from connector.connector import getSC
sc = getSC()
print sc.cassandraTable("test","kv").collect()
The example directory now contains these files:
- moduleexample.py
- connector/__init__.py
- connector/connector.py
- Execute the Python job as the same user that launched DataStax Enterprise.
$ python example/moduleexample.py
The output of the kv table appears.
[Row(key=1, value=u'abc'), Row(key=2, value=u'def')]