Real-time data pipeline Decodable objects

This guide is part of a series that creates a real-time data pipeline with Astra and Decodable. For context and prerequisites, start here.

The Astra Streaming connection info

To connect Astra to Decodable, you need some information from your Astra Streaming tenant. From the Astra portal home, navigate to the webstore-clicks streaming tenant.

image4

In the “Connect” tab, scroll down to the “Tenant Details” area. You will need these values to connect to Decodable.

image16

You will also need to create a Pulsar token. Click the “Token Manager” link, then click “Create Token”. A popup window will appear with the token value. Click the clipboard icon to copy the token, and paste it somewhere temporary and safe, such as Notepad.

image31

Close the popup and navigate back to the “Connect” tab. We’ll come back here in a bit.

Creating a Decodable connection to Astra Streaming for all web clicks

In a different browser tab, sign in to your Decodable account and navigate to the “Connections” tab. We are going to create a new connection and stream that will direct all web clicks to the correct topic in Astra.

  1. Click the “New Connection” button

    image25

  2. In the “Choose a Connector” view, locate the DataStax Astra Streaming connector and click “Connect”.

    image14

  3. Use the Tenant Details from Astra (in your other browser tab) to fill in the details about the connection as follows:

    Connection Type         Sink
    Broker Service Url      (from Astra tenant)
    Web Service Url         (from Astra tenant)
    Topic                   persistent://webstore-clicks/production/all-clicks
    Authentication Token    (from Astra tenant)
    Value Format            JSON

    image35

  4. Click “Next” and choose “New Stream”. Name the new stream “Webstore-Normalized-Clicks-Stream”.

    image12

  5. Click “Next” to build the following schema.

    click_timestamp     TIMESTAMP(0)
    url_host            STRING
    url_protocol        STRING
    url_path            STRING
    url_query           STRING
    browser_type        STRING
    operating_system    STRING
    visitor_id          STRING

    image10

    You must explicitly select “STRING” for the data type in the dropdown menu, or Decodable will not accept the schema.

  6. Click “Next” and name the overall connection “Astra-Streaming-All-Webclicks-Connection”.

    image26

  7. You now have a ready-to-go Astra Streaming connection! Oh, the things we can do!!!

    image13
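The topic value entered above follows Pulsar's fully qualified naming form, persistent://tenant/namespace/topic. As a minimal sketch (the `PulsarTopic` type and `parse_topic` helper are illustrative names, not part of Astra or Decodable), the following Python splits that value into its parts, which can help catch a mistyped tenant or namespace before saving the connection:

```python
from typing import NamedTuple


class PulsarTopic(NamedTuple):
    domain: str     # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    topic: str


def parse_topic(full_name: str) -> PulsarTopic:
    """Split a fully qualified Pulsar topic name into its four parts."""
    domain, sep, rest = full_name.partition("://")
    if not sep or domain not in ("persistent", "non-persistent"):
        raise ValueError(f"not a fully qualified topic name: {full_name!r}")
    parts = rest.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic, got {rest!r}")
    return PulsarTopic(domain, *parts)


all_clicks = parse_topic("persistent://webstore-clicks/production/all-clicks")
print(all_clicks.tenant, all_clicks.namespace, all_clicks.topic)
```

Here the tenant is the webstore-clicks streaming tenant from the Astra portal, and the remaining two parts are the namespace and topic that the connection writes to.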

Creating a Decodable connection to Astra Streaming for product web clicks

Create another connection to stream product clicks.

  1. Navigate back to the “Connections” area and click “New Connection”.

  2. As before, locate the DataStax Astra Streaming connector and use the Tenant Details from Astra to fill in the connection details as follows. Note the change in topic name.

    Connection Type         Sink
    Broker Service Url      (from Astra tenant)
    Web Service Url         (from Astra tenant)
    Topic                   persistent://webstore-clicks/production/product-clicks
    Authentication Token    (from Astra tenant)
    Value Format            JSON

    image21

  3. Click “Next” and create a “New Stream”. Name it “Webstore-Product-Clicks-Stream”.

  4. Click “Next” and build the following schema.

    click_timestamp      TIMESTAMP(0)
    catalog_area_name    STRING
    product_name         STRING

    image3

    You must explicitly select “STRING” for the data type in the dropdown menu, or Decodable will not accept the schema.

  5. Click “Next” and name the overall connection “Astra-Streaming-Product-Webclicks-Connection”.

    image15

  6. Click “Create Connection” to finalize the connection.

    image34

Creating an HTTP data ingestion source

We need to create one more connection. We’ll use Decodable’s REST connector to POST (or ingest) raw data into the pipeline.

  1. Navigate to the “Connections” area and click “New Connection”.

  2. Locate the “REST” connector and click “Connect”.

    image19

  3. Leave all the settings as default.

    image27

  4. Click “Next” and create a “New Stream”. Name it “Webstore-Raw-Clicks-Stream”.

    image1

  5. Click “Next” and fill in the following schema.

    click_epoch      BIGINT
    UTC_offset       INT
    request_url      STRING
    browser_agent    STRING
    visitor_id       STRING

    image6

    You must explicitly select “STRING” for the data type in the dropdown menu, or Decodable will not accept the schema.

  6. Click “Next” and name the overall connection “Webstore-Raw-Clicks-Connection”.

    image29

  7. Click “Create Connection”.

    image24

  8. Notice that the “Endpoint” value in the connector settings contains a “<connection-id>” placeholder. The actual ID is generated when the connection is created. Navigate to the “Details” tab of the connection to see the final endpoint value. Prefix that value with your account domain (in this example, ddieruf.api.decodable.co) to create a usable URL. Learn more about the REST connector in the Decodable documentation.

    image7
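To make the shape of the ingested data concrete, here is a minimal Python sketch that builds one record matching the Webstore-Raw-Clicks-Stream schema and joins an account domain with an endpoint path. Several things are assumptions: the helper names are hypothetical, the `https` scheme is assumed, `click_epoch` is assumed to be in milliseconds, and the endpoint path is a placeholder for the value shown in the “Details” tab. The sketch does not send a request; the request body format and authentication are covered in the Decodable REST connector documentation.

```python
import json

# Field names and Python types mirroring the raw clicks schema above.
RAW_CLICK_SCHEMA = {
    "click_epoch": int,    # BIGINT (assumed to be epoch milliseconds)
    "UTC_offset": int,     # INT
    "request_url": str,    # STRING
    "browser_agent": str,  # STRING
    "visitor_id": str,     # STRING
}


def make_raw_click(click_epoch, utc_offset, request_url, browser_agent, visitor_id):
    """Build one record shaped like the raw clicks schema and type-check it."""
    record = {
        "click_epoch": click_epoch,
        "UTC_offset": utc_offset,
        "request_url": request_url,
        "browser_agent": browser_agent,
        "visitor_id": visitor_id,
    }
    for field, expected in RAW_CLICK_SCHEMA.items():
        if not isinstance(record[field], expected):
            raise TypeError(f"{field} must be {expected.__name__}")
    return record


def build_endpoint_url(account_host, endpoint_path):
    """Prefix the endpoint path from the Details tab with the account domain."""
    return f"https://{account_host}/{endpoint_path.lstrip('/')}"


click = make_raw_click(
    click_epoch=1700000000000,
    utc_offset=-5,
    request_url="https://shop.example.com/catalog/bath-linens/bathrobe",  # made-up URL
    browser_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    visitor_id="visitor-123",
)
# "/endpoint-from-details-tab" is a placeholder, not a real Decodable path.
url = build_endpoint_url("ddieruf.api.decodable.co", "/endpoint-from-details-tab")
print(url)
print(json.dumps(click, indent=2))
```

Validating the record locally first makes it easier to tell a schema mismatch apart from a connection problem when events later fail to arrive.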

You now have 3 connections ready to go.

image5

Creating a data normalization pipeline

Now we are going to create the core functions for our stream processing.

  1. Navigate to the “Pipelines” area and click “Create Pipeline”.

    image9

  2. Choose an input of “Webstore-Raw-Clicks-Stream” and click “Next”.

    image28

  3. Clear the existing SQL and copy/paste the following into the “SQL” area.

    insert into `Webstore-Normalized-Clicks-Stream`
    select
        CURRENT_TIMESTAMP as click_timestamp
        , PARSE_URL(request_url, 'HOST') as url_host
        , PARSE_URL(request_url, 'PROTOCOL') as url_protocol
        , PARSE_URL(request_url, 'PATH') as url_path
        , PARSE_URL(request_url, 'QUERY') as url_query
        , REGEXP_EXTRACT(browser_agent,'(MSIE|Trident|(?!Gecko.+)Firefox|(?!AppleWebKit.+Chrome.+)Safari(?!.+Edge)|(?!AppleWebKit.+)Chrome(?!.+Edge)|(?!AppleWebKit.+Chrome.+Safari.+)Edge|AppleWebKit(?!.+Chrome|.+Safari)|Gecko(?!.+Firefox))(?: |\/)([\d\.apre]+)') as browser_type
        -- Check mobile platforms first: iPhone agents also contain 'Mac' and Android agents also contain 'Linux'
        , CASE
            WHEN (browser_agent like '%iPhone%') THEN 'iPhone'
            WHEN (browser_agent like '%Android%') THEN 'Android'
            WHEN (browser_agent like '%Win64%') THEN 'Windows'
            WHEN (browser_agent like '%Mac%') THEN 'Macintosh'
            WHEN (browser_agent like '%Linux%') THEN 'Linux'
            ELSE 'unknown'
          END as operating_system
        , visitor_id as visitor_id
    from `Webstore-Raw-Clicks-Stream`

    image17

  4. Click “Next” and review the auto-generated output stream. Thank you, Decodable!

    image23

  5. Click “Next” and name the pipeline “Webstore-Raw-Clicks-Normalize-Pipeline”.

    image11

  6. Click “Create Pipeline” and be patient; it might take a few seconds.

    image20
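The four PARSE_URL calls in the normalization SQL each pull one component out of `request_url`. As a rough illustration only (this is Python's standard `urllib.parse`, not the SQL engine, and the sample URL is made up), the same breakdown looks like this:

```python
from urllib.parse import urlsplit


def parse_url_parts(request_url):
    """Approximate the four PARSE_URL columns for one request URL.

    Missing components become None to mimic a SQL NULL. Note that
    urlsplit lowercases the host name, which the SQL function may not do.
    """
    parts = urlsplit(request_url)
    return {
        "url_host": parts.hostname,
        "url_protocol": parts.scheme or None,
        "url_path": parts.path or None,
        "url_query": parts.query or None,
    }


sample = parse_url_parts(
    "https://shop.example.com/catalog/bath-linens/bathrobe?color=green"
)
for column, value in sample.items():
    print(f"{column}: {value}")
```

The `url_path` column is the one that matters most downstream, because the filtering pipeline splits it on “/” to find catalog clicks.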

Creating a data filtering pipeline

Create one more pipeline that filters the normalized clicks down to product click data.

  1. Navigate to the “Pipelines” area and click “New Pipeline”.

  2. Choose the “Webstore-Normalized-Clicks-Stream” as the input.

    image22

  3. Clear the SQL from the window and copy/paste the following into the “SQL” window.

    insert into `Webstore-Product-Clicks-Stream`
    select
        click_timestamp
        , TRIM(REPLACE(SPLIT_INDEX(url_path, '/', 2),'-',' ')) as catalog_area_name
        , TRIM(REPLACE(SPLIT_INDEX(url_path, '/', 3),'-',' ')) as product_name
    from `Webstore-Normalized-Clicks-Stream`
    where TRIM(LOWER(SPLIT_INDEX(url_path, '/', 1))) = 'catalog'

    image33

  4. Click “Next” and review the auto-generated output stream. Thank you again, Decodable!

    image32

  5. Click “Next” and name the pipeline “Webstore-Product-Clicks-Pipeline”.

    image18

  6. Now we have a pipeline ready to filter by product.

    image30
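The filtering SQL depends on SPLIT_INDEX being zero-based: a path that starts with “/” splits into an empty first piece, so index 1 is the first path segment, index 2 the catalog area, and index 3 the product. The Python sketch below mimics that logic under stated assumptions: `split_index` and `to_product_click` are hypothetical helper names, the sample path is made up, and Python's `strip()` removes all surrounding whitespace where SQL TRIM removes spaces.

```python
def split_index(text, delimiter, index):
    """Return the zero-based piece at `index`, or None when out of range."""
    if text is None or index < 0:
        return None
    pieces = text.split(delimiter)
    return pieces[index] if index < len(pieces) else None


def to_product_click(click):
    """Return a product click row, or None when the WHERE clause would reject it."""
    path = click.get("url_path")
    first_segment = split_index(path, "/", 1)
    if first_segment is None or first_segment.strip().lower() != "catalog":
        return None

    def clean(index):
        # Mirrors TRIM(REPLACE(SPLIT_INDEX(url_path, '/', index), '-', ' '))
        piece = split_index(path, "/", index)
        return None if piece is None else piece.replace("-", " ").strip()

    return {
        "click_timestamp": click.get("click_timestamp"),
        "catalog_area_name": clean(2),
        "product_name": clean(3),
    }


product = to_product_click(
    {"click_timestamp": "2024-01-01 12:00:00",
     "url_path": "/catalog/bath-linens/bathrobe-greenwich"}
)
skipped = to_product_click(
    {"click_timestamp": "2024-01-01 12:00:05", "url_path": "/cart"}
)
print(product)
print(skipped)
```

Clicks whose first path segment is not “catalog” produce no row at all, which is why the product clicks topic receives only a subset of the events sent to the all-clicks topic.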

Next step

Now it’s time to see the magic! Run the pipelines >>


© 2024 DataStax | Privacy policy | Terms of use
