Integrate Celery with Astra DB Serverless

Celery is a flexible distributed task queue for asynchronous processing of messages. It is a Python package that you can integrate into your Python projects.

Celery consists of clients, which define and queue tasks, and workers, which run tasks and store the results. Clients and workers communicate through a message broker, such as RabbitMQ, and the caller retrieves a task's outcome from a result backend, such as a database. Celery handles serialization and deserialization for you. In this integration, an Astra DB Serverless database serves as the result backend.

This guide assumes you are familiar with Celery and the celeryconfig module.

Prepare Astra

  1. Create an Astra DB Serverless database.

  2. Create a keyspace for your Celery integration, but don’t create any tables in it.

    DataStax recommends that you allow Celery to create the necessary tables.

  3. Generate an application token with the Database Administrator role.

  4. Download your database’s Secure Connect Bundle (SCB), and then securely store it where your Celery installation or Python project can access it.
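
    Optionally, you can confirm that the token and SCB work before you configure Celery by connecting directly with the DataStax Python driver (cassandra-driver). This is a minimal sketch; the SCB path and token are placeholders for your own values:

    from cassandra.cluster import Cluster
    from cassandra.auth import PlainTextAuthProvider

    # Placeholders: use your own SCB path and application token.
    cluster = Cluster(
        cloud={'secure_connect_bundle': 'PATH/TO/SCB.zip'},
        auth_provider=PlainTextAuthProvider('token', 'APPLICATION_TOKEN'),
    )
    session = cluster.connect()
    print(session.execute('SELECT release_version FROM system.local').one())
    cluster.shutdown()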

Configure Celery

The following steps describe a minimal Celery setup that uses an Astra DB Serverless database as the backend. To test the integration, these steps run a sample task and retrieve the result on the client side.

  1. Start a message broker, such as a Dockerized RabbitMQ instance:

    docker run -d -p 5672:5672 rabbitmq

    A message broker is required for this example to succeed. In alignment with the Celery quickstart, these steps use a containerized RabbitMQ instance. If you use a different message broker, modify the remaining steps as needed.

    Wait for the broker to completely start up before proceeding.
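
    One way to confirm that the broker is ready is to open a test connection with kombu, the messaging library that is installed along with Celery in the next step. This is an optional sketch that assumes the default guest credentials shown above:

    from kombu import Connection

    # Raises an exception if RabbitMQ is not yet accepting connections.
    with Connection('pyamqp://guest@localhost//') as conn:
        conn.ensure_connection(max_retries=3)
        print('Broker is reachable')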

  2. Install Celery with the Cassandra backend in your Python project:

    pip install celery[cassandra]

    This backend enables configuration options for Apache Cassandra and Astra connections through the DataStax Python driver.

    For Astra compatibility, you must install Celery version 5.3 or later.
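
    You can check the installed version from Python:

    import celery

    # Astra support requires Celery 5.3 or later.
    print(celery.__version__)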

  3. In your Python project, create a celeryconfig.py module with your broker, backend, and other Celery configuration values:

    celeryconfig.py
    broker_url = 'pyamqp://guest@localhost//'
    
    broker_connection_retry_on_startup = True
    
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    enable_utc = True
    
    result_backend = 'cassandra://'
    cassandra_keyspace = 'KEYSPACE_NAME'
    cassandra_table = 'TABLE_NAME'
    cassandra_read_consistency = 'quorum'
    cassandra_write_consistency = 'quorum'
    cassandra_auth_provider = 'PlainTextAuthProvider'
    cassandra_auth_kwargs = {
      'username': 'token',
      'password': 'APPLICATION_TOKEN',
    }
    cassandra_secure_bundle_path = 'PATH/TO/SCB.zip'

    Replace the following:

    • KEYSPACE_NAME: The name of the keyspace that you created for your Celery integration.

    • TABLE_NAME: The name of the table where Celery stores task results. You don’t need to create this table in advance; Celery creates it at runtime if it doesn’t exist.

    • APPLICATION_TOKEN: Your Astra application token.

    • PATH/TO/SCB.zip: The path to your database’s SCB zip file.

    For more information, see Cassandra/Astra DB backend settings.
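
    To confirm that Celery picks up this configuration, you can load it in a short Python session. This is an optional sketch; the app name is arbitrary:

    from celery import Celery

    app = Celery('probe')
    app.config_from_object('celeryconfig')

    # The result backend should resolve to the Cassandra backend.
    print(app.conf.result_backend)   # cassandra://
    print(type(app.backend))         # expect celery.backends.cassandra.CassandraBackend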

Create and run a task

  1. In your Python project, create a tasks.py module with a task definition. You can use the following example or create your own task:

    tasks.py
    from celery import Celery
    
    app = Celery('tasks')
    app.config_from_object('celeryconfig')
    
    @app.task
    def sortWords(text, capitalize):
        # Rearrange the text so that words are in alphabetical order.
        words = text.split(' ')
        sortedWords = sorted(words, key=str.upper)
        return ' '.join([
            w if not capitalize else w.upper()
            for w in sortedWords
        ])

  2. Start a Celery worker to run tasks:

    celery -A tasks worker --loglevel=INFO

  3. In a separate terminal, start a Python REPL in your project directory, and then run the following commands to queue, run, and retrieve the results of some tasks:

    from tasks import sortWords
    sorted1 = sortWords.delay('storage yay my DB is powerful results Astra', False)
    sorted1.ready()
    # Returns: True
    
    sorted1.get()
    # Returns: Astra DB is my powerful results storage yay
    
    sorted2 = sortWords.delay('In the land of another wizards day', capitalize=True)
    sorted2.get()
    # Returns: ANOTHER DAY IN LAND OF THE WIZARDS
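
    Because results are stored in the Astra DB backend, you can also rebuild a result handle later from its task ID, even in a different process. A brief sketch:

    from celery.result import AsyncResult
    from tasks import app, sortWords

    task_id = sortWords.delay('fetch this result later by id', False).id

    # Later, possibly from another client, look the result up by ID.
    print(AsyncResult(task_id, app=app).get(timeout=10))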

  4. Optional: Use cqlsh to check that the task results are present in Astra:

    SELECT * FROM KEYSPACE_NAME.TABLE_NAME;

    Replace KEYSPACE_NAME and TABLE_NAME with the values you used in your celeryconfig.py module.
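
    If you prefer to check from Python instead of cqlsh, you can reuse the driver connection from the sketch in Prepare Astra and run the same query. The keyspace and table names are placeholders:

    rows = session.execute('SELECT * FROM KEYSPACE_NAME.TABLE_NAME')
    for row in rows:
        print(row)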

See also

Because the Celery Cassandra backend uses the DataStax Python driver, you can set additional driver parameters in celeryconfig.py, such as the protocol version, load-balancing policy, and execution profiles.

However, if you use a datacenter-aware load balancing policy, don’t set the local datacenter; the driver automatically detects the local datacenter from the SCB when connecting to Astra.
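
For example, if your Celery version supports the cassandra_options setting, its contents are passed through to the driver's Cluster constructor. A hedged sketch of an addition to celeryconfig.py; the keys shown here are illustrative:

    # Named arguments passed through to cassandra.cluster.Cluster.
    cassandra_options = {
        'protocol_version': 4,          # driver protocol version
        # 'execution_profiles': {...},  # e.g. custom timeouts or policies
    }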

For more information, see the Celery documentation and the DataStax Python driver documentation.
