Real-time data pipeline Astra objects

This guide is part of a series that creates a real-time data pipeline with Astra Streaming and Decodable. For context and prerequisites, start here.

Creating message topics to capture the stream of click data

  1. In the Astra Portal header, click Applications, select Streaming, and then click Create Tenant.

  2. Name the new streaming tenant webstore-clicks, select any cloud provider and region, and then click Create Tenant.

  3. From your tenant’s overview page, click the Namespace and Topics tab.

  4. Create a new namespace with the name production.

    In this example, namespaces represent logical development environments to illustrate how you could create a continuous delivery flow. You could also have namespaces for development and staging.

  5. Click Add Topic next to your new production namespace, name the topic all-clicks, make sure Persistent is selected, and then click Add Topic.

  6. Create another topic in the production namespace, name the topic product-clicks, make sure Persistent is selected, and then click Add Topic.

You now have a production namespace with two topics, as well as the default namespace that is automatically created by Pulsar whenever you create a streaming tenant.

Storing the stream of click data

  1. In the Astra Portal header, click Applications, and then select Astra.

  2. Click Create database, and then complete the fields as follows:

    • Type: Select Serverless (non-vector) to follow along with this tutorial.

      If you select Serverless (vector), you must modify the tutorial to use the default_keyspace keyspace or create the tutorial keyspace after you create your database.

    • Database name: Enter webstore-clicks.

    • Keyspace name: Enter click_data.

    • Provider and Region: Select the same cloud provider and region as your streaming tenant.

  3. Click Create Database, and then wait for the database to initialize. This can take several minutes.

  4. From your database’s overview page, click CQL Console, and then wait for cqlsh to start.

  5. Enter the following CQL statement into the CQL console, and then press Enter.

    This statement creates a table named all_clicks in the click_data keyspace that will store all unfiltered web click data.

    CREATE TABLE IF NOT EXISTS click_data.all_clicks (
      click_timestamp bigint,
      url_host text,
      url_protocol text,
      url_path text,
      url_query text,
      browser_type text,
      operating_system text,
      visitor_id uuid,
      PRIMARY KEY ((operating_system, browser_type, url_host, url_path), click_timestamp)
    );
  6. Run the following command in the CQL console to create another table that will store filtered web click data for product clicks only.

    CREATE TABLE click_data.product_clicks (
        catalog_area_name text,
        product_name text,
        click_timestamp timestamp,
        PRIMARY KEY ((catalog_area_name), product_name, click_timestamp)
    ) WITH CLUSTERING ORDER BY (product_name ASC, click_timestamp DESC);
  7. To verify that the tables were created, run describe click_data;.

    The console prints create statements describing the keyspace itself and the two tables.

    Result
    token@cqlsh> describe click_data;
    
    CREATE KEYSPACE click_data WITH replication = {'class': 'NetworkTopologyStrategy', 'us-east-1': '3'}  AND durable_writes = true;
    
    CREATE TABLE click_data.all_clicks (
        operating_system text,
        browser_type text,
        url_host text,
        url_path text,
        click_timestamp bigint,
        url_protocol text,
        url_query text,
        visitor_id uuid,
        PRIMARY KEY ((operating_system, browser_type, url_host, url_path), click_timestamp)
    ) WITH CLUSTERING ORDER BY (click_timestamp ASC)
        AND additional_write_policy = '99PERCENTILE'
        AND bloom_filter_fp_chance = 0.01
        AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
        AND comment = ''
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.UnifiedCompactionStrategy'}
        AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND crc_check_chance = 1.0
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair = 'BLOCKING'
        AND speculative_retry = '99PERCENTILE';
    
    CREATE TABLE click_data.product_clicks (
        catalog_area_name text,
        product_name text,
        click_timestamp timestamp,
        PRIMARY KEY (catalog_area_name, product_name, click_timestamp)
    ) WITH CLUSTERING ORDER BY (product_name ASC, click_timestamp DESC)
        AND additional_write_policy = '99PERCENTILE'
        AND bloom_filter_fp_chance = 0.01
        AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
        AND comment = ''
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.UnifiedCompactionStrategy'}
        AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND crc_check_chance = 1.0
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair = 'BLOCKING'
        AND speculative_retry = '99PERCENTILE';

Connecting the topics to the store

  1. In the Astra Portal header, click Applications, and then select Streaming.

  2. Click your webstore-clicks streaming tenant.

  3. Click the Sinks tab, click Create Sink, and then complete the fields as follows:

    • Namespace: Select your production namespace.

    • Sink Type: Select Astra DB.

    • Name: Enter all-clicks.

    • Input topic: Select your all-clicks topic in your production namespace.

    • Database: Select your webstore-clicks database.

    • Token: Click the link to create an Astra application token with the Organization Administrator role, and then enter the token in the sink’s Token field. Store the token securely, you will use it multiple times during this tutorial.

    • Keyspace: Enter click_data.

    • Table Name: Enter all_clicks.

    • Mapping: Use the default mapping, which maps the topic’s fields to the table’s columns.

  4. Click Create, and then wait for the sink to initialize.

    When the sink is ready, its status changes to Running.

  5. Create another sink with the following configuration:

    • Namespace: Select your production namespace.

    • Sink Type: Select Astra DB.

    • Name: Enter prd-click-astradb.

    • Input topic: Select your product-clicks topic in your production namespace.

    • Database: Select your webstore-clicks database.

    • Token: Use the same token that you created for the other sink.

    • Keyspace: Enter click_data.

    • Table Name: Enter product_clicks.

    • Mapping: Use the default mapping, which maps the topic’s fields to the table’s columns.

After the second sink initializes, you have two running sinks.

To debug a sink, you can view the sink’s logs in the Astra Portal. To do this, click the sink name, and then scroll to terminal output area on the sink’s overview page. The deployment logs are printed in this terminal output area, including semi-verbose starting, validating, and running logs.

Next step

Now that you created the required Astra objects, you can set up the Decodable processing.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2025 DataStax | Privacy policy | Terms of use | Manage Privacy Choices

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com