Real-time data pipeline: Astra objects

This guide is part of a series that creates a real-time data pipeline with Astra and Decodable. For context and prerequisites, start here.

Creating message topics to capture the stream of click data

  1. Navigate to your Astra portal home and click “Create a Stream”.

    image4

  2. Name the new streaming tenant “webstore-clicks”. Choose any cloud provider and region. Click “Create Tenant”.

    image6

  3. You will be redirected to your new tenant’s quickstart. Navigate to the “Namespace and Topics” tab at the top of the screen.

    image16

  4. Create a new namespace with the name “production”. We are treating namespaces as logical development environments to illustrate how you could create a continuous delivery flow. You could also have namespaces for “development” and “staging”.

    image11

  5. The namespaces view will refresh to display your new “production” namespace. Click the “Add Topic” button associated with the “production” namespace. Name your new topic “all-clicks” and leave it as a “Persistent” topic. Click “Add Topic” to finish creating the topic.

    image15

  6. Create a second topic. Click the “Add Topic” button associated with the “production” namespace. Name this topic “product-clicks” and leave it as a “Persistent” topic. Click “Add Topic” to finish creating the topic.

    image8

  7. You should now have two namespaces. The “production” namespace contains the “all-clicks” and “product-clicks” topics you created; the “default” namespace is created automatically by Pulsar in every new streaming tenant.

    image13
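The topic layout above can also be expressed in code. The following Python sketch simply assembles the fully qualified Pulsar topic names (`persistent://<tenant>/<namespace>/<topic>`) for the tenant and namespace created in this section; the `topic_name` helper is our own illustration, not part of any Astra SDK:

```python
# Pulsar topics are addressed as persistent://<tenant>/<namespace>/<topic>.
# Build the fully qualified names for the topics created in this section.

TENANT = "webstore-clicks"
NAMESPACE = "production"
TOPICS = ["all-clicks", "product-clicks"]

def topic_name(tenant: str, namespace: str, topic: str, persistent: bool = True) -> str:
    """Return the fully qualified Pulsar topic name."""
    scheme = "persistent" if persistent else "non-persistent"
    return f"{scheme}://{tenant}/{namespace}/{topic}"

for t in TOPICS:
    print(topic_name(TENANT, NAMESPACE, t))
```

Pulsar client libraries expect this fully qualified form when you create producers and consumers against the tenant.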

Storing the stream of click data

  1. From the Astra portal home, click “Create a Database”.

    image18

  2. Name the database “webstore-clicks” and the keyspace “click_data”. Choose any cloud provider and region. Click “Create Database”.

    image5

  3. The page will refresh with your new token details. Don’t worry about saving the token; we will generate one again later. Press “Esc” or return to your Astra portal home, where you will see your new streaming tenant and database.

    image1

  4. Copy and paste the following CQL statement into the CQL console and press “Enter”. This creates a table in the database to hold the raw “all-clicks” web click data.

    CREATE TABLE IF NOT EXISTS click_data.all_clicks (
      click_timestamp bigint,
      url_host text,
      url_protocol text,
      url_path text,
      url_query text,
      browser_type text,
      operating_system text,
      visitor_id uuid,
      PRIMARY KEY ((operating_system, browser_type, url_host, url_path), click_timestamp)
    );
  5. Create a second table in the database. Copy and paste the following CQL statement into the CQL console and press “Enter”. This creates a table to hold the filtered “product-clicks” web click data.

    CREATE TABLE IF NOT EXISTS click_data.product_clicks (
        catalog_area_name text,
        product_name text,
        click_timestamp timestamp,
        PRIMARY KEY ((catalog_area_name), product_name, click_timestamp)
    ) WITH CLUSTERING ORDER BY (product_name ASC, click_timestamp DESC);
  6. You can confirm everything was created correctly by describing the keyspace in the CQL console.

    CQL:

    describe click_data;

    Result:

    token@cqlsh> describe click_data;
    
    CREATE KEYSPACE click_data WITH replication = {'class': 'NetworkTopologyStrategy', 'us-east-1': '3'}  AND durable_writes = true;
    
    CREATE TABLE click_data.all_clicks (
        operating_system text,
        browser_type text,
        url_host text,
        url_path text,
        click_timestamp bigint,
        url_protocol text,
        url_query text,
        visitor_id uuid,
        PRIMARY KEY ((operating_system, browser_type, url_host, url_path), click_timestamp)
    ) WITH CLUSTERING ORDER BY (click_timestamp ASC)
        AND additional_write_policy = '99PERCENTILE'
        AND bloom_filter_fp_chance = 0.01
        AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
        AND comment = ''
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.UnifiedCompactionStrategy'}
        AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND crc_check_chance = 1.0
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair = 'BLOCKING'
        AND speculative_retry = '99PERCENTILE';
    
    CREATE TABLE click_data.product_clicks (
        catalog_area_name text,
        product_name text,
        click_timestamp timestamp,
        PRIMARY KEY (catalog_area_name, product_name, click_timestamp)
    ) WITH CLUSTERING ORDER BY (product_name ASC, click_timestamp DESC)
        AND additional_write_policy = '99PERCENTILE'
        AND bloom_filter_fp_chance = 0.01
        AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
        AND comment = ''
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.UnifiedCompactionStrategy'}
        AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND crc_check_chance = 1.0
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair = 'BLOCKING'
        AND speculative_retry = '99PERCENTILE';

The output displays three CREATE statements: one for the “click_data” keyspace, one for the click_data.all_clicks table, and one for the click_data.product_clicks table.
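The two primary keys above drive how Cassandra stores and returns rows: all_clicks partitions by the four-column key (operating_system, browser_type, url_host, url_path) and orders each partition by click_timestamp ascending, while product_clicks partitions by catalog_area_name and clusters by product_name ascending, then click_timestamp descending (newest click first per product). A rough Python emulation, using made-up sample rows:

```python
from collections import defaultdict

# Hypothetical rows for all_clicks: (os, browser, host, path, click_timestamp).
all_clicks = [
    ("macOS", "Chrome", "shop.example.com", "/cart", 1700000002),
    ("macOS", "Chrome", "shop.example.com", "/cart", 1700000001),
    ("Linux", "Firefox", "shop.example.com", "/", 1700000003),
]

# all_clicks: rows sharing the four partition-key columns land in one
# partition, ordered by click_timestamp ASC (the default clustering order).
partitions = defaultdict(list)
for *pk, ts in all_clicks:
    partitions[tuple(pk)].append(ts)
for pk in partitions:
    partitions[pk].sort()  # click_timestamp ASC

# product_clicks: within a catalog_area_name partition, rows are clustered by
# product_name ASC, then click_timestamp DESC (newest click first per product).
gadget_rows = [("widget-a", 100), ("widget-b", 300), ("widget-a", 200)]
ordered = sorted(gadget_rows, key=lambda r: (r[0], -r[1]))
```

This is why a SELECT against all_clicks must supply all four partition-key columns, while product_clicks lets you page through a catalog area’s products with the latest clicks surfacing first.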
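Before attaching the sinks, you can optionally smoke-test the tables from the CQL console by inserting a sample row and reading it back. A small Python helper, hypothetical and with made-up values, that prints ready-to-paste CQL:

```python
import uuid

def sample_all_clicks_insert(ts=1700000000000, visitor=None):
    """Build a CQL INSERT for click_data.all_clicks with sample values."""
    visitor = visitor or uuid.uuid4()
    return (
        "INSERT INTO click_data.all_clicks "
        "(operating_system, browser_type, url_host, url_path, "
        "url_protocol, url_query, visitor_id, click_timestamp) VALUES "
        f"('macOS', 'Chrome', 'shop.example.com', '/cart', "
        f"'https', '', {visitor}, {ts});"
    )

print(sample_all_clicks_insert())
# Reading the row back requires the full partition key:
print("SELECT * FROM click_data.all_clicks "
      "WHERE operating_system = 'macOS' AND browser_type = 'Chrome' "
      "AND url_host = 'shop.example.com' AND url_path = '/cart';")
```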

Connecting the topics to the store

  1. From the Astra portal home, click on the “webstore-clicks” streaming tenant.

    image3

  2. Navigate to the “Sinks” tab and click “Create Sink”.

    image10

  3. Fill in the details about the new sink as follows:

    Namespace: production

    Sink Type: Astra DB

    Name: all-clicks

    Input topic: all-clicks

    Database: webstore-clicks

    Token:

    1. Click “here” to create a new Astra token (you won’t lose your work; it opens in a new tab).

    2. Choose the “Organization Administrator” role and click “Generate Token”.

      image2

    3. The page will refresh with the new token details. Click the clipboard icon to the right of the “Token” value to copy it.

    4. Navigate back to the Astra portal tab in your browser and paste the token into the “Token” input.

      image17

    Keyspace: click_data

    Table Name: all_clicks

    Mapping: (leave unchanged)

    You will need the token again when creating the second sink. Either paste it somewhere safe temporarily (such as a text editor) or keep the browser tab open.
  4. Click “Create” to create the sink. You will be directed back to the Sinks listing where your new sink is initializing. When your new sink is ready, its status will change to “Running”.

    image14

  5. Follow the same flow to create a second sink with the following values:

    Namespace: production

    Sink Type: Astra DB

    Name: prd-click-astradb

    Input topic: product-clicks

    Database: webstore-clicks

    Token: (paste the same value used for the previous sink)

    Keyspace: click_data

    Table Name: product_clicks

    Mapping: (leave unchanged)

  6. If everything goes smoothly, you should have two sinks in the “Running” state.

    image9

    To debug, click a sink’s name and scroll to the bottom of its detail page, where a terminal output area shows the deployment logs: a semi-verbose record of the sink starting, validating, and running.
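With the Mapping field left unchanged, a reasonable working assumption is that the sink matches incoming JSON message fields to table columns by name, so each message should supply every column of the target table. A quick Python sanity check of that assumption; the sample event and the `missing_columns` helper are our own, not part of the sink:

```python
import json

# Columns of click_data.product_clicks. Assumption: with no explicit Mapping,
# message fields must match these column names for the sink to write the row.
PRODUCT_CLICKS_COLUMNS = {"catalog_area_name", "product_name", "click_timestamp"}

def missing_columns(message_json: str) -> set:
    """Return the table columns that the JSON message does not supply."""
    return PRODUCT_CLICKS_COLUMNS - set(json.loads(message_json).keys())

event = json.dumps({
    "catalog_area_name": "gadgets",
    "product_name": "widget-a",
    "click_timestamp": "2024-01-01T00:00:00Z",
})
print(missing_columns(event))  # an empty set means the message covers every column
```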

Next step

Great work! With the Astra objects in place, let’s move on to setting up the Decodable processing. Setup Decodable >>


© 2024 DataStax | Privacy policy | Terms of use
