Getting started with PySpark

DataStax Enterprise supports the Spark Python API (PySpark) that exposes the Spark programming model to Python. You can use PySpark interactively from the command line.

Limitations

  • Predicate pushdown and column selection are not supported in this release (see the sketch after this list).
  • Blobs cannot be used as set or map keys.
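
In practice this means that filters and column selections run in Spark after whole rows have been read from Cassandra. A minimal sketch in the PySpark shell, assuming the test.kv table and sample data that are created later on this page:

# Pushdown is not supported, so full rows are read from Cassandra first.
rdd = sc.cassandraTable("test", "kv")

# The predicate and the column selection are both evaluated in Spark.
filtered = rdd.filter(lambda row: row.key > 1)
values = filtered.map(lambda row: row.value)
print values.collect()    # [u'def'] with the sample data shown below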

Data types

The following table lists CQL and corresponding Python data types.

CQL-Python data mapping
CQL Type    Python Type        CQL Description
null        None
ascii       str/unicode        US-ASCII character string
bigint      long               64-bit signed long
blob        bytearray          Arbitrary bytes (no validation), expressed as hexadecimal
boolean     bool               true or false
counter     long               Distributed counter value (64-bit long)
decimal     decimal            Variable-precision decimal
double      float              64-bit IEEE-754 floating point (Java type)
float       float              32-bit IEEE-754 floating point (Java type)
inet        str/unicode        IP address string in IPv4 or IPv6 format, used by the python-cql driver and CQL native protocols
int         int                32-bit signed integer
list        list               A collection of one or more ordered elements
map         dict               A JSON-style array of literals: { literal : literal, literal : literal ... }
set         set                A collection of one or more elements
text        str/unicode        UTF-8 encoded string
timestamp   datetime.datetime  Date plus time, encoded as 8 bytes since epoch
timeuuid    str/unicode        Type 1 UUID only
uuid        str/unicode        A UUID in standard UUID format
varchar     str/unicode        UTF-8 encoded string
varint      long               Arbitrary-precision integer (Java type)

On the Python side, UUIDs represented as either str/unicode or uuid.UUID are correctly mapped to the Cassandra uuid or timeuuid type when an RDD is saved. However, Cassandra uuid values are converted to str/unicode when read into Python.
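
A short sketch of this behavior in the PySpark shell, assuming a hypothetical table test.events (id uuid PRIMARY KEY, name text) has been created:

import uuid

# Hypothetical table: CREATE TABLE test.events (id uuid PRIMARY KEY, name text);
# Both a uuid.UUID object and its string form map to the Cassandra uuid type on save.
events = sc.parallelize([{"id": uuid.uuid4(), "name": "from a UUID object"},
                         {"id": str(uuid.uuid4()), "name": "from a string"}])
events.saveToCassandra("test", "events")

# Reading the table back returns the uuid column as str/unicode, not uuid.UUID.
print type(sc.cassandraTable("test", "events").first().id)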

PySpark prerequisites 

Before starting PySpark, create the example schema and insert the example data into Cassandra, as described in the following steps.

Insert Cassandra data 

  1. Start cqlsh.
    $ cqlsh
    Connected to Test Cluster at 127.0.0.1:9160.
    [cqlsh 4.1.1 | Cassandra 2.1.0.0 | DSE 4.7.0 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
    Use HELP for help.
  2. In cqlsh, create a keyspace and two tables in Cassandra using the Analytics data center name.
    CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'Analytics' : 1};
    USE test;
    CREATE TABLE kv (key int PRIMARY KEY, value text);
    CREATE TABLE kv2 (key int PRIMARY KEY, value text);
  3. Insert data into the kv table only.
    INSERT INTO kv (key, value) VALUES (1, 'abc');
    INSERT INTO kv (key, value) VALUES (2, 'def');
    Both table schemas and the data for table kv now exist in Cassandra before you start PySpark.

Access the data using PySpark 

  1. Start PySpark using one of the following commands:
    • Installer-Services and Package installations:
      dse pyspark
    • Installer-No Services and Tarball installations:
      install_location/bin/dse pyspark
    Note: The directory in which you run the dse Spark commands must be writable by the current user.
    The Spark prompt will appear.
  2. Call the cassandraTable method to obtain an RDD representing the Cassandra table test.kv.
    rdd = sc.cassandraTable("test", "kv")

    Cassandra rows are converted to Python objects of class pyspark.sql.Row, which allows dictionary-like lookup of column values by name as well as direct access to columns as object fields, as shown in the next step and in the sketch after this procedure.

  3. Query Cassandra.
    rdd.first()
    Row(key=2, value=u'def')
    rdd.first().key
    2
    rdd.first().value
    u'def'
    rdd.first()[0]
    2
    rdd.first()[1]
    u'def'
    rdd.collect()
    [Row(key=2, value=u'def'), Row(key=1, value=u'abc')]
    rdd.filter(lambda row: row.key > 1).collect()
    [Row(key=2, value=u'def')]
    
  4. Call saveToCassandra and pass a keyspace, table, and optionally a list of columns on any RDD of dictionary or pyspark.sql.Row objects. For example, save the key column from the kv table to the kv2 table.
    rdd = sc.cassandraTable("test", "kv")
    rdd.saveToCassandra("test", "kv2", ["key"])
  5. In cqlsh, confirm that the keys from kv were added to table kv2.
    SELECT * FROM kv2;
    
     key | value
    -----+-------
       1 |  null
       2 |  null
    
    (2 rows)
    
  6. In PySpark, copy all columns from table kv to table kv2.
    rdd.saveToCassandra("test", "kv2")
  7. In cqlsh, look at the kv2 table to confirm that it now has both keys and values.
    SELECT * FROM kv2;
    
     key | value
    -----+-------
       1 |   abc
       2 |   def
    
    (2 rows)
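
The pyspark.sql.Row objects returned in step 2 support dictionary-like lookup by column name in addition to the attribute and positional access shown in step 3. A brief sketch using the same test.kv data:

row = sc.cassandraTable("test", "kv").first()

print row["value"]    # dictionary-like lookup by column name
print row.value       # attribute-style access
print row[1]          # positional access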

Insert new data into Cassandra from PySpark 

  1. Distribute a Python collection to form an RDD; the source data does not need to already be in Cassandra. (A sketch after this procedure shows the same pattern with an explicit column list.)
    otherRdd = sc.parallelize([{ "key": 3, "value": "foobar" }])  
  2. Save otherRdd to the kv2 table in Cassandra.
    otherRdd.saveToCassandra("test", "kv2")

    These steps add the key 3 and the value foobar to table kv2.

  3. In cqlsh, select all the data from kv2 to confirm the addition of the data.
    SELECT * FROM kv2;
    
     key | value
    -----+--------
       1 |    abc
       2 |    def
       3 | foobar
    
    (3 rows)
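
The optional column list shown earlier for saveToCassandra also works with dictionary RDDs. A minimal sketch that writes only the key column of a new row to kv2 (the key 4 is illustrative):

# Save only the listed columns from a dictionary RDD; value remains null in kv2.
keysOnly = sc.parallelize([{"key": 4}])
keysOnly.saveToCassandra("test", "kv2", ["key"])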

Run a Python script using dse spark-submit 

You run a Python script using the dse spark-submit command. For example, create the following file and save it as standalone.py:

#standalone.py

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("Stand Alone Python Script")
sc = SparkContext(conf=conf)
x = sc.cassandraTable("test","kv").collect()
print x

DataStax Enterprise sets cassandra.connection.host automatically, eliminating the need to set it in the Python file. Assuming you set up the kv table in the earlier example, execute standalone.py. On Linux, for example, run the following from the installation directory:

$ bin/dse spark-submit /<path>/standalone.py
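
The keyspace and table names can also be passed to the script as arguments, because dse spark-submit forwards anything after the script path to the script itself. A sketch under that assumption (the parameterized.py name and its arguments are illustrative):

#parameterized.py
#Run with, for example: bin/dse spark-submit /<path>/parameterized.py test kv

import sys
from pyspark import SparkContext, SparkConf

keyspace, table = sys.argv[1], sys.argv[2]    # arguments that follow the script path

conf = SparkConf().setAppName("Parameterized Python Script")
sc = SparkContext(conf=conf)
print sc.cassandraTable(keyspace, table).collect()
sc.stop()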

Run Python jobs independently 

If you create test suites with nose or other tools that run raw Python code, you cannot explicitly call dse spark-submit. You can execute Python scripts independent of DataStax Enterprise using either of these methods:

  • Set the PYTHONPATH environment variable to the path of the PySpark Python directory integrated with DataStax Enterprise, as shown in the next section.
  • Use an initialization file, modified to describe your environment, as shown in a later example.

These methods work with the DataStax Enterprise integrated Spark, not the open source version.

Configure PYTHONPATH to run jobs 

This procedure uses the PYTHONPATH environment variable to find and use PySpark to execute a Python job.

Run a Python job using PYTHONPATH

  1. Set the PYTHONPATH environment variable to the location of the PySpark Python directory in your DataStax Enterprise Spark installation.
    The default location depends on the type of installation:
    • Installer-Services and Package installations: /usr/share/dse/spark/python
    • Installer-No Services and Tarball installations: install_location/resources/spark/python
  2. Run the Python script. For example, run the standalone.py file that you created in "Run a Python script using dse spark-submit."
    $ python /<path>/standalone.py
    The output is:
    [Row(key=1, value=u'abc'), Row(key=2, value=u'def')]

Create an initialization file to run a Python job 

You can create an initialization file to run Python jobs independent of DataStax Enterprise.

Follow the Python package convention and name the initialization file __init__.py.

An initialization file on a tarball installation might look like this:
import os
import sys
from os import getenv
from os.path import join

# Locate the DataStax Enterprise installation, defaulting to a tarball
# install in the home directory.
HOME = getenv("HOME")
DSE_HOME = getenv("DSE_HOME", join(HOME, "dse-4.7.0"))

# Point Spark at the DSE-integrated Spark resources.
SPARK_HOME = join(DSE_HOME, "resources", "spark")
os.environ['SPARK_HOME'] = SPARK_HOME

# Make the integrated PySpark modules importable.
PYSPARK_DIR = join(DSE_HOME, "resources", "spark", "python")
ADD_PATH = [PYSPARK_DIR]
for PATH in ADD_PATH:
    if PATH not in sys.path:
        sys.path.insert(1, PATH)

Use this sample initialization file as a guide, modifying it to match your operating system and DataStax Enterprise environment. For example:
  • Use HOME or USERPROFILE, depending on the operating system (see the sketch after this list).
  • Change the DSE_HOME definition to match the location of the DataStax Enterprise installation.
  • Modify SPARK_HOME to match the location of Spark resources:
    • Installer-Services and Package installations: SPARK_HOME = join(DSE_HOME,"resources","spark") matches /etc/dse/spark
    • Installer-No Services and Tarball installations: SPARK_HOME = join(DSE_HOME,"resources","spark") matches the install_location/resources/spark location.
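
For example, on Windows the home directory usually comes from USERPROFILE rather than HOME. One portable way to write the first lines of the initialization file (a sketch, not a DataStax Enterprise requirement):

from os import getenv
from os.path import join

# Fall back to USERPROFILE when HOME is not set, as on Windows.
HOME = getenv("HOME") or getenv("USERPROFILE")
DSE_HOME = getenv("DSE_HOME", join(HOME, "dse-4.7.0"))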

Use the initialization file to run jobs 

This procedure uses the initialization file shown in the last section to set up a number of environment variables and execute a sample Python job.

  1. Create a directory and a subdirectory for storing the Python scripts. For example, create a directory named example that has a subdirectory named connector.
  2. Create the example initialization file. For example, create the __init__.py file shown in the last section, and save it to the example/connector directory.
  3. Create a script to configure Spark and query the kv table in Cassandra from Spark. For example, create a script named connector.py and save it to the example/connector directory.
    #connector.py
    
    from pyspark import SparkContext, SparkConf
    
    def getSC():
        conf = SparkConf().setAppName("Stand Alone Python Script")
        sc = SparkContext(conf=conf)
        return sc
  4. Create another script named moduleexample.py, for example, to print the results. Save moduleexample.py to the example directory.
    #!/usr/local/bin/python
    
    from connector.connector import getSC
    
    sc = getSC()
    print sc.cassandraTable("test","kv").collect()
    The example directory now contains these files:
    • moduleexample.py
    • connector/__init__.py
    • connector/connector.py
  5. Execute the Python job as the same user that launched DataStax Enterprise.
    $ python example/moduleexample.py

    The output of the kv table appears.

    [Row(key=1, value=u'abc'), Row(key=2, value=u'def')]