Putting the real-time data pipeline to work

This guide is part of a series that creates a real-time data pipeline with Astra and Decodable. For context and prerequisites, start here.

Now that all the pieces of our data processing pipeline are in place, it’s time to start the connections and pipelines and ingest some test data.

Starting the processing

  1. Navigate to the “Connections” area. For each of the three connections, click the three-dot menu at the right and choose the “Start” option.


  2. Be patient. It might take a minute or so, but each connection should refresh with a state of “Running”.


    If one of the connections has an issue starting up (like an incorrect setting or expired token), click on that connection for more information.
  3. Navigate to the “Pipelines” area and use the same three-dot menu to start each pipeline. As with the connections, they might take a minute or so to get going. Grab a coffee while you wait - you’ve earned it.


Before ingesting data, let’s make sure we have all the pieces in order:

  • REST connection running? CHECK!

  • Astra Streaming connections running? CHECK!

  • Normalization pipeline running? CHECK!

  • Product clicks filter pipeline running? CHECK!

Your first ingested data

  1. Navigate to the “Connections” area and click the “REST” connector.

  2. Choose the “Upload” tab and copy/paste the following web click data into the window.

    {
        "click_epoch": 1655303592179,
        "request_url": "https://somedomain.com/catalog/area1/yetanother-cool-product?a=b&c=d",
        "visitor_id": "b56afbf3-321f-49c1-919c-b2ea3e550b07",
        "UTC_offset": -4,
        "browser_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36"
    }
  3. Click “Upload” to simulate data being posted to the endpoint. You will receive a confirmation that data has been received.

No, this was not the big moment with cheers and balloons - the celebration comes at the end of the next section.
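The “Upload” tab simulates what a client would do programmatically: POST the JSON to the connection’s events endpoint. Here is a minimal sketch of building that request. The endpoint URL shape and Bearer-token header mirror the script.js example later in this guide, and should be treated as assumptions for your own account:

```javascript
// Sketch: the programmatic equivalent of the "Upload" tab.
// The endpoint URL shape and Bearer auth header follow the script.js
// example later in this guide — treat both as assumptions for your account.
const ENDPOINT_URL = "https://<account>.api.decodable.co/v1alpha2/connections/<connection-id>/events";
const DECODABLE_TOKEN = "<access_token from .decodable/auth>";

function buildClickRequest(click) {
  return {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${DECODABLE_TOKEN}`,
    },
    body: JSON.stringify(click),
  };
}

// Usage: fetch(ENDPOINT_URL, buildClickRequest({ click_epoch: Date.now(), /* ... */ }));
```

Separating request construction from the `fetch` call makes it easy to verify the payload before any data leaves the client.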

Following the flow

For this first record of data, let’s look at each step along the way and confirm processing is working.

  1. After the data was ingested, it was received by the “Webstore-Raw-Clicks-Normalize-Pipeline”. You can confirm this by inspecting that pipeline’s metrics: both the “Input Metrics” and “Output Metrics” areas report that one record has been processed, confirming the data passed successfully through the pipeline.


  2. In the “Connections” area, click the “Astra-Streaming-All-Webclicks-Connector”. In “Input Metrics”, we see that 1 record has been received.


  3. Return to your Astra Streaming tenant “webstore-clicks” and navigate to the “Namespace and Topics” area. Expand the “production” namespace and select the “all-clicks” topic. Confirm that “Data In” has 1 message and “Data Out” has 1 message. This means the topic took the data in and a consumer acknowledged receipt of the message.


  4. In the “Sinks” tab in Astra, select the “all-clicks” sink. In “Instance Stats” you see “Reads” has a value of 1 and “Writes” has a value of 1. This means the sink consumed a message from the topic and wrote the data to the store.


  5. Finally, let’s look at the data in your Astra database. Navigate to the Astra home page and click the “webstore-clicks” Serverless Database. Choose the “CQL Console” tab and copy/paste the following command into the terminal.

    CQL:

    select * from click_data.all_clicks;

    Result:

    token@cqlsh> EXPAND ON; //this cleans up the output
    Now Expanded output is enabled
    token@cqlsh> select * from click_data.all_clicks;
    @ Row 1
     operating_system | Windows
     browser_type     | Chrome/
     url_host         | somedomain.com
     url_path         | /catalog/area1/yetanother-cool-product
     click_timestamp  | 1675286722000
     url_protocol     | https
     url_query        | a=b&c=d
     visitor_id       | b56afbf3-321f-49c1-919c-b2ea3e550b07
    (1 rows)

This confirms that the data was successfully written to the database.
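For reference, the url_* columns in that row are a standard decomposition of the raw request_url. The sketch below is illustrative only; the actual parsing happens inside the normalization pipeline:

```javascript
// Illustrative only: the real parsing happens in the
// Webstore-Raw-Clicks-Normalize-Pipeline, but the url_* columns above
// match a standard URL decomposition.
const raw = "https://somedomain.com/catalog/area1/yetanother-cool-product?a=b&c=d";
const u = new URL(raw);

const normalized = {
  url_protocol: u.protocol.slice(0, -1), // "https" (strip trailing ":")
  url_host: u.hostname,                  // "somedomain.com"
  url_path: u.pathname,                  // "/catalog/area1/yetanother-cool-product"
  url_query: u.search.slice(1),          // "a=b&c=d" (strip leading "?")
};
```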

🎉🎉 Cue the cheers and high-fives! Our pipeline ingested raw web click data, normalized it, and persisted the parsed data to the database! Woot woot!!

Follow the flow of the product clicks data

Just as you followed the flow of the raw click data above, follow these steps to confirm the filtered messages were stored.

  1. Navigate to your Decodable pipeline named “Webstore-Product-Clicks-Pipeline”.

    1. The “Input Metrics” should be 1 record and the “Output Metrics” should be 1 record.

  2. Navigate to your Decodable connection named “Astra-Streaming-Product-Webclicks-Connection”.

    1. The “Input Metrics” should be 1 record.

  3. Navigate to your Astra tenant and check the production/product-clicks topic.

    1. There should be 1 message in “Data In” and 1 message in “Data Out”.

  4. Finally, navigate to your Astra database CQL Console.

    1. Query the product_clicks table:

      CQL:

      select * from click_data.product_clicks;

      Result:

      @ Row 1
       catalog_area_name | area1
       product_name      | yetanother cool product
       click_timestamp   | 2023-02-01 21:25:22.000000+0000

      🚀🚀 Yesssss! The first web click data we entered happened to be a product click, so the data was filtered in the pipeline and processed into the correct table!
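To see why this record qualified as a product click, note that its path matches the /catalog/&lt;area&gt;/&lt;product&gt; pattern, and the product name in the row above is the URL slug with hyphens turned into spaces. A sketch of that logic, for illustration only (the actual filtering happens in the Webstore-Product-Clicks-Pipeline):

```javascript
// Illustrative only: the real filtering happens in the
// Webstore-Product-Clicks-Pipeline. A click counts as a product click
// when its path matches /catalog/<area>/<product>; the product name is
// derived from the URL slug (hyphens become spaces).
function parseProductClick(urlPath) {
  const m = urlPath.match(/^\/catalog\/([^/]+)\/([^/]+)$/);
  if (!m) return null; // not a product click — filtered out of this stream
  return {
    catalog_area_name: m[1],
    product_name: m[2].replace(/-/g, " "),
  };
}

parseProductClick("/catalog/area1/yetanother-cool-product");
// → { catalog_area_name: "area1", product_name: "yetanother cool product" }
```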

Example real-time site data

Let’s see what this can do! To put a load on the pipeline, we’ll need a way to continuously post data to our endpoint. The steps below use an example site to do just that.

  1. Use the download button below to download a zip of a static HTML e-commerce catalog that silently posts click data to an endpoint. The site is a copy of BlazeMeter’s Demoblaze site.

  2. Extract the zip, open the folder in your text editor or IDE of choice, and open the "script.js" file. There are 2 placeholders for data you’ll need to retrieve from Decodable: the Endpoint URL and an authorization token.

    function post_click(url){
      let decodable_token = "access token: <value retrieved from access_token in .decodable/auth>";
      let endpoint_url = "https://ddieruf.api.decodable.co/v1alpha2/connections/4f003544/events";
      // ... the rest of the function posts the click data to endpoint_url
    }

    Learn more about retrieving the Endpoint URL and auth token in the Decodable documentation.

  3. Replace the placeholders with your retrieved values and save "script.js".

  4. Open the "phones.html" file in your browser (yes, as a local file) and begin clicking on products. Each click should be a new post to your Decodable endpoint.
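For a sense of what each click sends, here is a hedged sketch of how the payload might be assembled. The field names mirror the sample record you uploaded earlier; the helper name is hypothetical, and the exact logic in script.js may differ:

```javascript
// Sketch: assembling a click payload shaped like the sample record
// earlier in this guide. The helper name is hypothetical — the real
// logic lives in the site's script.js and may differ.
function buildClickPayload(requestUrl, browserAgent, utcOffsetHours) {
  return {
    click_epoch: Date.now(),
    request_url: requestUrl,
    visitor_id: "b56afbf3-321f-49c1-919c-b2ea3e550b07", // generated per visitor in practice
    UTC_offset: utcOffsetHours,
    browser_agent: browserAgent,
  };
}

// In the browser, post_click(url) would pass navigator.userAgent and
// -(new Date().getTimezoneOffset() / 60), then POST the JSON body to
// the Decodable endpoint.
```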


Next step

Continue on to the last step for debugging and cleanup!


© 2024 DataStax | Privacy policy | Terms of use
