Real-time data pipeline Decodable objects
This guide is part of a series that creates a real-time data pipeline with Astra and Decodable. For context and prerequisites, start here.
The Astra Streaming connection info
To connect Astra to Decodable, you need some information from your Astra Streaming tenant. From the Astra portal home, navigate to the webstore-clicks streaming tenant.
In the “Connect” tab, scroll down to the “Tenant Details” area. You will need the broker service URL and web service URL listed there to connect to Decodable.
You will also need to create a Pulsar token. Click the “Token Manager” link and then “Create Token”. A popup window will appear with the token value. Click the clipboard icon to copy the token, and paste it somewhere safe for now, such as Notepad.
Close the popup and navigate back to the “Connect” tab. We’ll come back here in a bit.
Creating a Decodable connection to Astra Streaming for all web clicks
In a different browser tab, sign in to your Decodable account and navigate to the “Connections” tab. We are going to create a new connection and stream that will direct all web clicks to the correct topic in Astra.
- Click the “New Connection” button.
- In the “Choose a Connector” view, locate the DataStax Astra Streaming connector and click “Connect”.
- Use the Tenant Details from Astra (in your other browser tab) to fill in the details about the connection as follows:
Connection Type: Sink
Broker Service Url: (from Astra tenant)
Web Service Url: (from Astra tenant)
Topic: persistent://webstore-clicks/production/all-clicks
Authentication Token: (from Astra tenant)
Value Format: JSON
- Click “Next” and choose “New Stream”. Name the new stream “Webstore-Normalized-Clicks-Stream”.
- Click “Next” to build the following schema.
click_timestamp: TIMESTAMP(0)
url_host: STRING
url_protocol: STRING
url_path: STRING
url_query: STRING
browser_type: STRING
operating_system: STRING
visitor_id: STRING
For each STRING field, be sure to actually select "string" from the data type dropdown menu; otherwise Decodable will not accept the schema.
- Click “Next” and name the overall connection “Astra-Streaming-All-Webclicks-Connection”.
- You now have a ready-to-go Astra Streaming connection! Oh, the things we can do!!!
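The topic value used in this connection follows Pulsar's naming convention, `persistent://<tenant>/<namespace>/<topic>`. As a rough illustration, the Python sketch below breaks the value into its parts. The helper name `parse_topic` is hypothetical and is not part of any Astra or Decodable API.

```python
from typing import NamedTuple


class TopicName(NamedTuple):
    domain: str     # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    topic: str


def parse_topic(full_name: str) -> TopicName:
    """Split a fully qualified Pulsar topic name into its parts."""
    domain, sep, rest = full_name.partition("://")
    if not sep or domain not in ("persistent", "non-persistent"):
        raise ValueError(f"not a fully qualified topic name: {full_name!r}")
    parts = rest.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic, got: {rest!r}")
    return TopicName(domain, *parts)


all_clicks = parse_topic("persistent://webstore-clicks/production/all-clicks")
print(all_clicks.tenant)     # webstore-clicks
print(all_clicks.namespace)  # production
print(all_clicks.topic)      # all-clicks
```

If the tenant or namespace in your Astra account is named differently, the topic value in the connection must change to match.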
Creating a Decodable connection to Astra Streaming for product web clicks
Create another connection to stream product clicks.
- Navigate back to the “Connections” area and click “New Connection”.
- As before, locate the DataStax Astra Streaming connector and use the Tenant Details from Astra to fill in the details about the connection as follows. Note the change in topic name.
Connection Type: Sink
Broker Service Url: (from Astra tenant)
Web Service Url: (from Astra tenant)
Topic: persistent://webstore-clicks/production/product-clicks
Authentication Token: (from Astra tenant)
Value Format: JSON
- Click “Next” and create a “New Stream”. Name it “Webstore-Product-Clicks-Stream”.
- Click “Next” and build the following schema.
click_timestamp: TIMESTAMP(0)
catalog_area_name: STRING
product_name: STRING
For each STRING field, be sure to actually select "string" from the data type dropdown menu; otherwise Decodable will not accept the schema.
- Click “Next” and name the overall connection “Astra-Streaming-Product-Webclicks-Connection”.
- Click “Create Connection” to finalize the connection.
Creating an HTTP data ingestion source
We need to create one more connection. We’ll use Decodable’s REST connector to POST (or ingest) raw data into the pipeline.
- Navigate to the “Connections” area and click “New Connection”.
- Locate the “REST” connector and click “Connect”.
- Leave all the settings as default.
- Click “Next” and create a “New Stream”. Name it “Webstore-Raw-Clicks-Stream”.
- Click “Next” and fill in the following schema.
click_epoch: BIGINT
UTC_offset: INT
request_url: STRING
browser_agent: STRING
visitor_id: STRING
For each STRING field, be sure to actually select "string" from the data type dropdown menu; otherwise Decodable will not accept the schema.
- Click “Next” and name the overall connection “Webstore-Raw-Clicks-Connection”.
- Click “Create Connection”.
- Notice in the connector settings that the “Endpoint” value contains a “<connection-id>” placeholder. This is a dynamic value that is generated when the connection is created. Navigate to the “Details” tab of the connection to see the final endpoint value. Prefix that value with your account's API host (ddieruf.api.decodable.co in this example) to create a usable URL. Learn more about the REST connector in the Decodable documentation.
You now have 3 connections ready to go.
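To make the endpoint assembly concrete, here is a minimal Python sketch that joins an account host with the endpoint path from the “Details” tab and prepares a POST request carrying one raw click. Several things here are assumptions for illustration only: the `{"events": [...]}` body shape, the bearer-token header, the placeholder endpoint path, and treating `click_epoch` as milliseconds. Confirm each against the Decodable REST connector documentation before relying on it. The request is built but not sent.

```python
import json
import time
import urllib.request


def build_ingest_request(account_host, endpoint_path, token, events):
    """Build (but do not send) a POST request for the REST connection.

    ASSUMPTION: the {"events": [...]} body and the bearer-token header are
    illustrative guesses, not confirmed by this guide.
    """
    url = f"https://{account_host}{endpoint_path}"
    body = json.dumps({"events": events}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
    )


# One raw click shaped like the Webstore-Raw-Clicks-Stream schema.
raw_click = {
    "click_epoch": int(time.time() * 1000),  # BIGINT (milliseconds assumed)
    "UTC_offset": -5,                        # INT
    "request_url": "https://somedomain.com/catalog/bicycles/road-bike?color=red",
    "browser_agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"
    ),
    "visitor_id": "visitor-123",
}

request = build_ingest_request(
    account_host="ddieruf.api.decodable.co",  # use your own account host
    endpoint_path="/REPLACE/WITH/ENDPOINT/FROM/DETAILS/TAB",  # placeholder
    token="YOUR-DECODABLE-TOKEN",  # placeholder
    events=[raw_click],
)
print(request.get_method(), request.full_url)
```

Passing `request` to `urllib.request.urlopen` would send it once the placeholders are replaced with real values.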
Creating a data normalization pipeline
Now we are going to create the core functions for our stream processing.
- Navigate to the “Pipelines” area and click “Create Pipeline”.
- Choose an input of “Webstore-Raw-Clicks-Stream” and click “Next”.
- Clear the existing SQL and copy/paste the following into the “SQL” area. The CASE expression checks for iPhone and Android first, because iPhone user agents also contain “Mac” and Android user agents also contain “Linux”.
insert into `Webstore-Normalized-Clicks-Stream`
select
    CURRENT_TIMESTAMP as click_timestamp
  , PARSE_URL(request_url, 'HOST') as url_host
  , PARSE_URL(request_url, 'PROTOCOL') as url_protocol
  , PARSE_URL(request_url, 'PATH') as url_path
  , PARSE_URL(request_url, 'QUERY') as url_query
  , REGEXP_EXTRACT(browser_agent,'(MSIE|Trident|(?!Gecko.+)Firefox|(?!AppleWebKit.+Chrome.+)Safari(?!.+Edge)|(?!AppleWebKit.+)Chrome(?!.+Edge)|(?!AppleWebKit.+Chrome.+Safari.+)Edge|AppleWebKit(?!.+Chrome|.+Safari)|Gecko(?!.+Firefox))(?: |\/)([\d\.apre]+)') as browser_type
  , CASE
      WHEN (browser_agent like '%iPhone%') THEN 'iPhone'
      WHEN (browser_agent like '%Android%') THEN 'Android'
      WHEN (browser_agent like '%Win64%') THEN 'Windows'
      WHEN (browser_agent like '%Mac%') THEN 'Macintosh'
      WHEN (browser_agent like '%Linux%') THEN 'Linux'
      ELSE 'unknown'
    END as operating_system
  , visitor_id as visitor_id
from `Webstore-Raw-Clicks-Stream`
- Click “Next” and review the auto-generated output stream. Thank you, Decodable!
- Click “Next” and name the pipeline “Webstore-Raw-Clicks-Normalize-Pipeline”.
- Click “Create Pipeline” and be patient; it might take a few seconds.
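To get a feel for what the four `PARSE_URL` calls in the pipeline produce, the Python sketch below splits a sample URL the same way using the standard library. It is only an approximation written for illustration: the function name `normalize_url` and the sample URL are made up, and edge cases (such as a URL with no query string) may come out differently in the pipeline's SQL engine.

```python
from urllib.parse import urlsplit


def normalize_url(request_url: str) -> dict:
    """Approximate the HOST, PROTOCOL, PATH and QUERY parts used in the SQL."""
    parts = urlsplit(request_url)
    return {
        "url_host": parts.hostname,
        "url_protocol": parts.scheme,
        "url_path": parts.path,
        "url_query": parts.query,
    }


sample = "https://somedomain.com/catalog/bicycles/road-bike?color=red"
print(normalize_url(sample))
# {'url_host': 'somedomain.com', 'url_protocol': 'https',
#  'url_path': '/catalog/bicycles/road-bike', 'url_query': 'color=red'}
```

The `url_path` value is what the filtering pipeline later inspects to find catalog pages.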
Creating a data filtering pipeline
Create one more pipeline that keeps only the product click data.
- Navigate to the “Pipelines” area and click “New Pipeline”.
- Choose the “Webstore-Normalized-Clicks-Stream” as the input.
- Clear the SQL from the window and copy/paste the following into the “SQL” window.
insert into `Webstore-Product-Clicks-Stream`
select
    click_timestamp
  , TRIM(REPLACE(SPLIT_INDEX(url_path, '/', 2),'-',' ')) as catalog_area_name
  , TRIM(REPLACE(SPLIT_INDEX(url_path, '/', 3),'-',' ')) as product_name
from `Webstore-Normalized-Clicks-Stream`
where TRIM(LOWER(SPLIT_INDEX(url_path, '/', 1))) = 'catalog'
- Click “Next” and review the auto-generated output stream. Thank you again, Decodable!
- Click “Next” and name the pipeline “Webstore-Product-Clicks-Pipeline”.
- Now we have a pipeline ready to filter by product.
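The filtering SQL relies on `SPLIT_INDEX` with zero-based positions: because `url_path` starts with a slash, position 0 is empty, position 1 is the first path segment, and so on. The Python sketch below imitates that logic on two sample paths. The helper names `split_index` and `product_click` and the sample paths are hypothetical, the timestamp column is left out for brevity, and the real SQL functions may treat unusual input differently.

```python
from typing import Optional


def split_index(text: str, delimiter: str, index: int) -> Optional[str]:
    """Mimic SPLIT_INDEX: zero-based part of `text`, or None if out of range."""
    parts = text.split(delimiter)
    return parts[index] if 0 <= index < len(parts) else None


def clean(segment: Optional[str]) -> Optional[str]:
    """Mimic TRIM(REPLACE(segment, '-', ' ')), passing None through."""
    return None if segment is None else segment.replace("-", " ").strip()


def product_click(url_path: str) -> Optional[dict]:
    """Return the product fields for catalog paths, or None when filtered out."""
    first = split_index(url_path, "/", 1)
    if first is None or first.strip().lower() != "catalog":
        return None
    return {
        "catalog_area_name": clean(split_index(url_path, "/", 2)),
        "product_name": clean(split_index(url_path, "/", 3)),
    }


print(product_click("/catalog/bicycles/road-bike"))
# {'catalog_area_name': 'bicycles', 'product_name': 'road bike'}
print(product_click("/about-us"))
# None
```

Only paths whose first segment is “catalog” pass the filter, which is how the product topic ends up with a subset of the normalized clicks.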
Next step
Now it’s time to see the magic! Run the pipelines >>