Real-time data pipeline Decodable objects
This guide is part of a series that creates a real-time data pipeline with Astra Streaming and Decodable. For context and prerequisites, start here.
The Astra Streaming connection info
To connect Astra Streaming to Decodable, you need some information from your Astra Streaming tenant.
-
In the Astra Portal header, click Applications, select Streaming, and then click your webstore-clicks streaming tenant.
-
Click the Connect tab, and then scroll to the Tenant Details section.
These values are required to connect to Decodable.

-
Create a Pulsar token:
-
In Tenant Details, click Token Manager, and then click Create Token.
-
In the Copy Token dialog, copy the Token, and then store it securely.
-
Click Close when you are done.
Creating a Decodable connection to Astra Streaming for all web clicks
In Decodable, you must create a connection and stream that will direct all web clicks to the correct topics in Astra Streaming.
-
In a new browser tab, sign in to your Decodable account.
-
Click the Connections tab, and then click New Connection.

-
In the Choose a Connector dialog, find the DataStax Astra Streaming connector, and then click Connect.

-
Use the Astra Streaming tenant details from your other browser tab to complete the Decodable connection fields:
-
Connection Type: Select Sink.
-
Broker Service URL: Enter the Pulsar broker service URL from your Astra Streaming tenant.
-
Web Service URL: Enter the Pulsar web service URL from your Astra Streaming tenant.
-
Topic: Enter persistent://webstore-clicks/production/all-clicks.
-
Authentication Token: Enter the same token you used for your Astra Streaming sinks.
-
Value Format: Select JSON.

-
Click Next, click New Stream, name the stream Webstore-Normalized-Clicks-Stream, and then click Next.
-
In Define this Connection’s schema, select New Schema for the Schema Source, and then add fields with the following names and types:
Name               Type
click_timestamp    TIMESTAMP(0)
url_host           STRING
url_protocol       STRING
url_path           STRING
url_query          STRING
browser_type       STRING
operating_system   STRING
visitor_id         STRING

For Type, you must select options from the dropdown menu in order for Decodable to accept the schema.
-
Click Next, name the connection Astra-Streaming-All-Webclicks-Connection, and then click Create Connection.
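The Topic value used above follows Pulsar's full topic naming format, domain://tenant/namespace/topic. As a minimal sketch (the helper names build_topic and split_topic are hypothetical, not part of any Astra Streaming or Decodable tooling), the following Python shows how that name breaks down, which can help you double-check the value before pasting it into Decodable:

```python
def build_topic(tenant: str, namespace: str, topic: str, persistent: bool = True) -> str:
    """Compose a full Pulsar topic name: {domain}://{tenant}/{namespace}/{topic}."""
    domain = "persistent" if persistent else "non-persistent"
    return f"{domain}://{tenant}/{namespace}/{topic}"


def split_topic(full_name: str) -> dict:
    """Split a full Pulsar topic name back into its four parts."""
    domain, _, rest = full_name.partition("://")
    tenant, namespace, topic = rest.split("/", 2)
    return {"domain": domain, "tenant": tenant, "namespace": namespace, "topic": topic}


# Tenant, namespace, and topic names as used in this tutorial.
all_clicks = build_topic("webstore-clicks", "production", "all-clicks")
print(all_clicks)
print(split_topic(all_clicks))
```

If your tenant or namespace has a different name, the Topic value in Decodable must change to match.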
Creating a Decodable connection to Astra Streaming for product web clicks
Create another connection in Decodable to stream product clicks.
-
In Decodable, on the Connections tab, click New Connection.
-
In the Choose a Connector dialog, find the DataStax Astra Streaming connector, and then click Connect.
-
Use the Astra Streaming tenant details from your other browser tab to complete the Decodable connection fields. All values are the same as the other connection, except the Topic.
-
Connection Type: Select Sink.
-
Broker Service URL: Enter the Pulsar broker service URL from your Astra Streaming tenant.
-
Web Service URL: Enter the Pulsar web service URL from your Astra Streaming tenant.
-
Topic: Enter persistent://webstore-clicks/production/product-clicks.
-
Authentication Token: Enter the same token you used for your Astra Streaming sinks.
-
Value Format: Select JSON.
-
Click Next, click New Stream, name the stream Webstore-Product-Clicks-Stream, and then click Next.
-
In Define this Connection’s schema, select New Schema for the Schema Source, and then add fields with the following names and types:
Name                Type
click_timestamp     TIMESTAMP(0)
catalog_area_name   STRING
product_name        STRING

-
Click Next, name the connection Astra-Streaming-Product-Webclicks-Connection, and then click Create Connection.
Creating an HTTP data ingestion source
Create a third connection to use Decodable’s REST API to ingest (POST) raw data into the pipeline:
-
In Decodable, on the Connections tab, click New Connection.
-
In the Choose a Connector dialog, find the REST connector, and then click Connect.

-
On the Create your REST connector dialog, leave the default values for all fields, and then click Next.

-
Click New Stream, enter the name Webstore-Raw-Clicks-Stream, and then click Next.
-
In Define this Connection’s schema, select New Schema for the Schema Source, and then add fields with the following names and types:
Name            Type
click_epoch     BIGINT
UTC_offset      INT
request_url     STRING
browser_agent   STRING
visitor_id      STRING

-
Click Next, name the connection Webstore-Raw-Clicks-Connection, and then click Create Connection.
In your REST connector’s settings, note that the Endpoint value contains a <connection_ID>, which is a dynamic value that is generated when the connection is created.
Click the connector’s Details tab to see the resolved endpoint path, such as /v1alpha2/connections/7ef9055f/events.
You will combine this path with your account domain, such as user.api.decodable.co, to create the full endpoint URL.
For more information about the REST connector, see the Decodable documentation.
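As a rough sketch of how the endpoint path and account domain fit together, the following Python builds a POST request for one raw click without sending it. The domain and path are the example values from this guide. The sample field values, the build_click_request helper, the "events" body wrapper, and the bearer-token header are assumptions made for illustration, so confirm the request format in the Decodable documentation before relying on it.

```python
import json
import urllib.request

# Example values from this guide; replace them with your own account domain and path.
ACCOUNT_DOMAIN = "user.api.decodable.co"
ENDPOINT_PATH = "/v1alpha2/connections/7ef9055f/events"


def build_click_request(events: list, token: str) -> urllib.request.Request:
    """Build (but do not send) a POST request for the REST connection."""
    url = f"https://{ACCOUNT_DOMAIN}{ENDPOINT_PATH}"
    # Assumption: the endpoint accepts a JSON object with an "events" array.
    body = json.dumps({"events": events}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Assumption: the API uses bearer-token authentication.
            "Authorization": f"Bearer {token}",
        },
    )


# A made-up raw click using the field names from the Webstore-Raw-Clicks-Stream schema.
sample_event = {
    "click_epoch": 1655303592179,
    "UTC_offset": -4,
    "request_url": "https://somedomain.com/catalog/area1/yetanother-cool-product?a=b&c=d",
    "browser_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "visitor_id": "b56afbf3-9e07-4a2a-a6f0-1b6a0b8e4a0c",
}

request = build_click_request([sample_event], token="<your-decodable-token>")
print(request.get_method(), request.full_url)
# To actually send it, you would call urllib.request.urlopen(request).
```

The request is only constructed here, so you can inspect the URL and payload before sending anything to your account.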

You now have three connections ready to use in your streaming pipeline.

Creating a data normalization pipeline
In this part of the tutorial, you will create the core functions for your stream processing pipeline.
-
In Decodable, go to Pipelines, and then click Create Pipeline.
-
For the input stream, select Webstore-Raw-Clicks-Stream, and then click Next.
-
In Define your data processing with SQL, delete the pre-populated SQL, and then enter the following SQL statement:
insert into `Webstore-Normalized-Clicks-Stream`
select
    CURRENT_TIMESTAMP as click_timestamp
    , PARSE_URL(request_url, 'HOST') as url_host
    , PARSE_URL(request_url, 'PROTOCOL') as url_protocol
    , PARSE_URL(request_url, 'PATH') as url_path
    , PARSE_URL(request_url, 'QUERY') as url_query
    , REGEXP_EXTRACT(browser_agent,'(MSIE|Trident|(?!Gecko.+)Firefox|(?!AppleWebKit.+Chrome.+)Safari(?!.+Edge)|(?!AppleWebKit.+)Chrome(?!.+Edge)|(?!AppleWebKit.+Chrome.+Safari.+)Edge|AppleWebKit(?!.+Chrome|.+Safari)|Gecko(?!.+Firefox))(?: |\/)([\d\.apre]+)') as browser_type
    , CASE
        WHEN (browser_agent like '%Win64%') THEN 'Windows'
        WHEN (browser_agent like '%Mac%') THEN 'Macintosh'
        WHEN (browser_agent like '%Linux%') THEN 'Linux'
        WHEN (browser_agent like '%iPhone%') THEN 'iPhone'
        WHEN (browser_agent like '%Android%') THEN 'Android'
        ELSE 'unknown'
      END as operating_system
    , visitor_id as visitor_id
from `Webstore-Raw-Clicks-Stream`
-
Click Next, review the automatically generated output stream, and then click Next.
The output stream should be correct by default if you followed along with the tutorial so far.

-
Click Next, name the pipeline Webstore-Raw-Clicks-Normalize-Pipeline, and then click Create Pipeline.
It can take a few minutes for the pipeline to be created.
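To get a feel for what the normalization SQL produces, here is an approximate Python sketch of two of its pieces: the URL split done by PARSE_URL and the operating system CASE expression. It is not Decodable code. The function names are hypothetical, the browser_type regular expression is left out, and Python's urlsplit returns empty strings where SQL would return NULL, so treat it as an illustration only.

```python
from urllib.parse import urlsplit


def classify_os(browser_agent: str) -> str:
    """Mirror the CASE expression: branches are checked in order, first match wins."""
    branches = [
        ("Win64", "Windows"),
        ("Mac", "Macintosh"),
        ("Linux", "Linux"),
        ("iPhone", "iPhone"),
        ("Android", "Android"),
    ]
    for needle, label in branches:
        if needle in browser_agent:  # case-sensitive substring test, like '%needle%'
            return label
    return "unknown"


def normalize_url(request_url: str) -> dict:
    """Approximate the four PARSE_URL calls with the standard library."""
    parts = urlsplit(request_url)
    return {
        "url_host": parts.hostname,
        "url_protocol": parts.scheme,
        "url_path": parts.path,
        "url_query": parts.query,
    }


print(normalize_url("https://somedomain.com/catalog/area1/yetanother-cool-product?a=b&c=d"))
print(classify_os("Mozilla/5.0 (Windows NT 10.0; Win64; x64)"))
```

Because the branches are evaluated in order, a user agent that contains both "iPhone" and "Mac" (for example, one that includes "like Mac OS X") is labeled Macintosh, and an Android agent that also contains "Linux" is labeled Linux. Reorder the WHEN clauses in the SQL if you need the more specific labels.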
Creating a data filtering pipeline
Create a pipeline to separate product click data from all web click data:
-
In Decodable, go to Pipelines, and then click Create Pipeline.
-
For the input stream, select Webstore-Normalized-Clicks-Stream, and then click Next.
-
In Define your data processing with SQL, delete the pre-populated SQL, and then enter the following SQL statement:
insert into `Webstore-Product-Clicks-Stream`
select
    click_timestamp
    , TRIM(REPLACE(SPLIT_INDEX(url_path, '/', 2),'-',' ')) as catalog_area_name
    , TRIM(REPLACE(SPLIT_INDEX(url_path, '/', 3),'-',' ')) as product_name
from `Webstore-Normalized-Clicks-Stream`
where TRIM(LOWER(SPLIT_INDEX(url_path, '/', 1))) = 'catalog'
-
Click Next, review the automatically generated output stream, and then click Next.
The output stream should be correct by default if you followed along with the tutorial so far.

-
Click Next, name the pipeline Webstore-Product-Clicks-Pipeline, and then click Create Pipeline.
It can take a few minutes for the pipeline to be created.
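The filtering SQL can be read as "split the path on slashes, keep only rows whose first segment is catalog, and tidy the next two segments." The Python sketch below imitates that logic under the assumption that SPLIT_INDEX is zero-based and keeps the empty token before the leading slash, which is consistent with the SQL using index 1 for the first path segment. The function names are hypothetical, and click_timestamp, which the SQL passes through unchanged, is omitted.

```python
from typing import Optional


def split_index(text: str, delimiter: str, index: int) -> Optional[str]:
    """Zero-based token lookup; None stands in for SQL NULL when out of range."""
    tokens = text.split(delimiter)
    return tokens[index] if 0 <= index < len(tokens) else None


def clean(token: Optional[str]) -> Optional[str]:
    """Mirror TRIM(REPLACE(token, '-', ' ')), passing NULL (None) through."""
    return None if token is None else token.replace("-", " ").strip()


def to_product_click(url_path: str) -> Optional[dict]:
    """Return the product fields for catalog paths, or None if the row is filtered out."""
    first = split_index(url_path, "/", 1)
    if first is None or first.lower().strip() != "catalog":
        return None
    return {
        "catalog_area_name": clean(split_index(url_path, "/", 2)),
        "product_name": clean(split_index(url_path, "/", 3)),
    }


print(to_product_click("/catalog/area1/yetanother-cool-product"))
print(to_product_click("/cart/checkout"))
```

With these assumptions, a path such as /catalog/area1 still passes the filter but has no product name, so downstream consumers should expect missing values in product_name.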