Real-time data pipeline Decodable objects
This guide is part of a series that creates a real-time data pipeline with Astra Streaming and Decodable. For context and prerequisites, start here. |
The Astra Streaming connection info
To connect Astra Streaming to Decodable, you need some information from your Astra Streaming tenant.
-
In the Astra Portal header, click Applications, select Streaming, and then click your
webstore-clicks
streaming tenant. -
Click the Connect tab, and then scroll to the Tenant Details section.
These values are required to connect to Decodable.
-
Create a Pulsar token:
-
In Tenant Details, click Token Manager, and then click Create Token.
-
In the Copy Token dialog, copy the Token, and then store it securely.
-
Click Close when you are done.
-
Creating a Decodable connection to Astra Streaming for all web clicks
In Decodable, you must create a connection and stream that will direct all web clicks to the correct topics in Astra Streaming.
-
In new browser tab, sign in to your Decodable account.
-
Click the Connections tab, and then click New Connection.
-
In the Choose a Connector dialog, find the DataStax Astra Streaming connector, and then click Connect.
-
Use the Astra Streaming tenant details from your other browser tab to complete the Decodable connection fields:
-
Connection Type: Select Sink.
-
Broker Service URL: Enter the Pulsar broker service URL from your Astra Streaming tenant.
-
Web Service URL: Enter the Pulsar web service URL from your Astra Streaming tenant.
-
Topic: Enter
persistent://webstore-clicks/production/all-clicks
. -
Authentication Token: Enter the same token you used for your Astra Streaming sinks.
-
Value Format: Select JSON.
-
-
Click Next, click New Stream, name the stream
Webstore-Normalized-Clicks-Stream
, and then click Next. -
In Define this Connection’s schema, select New Schema for the Schema Source, and then add fields with the following names and types:
Name Type click_timestamp
TIMESTAMP(0)
url_host
STRING
url_protocol
STRING
url_path
STRING
url_query
STRING
browser_type
STRING
operating_system
STRING
visitor_id
STRING
For Type, you must select options from the dropdown menu in order for Decodable to accept the schema.
-
Click Next, name the connection
Astra-Streaming-All-Webclicks-Connection
, and then click Create Connection.
Creating a Decodable connection to Astra Streaming for product web clicks
Create another connection in Decodable to stream product clicks.
-
In Decodable, on the Connections tab, click New Connection.
-
In the Choose a Connector dialog, find the DataStax Astra Streaming connector, and then click Connect.
-
Use the Astra Streaming tenant details from your other browser tab to complete the Decodable connection fields. All values are the same as the other connection, except the Topic.
-
Connection Type: Select Sink.
-
Broker Service URL: Enter the Pulsar broker service URL from your Astra Streaming tenant.
-
Web Service URL: Enter the Pulsar web service URL from your Astra Streaming tenant.
-
Topic: Enter
persistent://webstore-clicks/production/product-clicks
. -
Authentication Token: Enter the same token you used for your Astra Streaming sinks.
-
Value Format: Select JSON.
-
-
Click Next, click New Stream, name the stream
Webstore-Product-Clicks-Stream
, and then click Next. -
In Define this Connection’s schema, select New Schema for the Schema Source, and then add fields with the following names and types:
Name Type click_timestamp
TIMESTAMP(0)
catalog_area_name
STRING
product_name
STRING
-
Click Next, name the connection
Astra-Streaming-Product-Webclicks-Connection
, and then click Create Connection.
Creating an HTTP data ingestion source
Create a third connection to use Decodable’s REST API to ingest (POST
) raw data into the pipeline:
-
In Decodable, on the Connections tab, click New Connection.
-
In the Choose a Connector dialog, find the REST connector, and then click Connect.
-
On the Create your REST connector dialog, leave the default values for all fields, and then click Next.
-
Click New Stream, enter the name
Webstore-Raw-Clicks-Stream
, and then click Next. -
In Define this Connection’s schema, select New Schema for the Schema Source, and then add fields with the following names and types:
Name Type click_epoch
BIGINT
UTC_offset
INT
request_url
STRING
browser_agent
STRING
visitor_id
STRING
-
Click Next, name the connection
Webstore-Raw-Clicks-Connection
, and then click Create Connection.
In your REST connector’s settings, note that the Endpoint value contains a <connection_ID>
, which is a dynamic value that is generated when the connection is created.
Click the connector’s Details tab to see the resolved endpoint path, such as /v1alpha2/connections/7ef9055f/events
.
You will use this path with your account domain, such as user.api.decodable.co
to create the full endpoint URL.
For more information about the REST connector, see the Decodable documentation.
+
You now have three connectors ready to use in your streaming pipeline.
Creating a data normalization pipeline
In this part of the tutorial, you will create the core functions for your stream processing pipeline.
-
In Decodable, go to Pipelines, and then click Create Pipeline.
-
For the input stream, select Webstore-Raw-Clicks-Stream, and then click Next.
-
In Define your data processing with SQL, delete the pre-populated SQL, and then enter the following SQL statement:
insert into `Webstore-Normalized-Clicks-Stream` select CURRENT_TIMESTAMP as click_timestamp , PARSE_URL(request_url, 'HOST') as url_host , PARSE_URL(request_url, 'PROTOCOL') as url_protocol , PARSE_URL(request_url, 'PATH') as url_path , PARSE_URL(request_url, 'QUERY') as url_query , REGEXP_EXTRACT(browser_agent,'(MSIE|Trident|(?!Gecko.+)Firefox|(?!AppleWebKit.+Chrome.+)Safari(?!.+Edge)|(?!AppleWebKit.+)Chrome(?!.+Edge)|(?!AppleWebKit.+Chrome.+Safari.+)Edge|AppleWebKit(?!.+Chrome|.+Safari)|Gecko(?!.+Firefox))(?: |\/)([\d\.apre]+)') as browser_type , CASE WHEN (browser_agent like '%Win64%') THEN 'Windows' WHEN (browser_agent like '%Mac%') THEN 'Macintosh' WHEN (browser_agent like '%Linux%') THEN 'Linux' WHEN (browser_agent like '%iPhone%') THEN 'iPhone' WHEN (browser_agent like '%Android%') THEN 'Android' ELSE 'unknown' END as operating_system , visitor_id as visitor_id from `Webstore-Raw-Clicks-Stream`
-
Click Next, review the automatically generated output stream, and then click Next.
The output stream should be correct by default if you followed along with the tutorial so far.
-
Click Next, name the pipeline
Webstore-Raw-Clicks-Normalize-Pipeline
, and then click Create Pipeline.It can take a few minutes for the pipeline to be created.
Creating a data filtering pipeline
Create a pipeline to separate product click data from all web click data:
-
In Decodable, go to Pipelines, and then click Create Pipeline.
-
For the input stream, select Webstore-Normalized-Clicks-Stream, and then click Next.
-
In Define your data processing with SQL, delete the pre-populated SQL, and then enter the following SQL statement:
insert into `Webstore-Product-Clicks-Stream` select click_timestamp , TRIM(REPLACE(SPLIT_INDEX(url_path, '/', 2),'-',' ')) as catalog_area_name , TRIM(REPLACE(SPLIT_INDEX(url_path, '/', 3),'-',' ')) as product_name from `Webstore-Normalized-Clicks-Stream` where TRIM(LOWER(SPLIT_INDEX(url_path, '/', 1))) = 'catalog'
-
Click Next, review the automatically generated output stream, and then click Next.
The output stream should be correct by default if you followed along with the tutorial so far.
-
Click Next, name the pipeline
Webstore-Product-Clicks-Pipeline
, and then click Create Pipeline.It can take a few minutes for the pipeline to be created.