Real-time data pipeline: Decodable objects

This guide is part of a series that creates a real-time data pipeline with Astra Streaming and Decodable. For context and prerequisites, start here.

Getting the Astra Streaming connection information

To connect Astra Streaming to Decodable, you need some information from your Astra Streaming tenant. An optional way to check these values is sketched after the following steps.

  1. In the Astra Portal header, click Applications, select Streaming, and then click your webstore-clicks streaming tenant.

  2. Click the Connect tab, and then scroll to the Tenant Details section.

    These values are required to connect to Decodable.

  3. Create a Pulsar token:

    1. In Tenant Details, click Token Manager, and then click Create Token.

    2. In the Copy Token dialog, copy the Token, and then store it securely.

    3. Click Close when you are done.
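
Before you enter these values in Decodable, you can optionally confirm that the broker service URL and token work. The following is only a minimal sketch that assumes the pulsar-client Python package is installed; the URL and token placeholders are hypothetical and must be replaced with your own Tenant Details values.

    import pulsar

    # Placeholders: replace with the Broker Service URL and the token from Token Manager.
    BROKER_SERVICE_URL = "pulsar+ssl://<broker-hostname>:6651"
    PULSAR_TOKEN = "<your-pulsar-token>"

    client = pulsar.Client(
        BROKER_SERVICE_URL,
        authentication=pulsar.AuthenticationToken(PULSAR_TOKEN),
    )

    # Creating a producer on the existing all-clicks topic fails quickly if the
    # URL or token is wrong, so this doubles as a credentials check.
    producer = client.create_producer("persistent://webstore-clicks/production/all-clicks")
    producer.close()
    client.close()
    print("Broker service URL and token are valid.")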

Creating a Decodable connection to Astra Streaming for all web clicks

In Decodable, you must create a connection and a stream that direct all web clicks to the correct topic in Astra Streaming.

  1. In a new browser tab, sign in to your Decodable account.

  2. Click the Connections tab, and then click New Connection.

  3. In the Choose a Connector dialog, find the DataStax Astra Streaming connector, and then click Connect.

  4. Use the Astra Streaming tenant details from your other browser tab to complete the Decodable connection fields:

    • Connection Type: Select Sink.

    • Broker Service URL: Enter the Pulsar broker service URL from your Astra Streaming tenant.

    • Web Service URL: Enter the Pulsar web service URL from your Astra Streaming tenant.

    • Topic: Enter persistent://webstore-clicks/production/all-clicks.

    • Authentication Token: Enter the same token you used for your Astra Streaming sinks.

    • Value Format: Select JSON.

  5. Click Next, click New Stream, name the stream Webstore-Normalized-Clicks-Stream, and then click Next.

  6. In Define this Connection’s schema, select New Schema for the Schema Source, and then add fields with the following names and types:

    Name               Type
    click_timestamp    TIMESTAMP(0)
    url_host           STRING
    url_protocol       STRING
    url_path           STRING
    url_query          STRING
    browser_type       STRING
    operating_system   STRING
    visitor_id         STRING

    For Type, you must select each type from the dropdown menu; otherwise, Decodable does not accept the schema.

  7. Click Next, name the connection Astra-Streaming-All-Webclicks-Connection, and then click Create Connection.

Creating a Decodable connection to Astra Streaming for product web clicks

Create another connection in Decodable to stream product clicks.

  1. In Decodable, on the Connections tab, click New Connection.

  2. In the Choose a Connector dialog, find the DataStax Astra Streaming connector, and then click Connect.

  3. Use the Astra Streaming tenant details from your other browser tab to complete the Decodable connection fields. All values are the same as the other connection, except the Topic.

    • Connection Type: Select Sink.

    • Broker Service URL: Enter the Pulsar broker service URL from your Astra Streaming tenant.

    • Web Service URL: Enter the Pulsar web service URL from your Astra Streaming tenant.

    • Topic: Enter persistent://webstore-clicks/production/product-clicks.

    • Authentication Token: Enter the same token you used for your Astra Streaming sinks.

    • Value Format: Select JSON.

  4. Click Next, click New Stream, name the stream Webstore-Product-Clicks-Stream, and then click Next.

  5. In Define this Connection’s schema, select New Schema for the Schema Source, and then add fields with the following names and types:

    Name                Type
    click_timestamp     TIMESTAMP(0)
    catalog_area_name   STRING
    product_name        STRING

  6. Click Next, name the connection Astra-Streaming-Product-Webclicks-Connection, and then click Create Connection.

Creating an HTTP data ingestion source

Create a third connection to use Decodable’s REST API to ingest (POST) raw data into the pipeline:

  1. In Decodable, on the Connections tab, click New Connection.

  2. In the Choose a Connector dialog, find the REST connector, and then click Connect.

  3. In the Create your REST connector dialog, leave the default values for all fields, and then click Next.

  4. Click New Stream, enter the name Webstore-Raw-Clicks-Stream, and then click Next.

  5. In Define this Connection’s schema, select New Schema for the Schema Source, and then add fields with the following names and types:

    Name            Type
    click_epoch     BIGINT
    UTC_offset      INT
    request_url     STRING
    browser_agent   STRING
    visitor_id      STRING

  6. Click Next, name the connection Webstore-Raw-Clicks-Connection, and then click Create Connection.

In your REST connector’s settings, note that the Endpoint value contains a <connection_ID>, a dynamic value that is generated when the connection is created. Click the connector’s Details tab to see the resolved endpoint path, such as /v1alpha2/connections/7ef9055f/events. Combine this path with your account domain, such as user.api.decodable.co, to form the full endpoint URL. For more information about the REST connector, see the Decodable documentation.
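
For example, after the connection is created, you could post a raw click event to the resolved endpoint with a short Python script. This is only a sketch: the account domain, connection ID, token, and event values below are placeholders, and the exact request body and authentication header should be confirmed against the Decodable REST connector documentation.

    import time
    import requests

    # Placeholders: use your own account domain, connection ID, and Decodable API token.
    ENDPOINT = "https://user.api.decodable.co/v1alpha2/connections/7ef9055f/events"
    DECODABLE_TOKEN = "<your-decodable-api-token>"

    # One hypothetical raw click event matching the Webstore-Raw-Clicks-Stream schema.
    event = {
        "click_epoch": int(time.time() * 1000),
        "UTC_offset": -5,
        "request_url": "https://www.example-webstore.com/catalog/outdoor-gear/camping-tent",
        "browser_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/114.0 Safari/537.36",
        "visitor_id": "a1b2c3d4",
    }

    # Assumption: the REST connector accepts a JSON body with an "events" array
    # and a Bearer token in the Authorization header.
    response = requests.post(
        ENDPOINT,
        headers={"Authorization": f"Bearer {DECODABLE_TOKEN}"},
        json={"events": [event]},
    )
    response.raise_for_status()
    print("Posted raw click event:", response.status_code)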

You now have three connections ready to use in your streaming pipeline.

Creating a data normalization pipeline

In this part of the tutorial, you will create the core processing logic for your stream processing pipeline.

  1. In Decodable, go to Pipelines, and then click Create Pipeline.

  2. For the input stream, select Webstore-Raw-Clicks-Stream, and then click Next.

  3. In Define your data processing with SQL, delete the pre-populated SQL, and then enter the following SQL statement (a brief illustration of the URL parsing appears after this procedure):

    insert into `Webstore-Normalized-Clicks-Stream`
    select
        CURRENT_TIMESTAMP as click_timestamp
        , PARSE_URL(request_url, 'HOST') as url_host
        , PARSE_URL(request_url, 'PROTOCOL') as url_protocol
        , PARSE_URL(request_url, 'PATH') as url_path
        , PARSE_URL(request_url, 'QUERY') as url_query
        , REGEXP_EXTRACT(browser_agent,'(MSIE|Trident|(?!Gecko.+)Firefox|(?!AppleWebKit.+Chrome.+)Safari(?!.+Edge)|(?!AppleWebKit.+)Chrome(?!.+Edge)|(?!AppleWebKit.+Chrome.+Safari.+)Edge|AppleWebKit(?!.+Chrome|.+Safari)|Gecko(?!.+Firefox))(?: |\/)([\d\.apre]+)') as browser_type
        , CASE
            WHEN (browser_agent like '%Win64%') THEN 'Windows'
            WHEN (browser_agent like '%Mac%') THEN 'Macintosh'
            WHEN (browser_agent like '%Linux%') THEN 'Linux'
            WHEN (browser_agent like '%iPhone%') THEN 'iPhone'
            WHEN (browser_agent like '%Android%') THEN 'Android'
            ELSE 'unknown'
          END as operating_system
        , visitor_id as visitor_id
    from `Webstore-Raw-Clicks-Stream`

  4. Click Next, review the automatically generated output stream, and then click Next.

    The output stream should be correct by default if you followed along with the tutorial so far.

  5. Click Next, name the pipeline Webstore-Raw-Clicks-Normalize-Pipeline, and then click Create Pipeline.

    It can take a few minutes for the pipeline to be created.
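
To get a feel for what the PARSE_URL calls in the pipeline extract, here is a small local illustration in Python that uses a hypothetical request URL. The pipeline itself performs the equivalent work in SQL; this snippet is only for orientation.

    from urllib.parse import urlparse

    # Hypothetical raw click URL; roughly what PARSE_URL extracts for each part.
    request_url = "https://www.example-webstore.com/catalog/outdoor-gear/camping-tent?utm_source=email"
    parsed = urlparse(request_url)

    print(parsed.hostname)  # url_host      -> www.example-webstore.com
    print(parsed.scheme)    # url_protocol  -> https
    print(parsed.path)      # url_path      -> /catalog/outdoor-gear/camping-tent
    print(parsed.query)     # url_query     -> utm_source=email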

Creating a data filtering pipeline

Create a pipeline to separate product click data from all web click data:

  1. In Decodable, go to Pipelines, and then click Create Pipeline.

  2. For the input stream, select Webstore-Normalized-Clicks-Stream, and then click Next.

  3. In Define your data processing with SQL, delete the pre-populated SQL, and then enter the following SQL statement (a brief illustration of the path splitting appears after this procedure):

    insert into `Webstore-Product-Clicks-Stream`
    select
        click_timestamp
        , TRIM(REPLACE(SPLIT_INDEX(url_path, '/', 2),'-',' ')) as catalog_area_name
        , TRIM(REPLACE(SPLIT_INDEX(url_path, '/', 3),'-',' ')) as product_name
    from `Webstore-Normalized-Clicks-Stream`
    where TRIM(LOWER(SPLIT_INDEX(url_path, '/', 1))) = 'catalog'

  4. Click Next, review the automatically generated output stream, and then click Next.

    The output stream should be correct by default if you followed along with the tutorial so far.

  5. Click Next, name the pipeline Webstore-Product-Clicks-Pipeline, and then click Create Pipeline.

    It can take a few minutes for the pipeline to be created.
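
To see how the SPLIT_INDEX calls slice the path, here is a small local illustration in Python with a hypothetical url_path value. Again, the pipeline does this in SQL; the snippet only mirrors the logic.

    # Hypothetical url_path produced by the normalization pipeline.
    url_path = "/catalog/outdoor-gear/camping-tent"
    parts = url_path.split("/")  # ['', 'catalog', 'outdoor-gear', 'camping-tent']

    is_catalog_click = parts[1].strip().lower() == "catalog"    # mirrors the WHERE clause
    catalog_area_name = parts[2].replace("-", " ").strip()      # 'outdoor gear'
    product_name = parts[3].replace("-", " ").strip()           # 'camping tent'

    print(is_catalog_click, catalog_area_name, product_name)

At this point, the normalization pipeline feeds Webstore-Normalized-Clicks-Stream, which your first connection sinks to the all-clicks topic, and the filtering pipeline feeds Webstore-Product-Clicks-Stream, which your second connection sinks to the product-clicks topic.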
