Getting started with PySpark
DataStax Enterprise 4.6 supports the Spark Python API (PySpark), which exposes the Spark programming model to Python. You can use PySpark interactively from the command line.
Limitations
- Predicate pushdown and column selection are not supported in this release.
- Blobs cannot be used as set or map keys.
Data types
The following table lists CQL and corresponding Python data types.
CQL Type | Python Type | CQL Description |
---|---|---|
null | None | |
ascii | str/unicode | US-ASCII character string |
bigint | long | 64-bit signed long |
blob | bytearray | Arbitrary bytes (no validation), expressed as hexadecimal |
boolean | bool | true or false |
counter | long | Distributed counter value (64-bit long) |
decimal | decimal | Variable-precision decimal Java type |
double | float | 64-bit IEEE-754 floating point Java type |
float | float | 32-bit IEEE-754 floating point Java type |
inet | str/unicode | IP address string in IPv4 or IPv6 format, used by the python-cql driver and CQL native protocols |
int | int | 32-bit signed integer |
list | list | A collection of one or more ordered elements |
map | dict | A JSON-style array of literals: { literal : literal, literal : literal ... } |
set | set | A collection of one or more elements |
text | str/unicode | UTF-8 encoded string |
timestamp | datetime.datetime | Date plus time, encoded as 8 bytes since epoch |
timeuuid | str/unicode | Type 1 UUID only |
uuid | str/unicode | A UUID in standard UUID format |
varchar | str/unicode | UTF-8 encoded string |
varint | long | Arbitrary-precision integer Java type |
On the Python side, both str/unicode and uuid.UUID values that represent UUIDs are properly mapped to the Cassandra uuid or timeuuid type when saved from RDDs. However, Cassandra uuids are converted to str/unicode when read into Python.
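For example, the following sketch illustrates that round trip, using the cassandraTable and saveToCassandra calls shown later in this topic. The table test.by_id is hypothetical and not part of this example's schema; it assumes an id uuid primary key.
>>> import uuid
>>> # Hypothetical table: CREATE TABLE test.by_id (id uuid PRIMARY KEY, name text);
>>> idRdd = sc.parallelize([{ "id": uuid.uuid4(), "name": "example" }])
>>> idRdd.saveToCassandra("test", "by_id")              # uuid.UUID is saved as a Cassandra uuid
>>> sc.cassandraTable("test", "by_id").first().id       # read back as str/unicode, not uuid.UUID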
PySpark prerequisites
- Python 2.6 or 2.7
- DSE 4.6 only: Tarball and GUI-no services installations require the DataStax Enterprise startup directory in the value of the PATH environment variable. On Linux, for example, set the PATH environment variable as follows:
  export PATH=$PATH:~/install_location/dse-4.6.0/bin/
  Note: DSE 4.6.1 and later releases do not require the DataStax Enterprise startup directory in the value of the PATH environment variable.
- Start a DataStax Enterprise node in Spark mode.
Insert Cassandra data
- Start cqlsh.
$ cqlsh
Connected to Test Cluster at 127.0.0.1:9160.
[cqlsh 4.1.1 | Cassandra 2.0.11.83 | DSE 4.6.0 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.
- In cqlsh, create a keyspace and two tables in Cassandra using the Analytics data center name.
CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'Analytics' : 1};
USE test;
CREATE TABLE kv (key int PRIMARY KEY, value text);
CREATE TABLE kv2 (key int PRIMARY KEY, value text);
- Insert data into the kv table only.
INSERT INTO kv (key, value) VALUES (1, 'abc');
INSERT INTO kv (key, value) VALUES (2, 'def');
The schema for both tables and the data for table kv exist in Cassandra before you start PySpark.
Access the data using PySpark
- Start PySpark using one of the following commands:
- Installer-Services and Package installations:
dse pyspark
- Installer-No Services and Tarball installations:
install_location/bin/dse pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Python version 2.7.5 (default, Mar 9 2014 22:15:05)
SparkContext available as sc.
>>>
- Call the cassandraTable method to obtain an RDD representing the Cassandra table
test.kv.
>>> rdd = sc.cassandraTable("test", "kv")
>>>
Cassandra rows are converted to Python objects of class pyspark.sql.Row, which allows dictionary-like lookup of column values as well as access to columns as object attributes.
- Query Cassandra.
>>> rdd.first()
Row(key=2, value=u'def')
>>> rdd.first().key
2
>>> rdd.first().value
u'def'
>>> rdd.first()[0]
2
>>> rdd.first()[1]
u'def'
>>> rdd.collect()
[Row(key=2, value=u'def'), Row(key=1, value=u'abc')]
>>> rdd.filter(lambda row: row.key > 1).collect()
[Row(key=2, value=u'def')]
>>>
- Call saveToCassandra and pass a keyspace, table, and optionally a list of columns on any RDD of dictionary or pyspark.sql.Row objects. For example, save the "key" column from the kv table to the kv2 table.
>>> rdd = sc.cassandraTable("test", "kv")
>>> rdd.saveToCassandra("test", "kv2", ["key"])
- In cqlsh, confirm that the keys from kv were added to table kv2.
SELECT * FROM kv2;

 key | value
-----+-------
   1 |  null
   2 |  null

(2 rows)
- In PySpark, copy all columns from table kv to table kv2.
rdd.saveToCassandra("test", "kv2")
- In cqlsh, look at the kv2 table to confirm that it now has both keys and values.
SELECT * FROM kv2;

 key | value
-----+-------
   1 |   abc
   2 |   def

(2 rows)
Insert new data into Cassandra from PySpark
- Distribute a local Python collection to form an RDD. The source data does not need to be in Cassandra.
>>> otherRdd = sc.parallelize([{ "key": 3, "value": "foobar" }])
- Save otherRdd to the kv2 table in Cassandra.
>>> otherRdd.saveToCassandra("test", "kv2")
These steps add the key 3 and the value foobar to table kv2.
- In cqlsh, select all the data from kv2 to confirm the addition of the data.
SELECT * FROM kv2;

 key | value
-----+--------
   1 |    abc
   2 |    def
   3 | foobar

(3 rows)
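As a further illustration, a minimal sketch that uses only the calls shown above: you can transform rows read from one table before saving them to another, for example upper-casing the kv values on their way into kv2.
>>> # Read test.kv, turn each row into a dictionary, and save the result to test.kv2
>>> sc.cassandraTable("test", "kv") \
...     .map(lambda row: { "key": row.key, "value": row.value.upper() }) \
...     .saveToCassandra("test", "kv2")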
Run a Python script using dse spark-submit
You run a Python script using the dse spark-submit command. For example, create the following file and save it as standalone.py:
#standalone.py
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("Stand Alone Python Script")
sc = SparkContext(conf=conf)
x = sc.cassandraTable("test", "kv").collect()
print x
DataStax Enterprise sets the cassandra.connection.host environment variables, eliminating the need to set them in the Python file. Assuming you set up the kv table in the earlier example, execute standalone.py. On Linux, for example, from the installation directory, execute standalone.py as follows:
$ bin/dse spark-submit /<path>/standalone.py
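If you run a script with plain python instead of dse spark-submit, as in the sections that follow, the connection host is not set for you. A minimal sketch, assuming the Spark Cassandra connector property spark.cassandra.connection.host and a local node at 127.0.0.1; the file name is hypothetical:
#standalone_with_host.py (hypothetical variant of standalone.py)
from pyspark import SparkContext, SparkConf

# Assumption: spark.cassandra.connection.host points the connector at a Cassandra node.
conf = SparkConf() \
    .setAppName("Stand Alone Python Script") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
sc = SparkContext(conf=conf)
print sc.cassandraTable("test", "kv").collect()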
Run Python jobs independently
If you create test suites with nose or other tools that run raw Python code, you cannot explicitly call dse spark-submit. You can execute Python scripts independently of DataStax Enterprise using either of these methods:
- Set the PYTHONPATH environment variable to the path of the DataStax Enterprise integrated python executable, as shown in the next section.
- Use an initialization file that you modify to describe your environment and then run, as shown in the example later.
These methods work with the DataStax Enterprise integrated Spark, not the open source version.
Configure PYTHONPATH to run jobs
This procedure uses the PYTHONPATH environment variable to find and use PySpark to execute a Python job.
Run a Python job using PYTHONPATH
- Set the PYTHONPATH environment variable to the location of the PySpark python directory in your DataStax Enterprise Spark environment:
- Installer-Services and Package installations: /usr/share/dse/spark/python
- Installer-No Services and Tarball installations: install_location/resources/spark/python
- Run a python script. For example, run the standalone.py file you created in "Running a python script using dse
spark-submit."
python /<path>/standalone.py
The output is:
[Row(key=1, value=u'abc'), Row(key=2, value=u'def')]
Create an initialization file to run a Python job
You can create an initialization file to run Python jobs independently of DataStax Enterprise.
Use the standard Python package convention to name the initialization file: __init__.py.
An initialization file on a tarball installation might look like this:
import os
from os import getenv
from os.path import join
import sys
from subprocess import check_output

HOME = getenv("HOME")
DSE_HOME = getenv("DSE_HOME", join(HOME, "dse-4.6.0"))
SPARK_HOME = join(DSE_HOME, "resources", "spark")
os.environ['SPARK_HOME'] = SPARK_HOME
PYSPARK_DIR = join(DSE_HOME, "resources", "spark", "python")
ADD_PATH = [PYSPARK_DIR]
for PATH in ADD_PATH:
    if PATH not in sys.path:
        sys.path.insert(1, PATH)
- Use HOME or USERPROFILE, depending on your operating system.
- Change the DSE_HOME definition to match the location of your DataStax Enterprise installation.
- Modify SPARK_HOME to match the location of Spark resources:
  - Installer-Services and Package installations:
    SPARK_HOME = join(DSE_HOME,"resources","spark")
    matches the location /etc/dse/spark
  - Installer-No Services and Tarball installations:
    SPARK_HOME = join(DSE_HOME,"resources","spark")
    matches the location install_location/resources/spark
Use the initialization file to run jobs
This procedure uses the initialization file shown in the last section to set up a number of environment variables and execute a sample Python job.
- Create a directory and subdirectory for storing the python scripts. For example, create a directory named example that has a subdirectory named connector.
- Create the example initialization file. For example, create the __init__.py file shown in the last section, and save it to example/connector directory.
- Create scripts to configure Spark and query the kv table in Cassandra from Spark. For example, create a script named connector.py and save it to the example/connector directory.
#connector.py
from pyspark import SparkContext, SparkConf

def getSC():
    conf = SparkConf().setAppName("Stand Alone Python Script")
    sc = SparkContext(conf=conf)
    return sc
- Create another script named moduleexample.py, for example, to print
the results. Save moduleexample.py to the example directory.
#!/usr/local/bin/python
from connector.connector import getSC

sc = getSC()
print sc.cassandraTable("test", "kv").collect()
The example directory now contains these files:
- moduleexample.py
- connector/__init__.py
- connector/connector.py
- Execute the python job as the same user that launched DataStax Enterprise.
$ python example/moduleexample.py
The output of the kv table appears.
[Row(key=1, value=u'abc'), Row(key=2, value=u'def')]