Real-time data pipeline Astra objects
This guide is part of a series that creates a real-time data pipeline with Astra Streaming and Decodable. For context and prerequisites, start here.
Creating message topics to capture the stream of click data
- In the Astra Portal header, click Applications, select Streaming, and then click Create Tenant.
- Name the new streaming tenant webstore-clicks, select any cloud provider and region, and then click Create Tenant.
- From your tenant’s overview page, click the Namespace and Topics tab.
- Create a new namespace with the name production. In this example, namespaces represent logical development environments to illustrate how you could create a continuous delivery flow. You could also have namespaces for development and staging.
- Click Add Topic next to your new production namespace, name the topic all-clicks, make sure Persistent is selected, and then click Add Topic.
- Create another topic in the production namespace, name the topic product-clicks, make sure Persistent is selected, and then click Add Topic.
You now have a production namespace with two topics, as well as the default namespace that Pulsar automatically creates whenever you create a streaming tenant.
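Behind the scenes, Pulsar addresses each topic by a fully qualified name of the form persistent://<tenant>/<namespace>/<topic>, so the two topics you just created correspond to persistent://webstore-clicks/production/all-clicks and persistent://webstore-clicks/production/product-clicks.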
Storing the stream of click data
- In the Astra Portal header, click Applications, and then select Astra.
- Click Create database, and then complete the fields as follows:
  - Type: Select Serverless (non-vector) to follow along with this tutorial. If you select Serverless (vector), you must modify the tutorial to use the default_keyspace keyspace or create the tutorial keyspace after you create your database.
  - Database name: Enter webstore-clicks.
  - Keyspace name: Enter click_data.
  - Provider and Region: Select the same cloud provider and region as your streaming tenant.
- Click Create Database, and then wait for the database to initialize. This can take several minutes.
- From your database’s overview page, click CQL Console, and then wait for cqlsh to start.
- Enter the following CQL statement into the CQL console, and then press Enter. This statement creates a table named all_clicks in the click_data keyspace that will store all unfiltered web click data.

  CREATE TABLE IF NOT EXISTS click_data.all_clicks (
      click_timestamp bigint,
      url_host text,
      url_protocol text,
      url_path text,
      url_query text,
      browser_type text,
      operating_system text,
      visitor_id uuid,
      PRIMARY KEY ((operating_system, browser_type, url_host, url_path), click_timestamp)
  );

- Run the following command in the CQL console to create another table that will store filtered web click data for product clicks only.

  CREATE TABLE click_data.product_clicks (
      catalog_area_name text,
      product_name text,
      click_timestamp timestamp,
      PRIMARY KEY ((catalog_area_name), product_name, click_timestamp)
  ) WITH CLUSTERING ORDER BY (product_name ASC, click_timestamp DESC);

- To verify that the tables were created, run describe click_data;. The console prints CREATE statements describing the keyspace itself and the two tables. Example queries against both tables follow this procedure.
Result

token@cqlsh> describe click_data;

CREATE KEYSPACE click_data WITH replication = {'class': 'NetworkTopologyStrategy', 'us-east-1': '3'} AND durable_writes = true;

CREATE TABLE click_data.all_clicks (
    operating_system text,
    browser_type text,
    url_host text,
    url_path text,
    click_timestamp bigint,
    url_protocol text,
    url_query text,
    visitor_id uuid,
    PRIMARY KEY ((operating_system, browser_type, url_host, url_path), click_timestamp)
) WITH CLUSTERING ORDER BY (click_timestamp ASC)
    AND additional_write_policy = '99PERCENTILE'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.UnifiedCompactionStrategy'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99PERCENTILE';

CREATE TABLE click_data.product_clicks (
    catalog_area_name text,
    product_name text,
    click_timestamp timestamp,
    PRIMARY KEY (catalog_area_name, product_name, click_timestamp)
) WITH CLUSTERING ORDER BY (product_name ASC, click_timestamp DESC)
    AND additional_write_policy = '99PERCENTILE'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.UnifiedCompactionStrategy'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99PERCENTILE';
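With both tables created, it can help to see how their primary keys shape reads. The queries below are a sketch only, using hypothetical values, and the tables stay empty until the pipeline is running. The all_clicks table expects the full partition key, so a read supplies the operating system, browser, host, and path and gets rows ordered by click_timestamp ascending; product_clicks is read by catalog area and, within each product, returns the most recent clicks first.

-- All clicks for one page, one browser, and one operating system (hypothetical values)
SELECT click_timestamp, url_query, visitor_id
FROM click_data.all_clicks
WHERE operating_system = 'macOS'
  AND browser_type = 'Chrome'
  AND url_host = 'www.example.com'
  AND url_path = '/catalog/outdoor/tent'
LIMIT 20;

-- Clicks for one catalog area, newest first within each product (hypothetical value)
SELECT product_name, click_timestamp
FROM click_data.product_clicks
WHERE catalog_area_name = 'outdoor'
LIMIT 20;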
Connecting the topics to the store
- In the Astra Portal header, click Applications, and then select Streaming.
- Click your webstore-clicks streaming tenant.
- Click the Sinks tab, click Create Sink, and then complete the fields as follows:
  - Namespace: Select your production namespace.
  - Sink Type: Select Astra DB.
  - Name: Enter all-clicks.
  - Input topic: Select your all-clicks topic in your production namespace.
  - Database: Select your webstore-clicks database.
  - Token: Click the link to create an Astra application token with the Organization Administrator role, and then enter the token in the sink’s Token field. Store the token securely; you will use it multiple times during this tutorial.
  - Keyspace: Enter click_data.
  - Table Name: Enter all_clicks.
  - Mapping: Use the default mapping, which maps the topic’s fields to the table’s columns.
- Click Create, and then wait for the sink to initialize. When the sink is ready, its status changes to Running.
-
Create another sink with the following configuration:
-
Namespace: Select your
production
namespace. -
Sink Type: Select Astra DB.
-
Name: Enter
prd-click-astradb
. -
Input topic: Select your
product-clicks
topic in yourproduction
namespace. -
Database: Select your
webstore-clicks
database. -
Token: Use the same token that you created for the other sink.
-
Keyspace: Enter
click_data
. -
Table Name: Enter
product_clicks
. -
Mapping: Use the default mapping, which maps the topic’s fields to the table’s columns.
-
After the second sink initializes, you have two running sinks.
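Neither sink writes anything yet because no messages are flowing through the topics. Once the Decodable pipeline built in the next part of this series starts publishing click events, each sink consumes messages from its input topic and, through the default mapping, writes the message fields to the matching table columns, typically one row per message. At that point you can spot-check the sinks from the CQL Console with simple queries like the following (an optional sanity check, not a tutorial step):

-- Confirm that each sink is writing rows to its table
SELECT * FROM click_data.all_clicks LIMIT 10;
SELECT * FROM click_data.product_clicks LIMIT 10;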
To debug a sink, you can view the sink’s logs in the Astra Portal. To do this, click the sink name, and then scroll to the terminal output area on the sink’s overview page. The deployment logs are printed in this terminal output area, including semi-verbose starting, validating, and running logs.
Next step
Now that you have created the required Astra objects, you can set up the Decodable processing.