Integrate Celery with Astra DB Serverless
Celery is a flexible distributed task queue for asynchronous processing of messages. It is a Python package that you can integrate into your Python projects.
Celery consists of clients, which define and queue tasks, and workers, which run tasks and store the results. Clients and workers communicate through a message broker, such as RabbitMQ. Then, the caller can fetch the outcome of a task through a backend database. Deserialization and serialization are handled by the Celery infrastructure.
This guide assumes you are familiar with Celery and the celeryconfig module.
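To make the roles described above concrete, here is a toy, in-process analogy of that flow written with the Python standard library only. It is not how Celery is implemented and it uses none of Celery's API: a `queue.Queue` stands in for the message broker, a dictionary stands in for the result backend, and JSON strings stand in for the serialized messages. All names (`send_task`, `worker_loop`, `REGISTRY`) are hypothetical.

```python
import json
import queue
import threading
import uuid

broker = queue.Queue()  # stands in for the message broker (for example, RabbitMQ)
backend = {}            # stands in for the result backend (for example, Astra DB)
REGISTRY = {"add": lambda x, y: x + y}  # tasks known to the worker


def send_task(name, *args):
    """Client side: serialize the call, queue it, and return a task ID."""
    task_id = str(uuid.uuid4())
    broker.put(json.dumps({"id": task_id, "task": name, "args": args}))
    return task_id


def worker_loop():
    """Worker side: deserialize messages, run tasks, store serialized results."""
    while True:
        raw = broker.get()
        if raw is None:  # sentinel value: stop the worker
            break
        message = json.loads(raw)
        result = REGISTRY[message["task"]](*message["args"])
        backend[message["id"]] = json.dumps(result)


worker = threading.Thread(target=worker_loop)
worker.start()
task_id = send_task("add", 2, 3)
broker.put(None)  # ask the worker to stop after the queued task
worker.join()

# The caller fetches the outcome from the backend, not from the worker.
print(json.loads(backend[task_id]))
```

In a real deployment, the client and worker are separate processes, which is why the broker and the backend must both be reachable over the network.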
Prepare Astra
- Create an Astra DB Serverless database.
- Create a keyspace for your Celery integration, but don’t create any tables in it.
DataStax recommends that you allow Celery to create the necessary tables.
- Generate an application token with the Database Administrator role.
- Download your database’s Secure Connect Bundle (SCB), and then securely store it where your Celery installation or Python project can access it.
Configure Celery
The following steps describe a minimal Celery setup that uses an Astra DB Serverless database as the backend. To test the integration, these steps run a sample task and retrieve the result on the client side.
- Start a message broker, such as a Dockerized RabbitMQ instance:
docker run -d -p 5672:5672 rabbitmq
A message broker is required for this example to succeed. In alignment with the Celery quickstart, these steps use a containerized RabbitMQ instance. If you use a different message broker, modify the remaining steps as needed.
Wait for the broker to completely start up before proceeding.
- Install Celery with the Cassandra backend in your Python project:
pip install "celery[cassandra]"
This backend enables configuration options for Apache Cassandra and Astra connections through the DataStax Python driver.
For Astra compatibility, you must install Celery version 5.3 or later.
- In your Python project, create a celeryconfig.py module with your broker, backend, and other Celery configuration values:
celeryconfig.py
broker_url = 'pyamqp://guest@localhost//'
broker_connection_retry_on_startup = True
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
enable_utc = True
result_backend = 'cassandra://'
cassandra_keyspace = 'KEYSPACE_NAME'
cassandra_table = 'TABLE_NAME'
cassandra_read_consistency = 'quorum'
cassandra_write_consistency = 'quorum'
cassandra_auth_provider = 'PlainTextAuthProvider'
cassandra_auth_kwargs = {
    'username': 'token',
    'password': 'APPLICATION_TOKEN',
}
cassandra_secure_bundle_path = 'PATH/TO/SCB.zip'
Replace the following:
- KEYSPACE_NAME: The name of the keyspace that you created for your Celery integration.
- TABLE_NAME: The name of the table that Celery will create to store task results. You don’t need to create this table beforehand; Celery will create it if it doesn’t exist at runtime.
- APPLICATION_TOKEN: Your Astra application token.
- PATH/TO/SCB.zip: The path to your database’s SCB zip file.
For more information, see Cassandra/Astra DB backend settings.
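Before starting a worker, it can help to verify the pieces configured so far. The following is a minimal pre-flight sketch, not part of Celery or the Astra tooling: `broker_reachable` and `load_astra_settings` are hypothetical helper names, and the environment variable names `ASTRA_DB_APPLICATION_TOKEN` and `ASTRA_DB_SCB_PATH` are assumptions chosen for illustration. The first helper only checks that something accepts TCP connections on the broker port, which does not prove that RabbitMQ has finished starting up. The second reads the token and SCB path from the environment so that secrets can stay out of `celeryconfig.py`.

```python
import os
import socket
import time
import zipfile


def broker_reachable(host="localhost", port=5672, timeout=30.0, interval=0.5):
    """Poll until a TCP connection to the broker port succeeds or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


def load_astra_settings(env=os.environ):
    """Build the Astra-related backend settings from environment variables.

    The variable names are assumptions for this sketch, not an Astra convention.
    """
    token = env.get("ASTRA_DB_APPLICATION_TOKEN")
    scb_path = env.get("ASTRA_DB_SCB_PATH")
    if not token or not scb_path:
        raise RuntimeError("Set ASTRA_DB_APPLICATION_TOKEN and ASTRA_DB_SCB_PATH")
    if not zipfile.is_zipfile(scb_path):
        raise RuntimeError(f"{scb_path!r} is not a readable zip file")
    return {
        "cassandra_auth_provider": "PlainTextAuthProvider",
        "cassandra_auth_kwargs": {"username": "token", "password": token},
        "cassandra_secure_bundle_path": scb_path,
    }
```

One way to use the result is to apply it after loading the rest of the configuration, for example with `app.conf.update(**load_astra_settings())`.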
- Create and run a task
- In your Python project, create a tasks.py module with a task definition. You can use the following example or create your own task:
tasks.py
from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')


@app.task
def sortWords(text, capitalize):
    # Rearrange the text so that words are in alphabetical order.
    words = text.split(' ')
    sortedWords = sorted(words, key=str.upper)
    return ' '.join([
        w if not capitalize else w.upper()
        for w in sortedWords
    ])
- Start a Celery worker to run tasks:
celery -A tasks worker --loglevel=INFO
- In a separate terminal, start a Python REPL in your project directory, and then run the following commands to queue, run, and retrieve the results of some tasks:
from tasks import sortWords
sorted1 = sortWords.delay('storage yay my DB is powerful results Astra', False)
sorted1.ready()  # Returns: True once the worker has finished the task
sorted1.get()  # Returns: Astra DB is my powerful results storage yay
sorted2 = sortWords.delay('In the land of another wizards day', capitalize=True)
sorted2.get()  # Returns: ANOTHER DAY IN LAND OF THE WIZARDS
- Optional: Use cqlsh to check that the task results are present in Astra:
SELECT * FROM KEYSPACE_NAME.TABLE_NAME;
Replace KEYSPACE_NAME and TABLE_NAME with the values you used in your celeryconfig.py module.
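The expected results in the REPL session above can be checked without a broker, a worker, or a database. The following sketch copies the body of the example task into a plain function so that the sorting logic can be run on its own. The name `sort_words_plain` is hypothetical and is not part of `tasks.py`.

```python
def sort_words_plain(text, capitalize):
    # Same logic as the example task: sort words alphabetically, ignoring case.
    words = text.split(' ')
    sorted_words = sorted(words, key=str.upper)
    return ' '.join(w.upper() if capitalize else w for w in sorted_words)


print(sort_words_plain('storage yay my DB is powerful results Astra', False))
print(sort_words_plain('In the land of another wizards day', capitalize=True))
```

If the printed strings match what `get()` returns, the difference between a local call and a queued task is only where the function runs and where the result is stored.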
See also
Because the Celery Cassandra backend uses the DataStax Python driver, you can set additional driver parameters in celeryconfig.py, such as the protocol level, load-balancing policy, and execution profiles.
However, if you use a datacenter-aware load balancing policy, don’t set the local datacenter; the driver automatically detects the local datacenter from the SCB when connecting to Astra.
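As a sketch of what such a setting can look like, Celery's Cassandra backend has a `cassandra_options` setting whose entries are passed as keyword arguments to the driver's `Cluster` class. The protocol version below is an illustrative value, not a recommendation from this guide; check which versions your database and driver support before using it.

```python
# celeryconfig.py (fragment)
# Extra keyword arguments for the DataStax Python driver's Cluster constructor.
# The value 4 is an assumption for illustration only.
cassandra_options = {
    'protocol_version': 4,
}
```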
For more information, see the following documentation: