Data

Data components define how data is processed in your flow. They can be used to fetch data from external sources, process data, or store data in memory.

API request

This component sends HTTP requests to the specified URLs.

Use this component to interact with external APIs or services and retrieve data. Ensure that the URLs are valid and that you configure the method, headers, body, and timeout correctly.

Parameters

Inputs
Name Display Name Info

URLs

URLs

The URLs to target

curl

curl

Paste a curl command to fill in the URL and method, as well as the dictionary fields for headers and body

Method

HTTP Method

The HTTP method to use, such as GET or POST

Headers

Headers

The headers to include with the request

Body

Request Body

The data to send with the request (for methods like POST, PATCH, PUT)

Timeout

Timeout

The maximum time to wait for a response, in seconds

Component code

APIRequest.py
import asyncio
import json
from typing import Any, List, Optional
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

import httpx
from loguru import logger

from langflow.base.curl.parse import parse_context
from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict


class APIRequestComponent(Component):
    """Component that sends an HTTP request to each configured URL.

    The inputs declared below supply the URLs, method, headers, body, query
    parameters and timeout. ``make_requests`` is the output method: it issues
    one request per URL through a shared ``httpx.AsyncClient`` and returns one
    ``Data`` object per URL.
    """

    display_name = "API Request"
    description = (
        "This component allows you to make HTTP requests to one or more URLs. "
        "You can provide headers and body as either dictionaries or Data objects. "
        "Additionally, you can append query parameters to the URLs.\n\n"
        "**Note:** Check advanced options for more settings."
    )
    icon = "Globe"
    name = "APIRequest"

    inputs = [
        MessageTextInput(
            name="urls",
            display_name="URLs",
            is_list=True,
            info="Enter one or more URLs, separated by commas.",
        ),
        MessageTextInput(
            name="curl",
            display_name="Curl",
            info="Paste a curl command to populate the fields. This will fill in the dictionary fields for headers and body.",
            advanced=False,
            refresh_button=True,
        ),
        DropdownInput(
            name="method",
            display_name="Method",
            options=["GET", "POST", "PATCH", "PUT"],
            value="GET",
            info="The HTTP method to use (GET, POST, PATCH, PUT).",
        ),
        NestedDictInput(
            name="headers",
            display_name="Headers",
            info="The headers to send with the request as a dictionary. This is populated when using the CURL field.",
            input_types=["Data"],
        ),
        NestedDictInput(
            name="body",
            display_name="Body",
            info="The body to send with the request as a dictionary (for POST, PATCH, PUT). This is populated when using the CURL field.",
            input_types=["Data"],
        ),
        DataInput(
            name="query_params",
            display_name="Query Parameters",
            info="The query parameters to append to the URL.",
        ),
        IntInput(
            name="timeout",
            display_name="Timeout",
            value=5,
            info="The timeout to use for the request.",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="make_requests"),
    ]

    def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:
        """Fill ``build_config`` from a pasted curl command.

        Args:
            curl: The curl command line to parse.
            build_config: Field configuration, indexed here as
                ``build_config[field]["value"]`` for the ``urls``, ``method``,
                ``headers`` and ``body`` fields. It is updated in place.

        Returns:
            The same ``build_config`` object.

        Raises:
            ValueError: If parsing the command or updating the configuration
                fails for any reason; the original error is chained.
        """
        try:
            parsed = parse_context(curl)
            build_config["urls"]["value"] = [parsed.url]
            build_config["method"]["value"] = parsed.method.upper()
            build_config["headers"]["value"] = dict(parsed.headers)

            if parsed.data:
                try:
                    build_config["body"]["value"] = json.loads(parsed.data)
                except json.JSONDecodeError as e:
                    # A non-JSON payload is only logged; the body field keeps
                    # whatever value it had before.
                    logger.error(f"Error decoding JSON data: {e}")
            else:
                build_config["body"]["value"] = {}
        except Exception as exc:
            logger.error(f"Error parsing curl: {exc}")
            raise ValueError(f"Error parsing curl: {exc}") from exc
        return build_config

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Re-populate the configuration when the ``curl`` field receives a value.

        Changes to any other field, or an empty curl value, return
        ``build_config`` untouched.
        """
        if field_name == "curl" and field_value:
            build_config = self.parse_curl(field_value, build_config)
        return build_config

    async def make_request(
        self,
        client: httpx.AsyncClient,
        method: str,
        url: str,
        headers: Optional[dict] = None,
        body: Optional[dict] = None,
        timeout: int = 5,
    ) -> Data:
        """Send a single request and wrap the outcome in a ``Data`` object.

        Args:
            client: The async client used to send the request.
            method: HTTP method name; compared case-insensitively.
            url: Target URL.
            headers: Headers to send, if any.
            body: JSON body to send. A non-empty string is decoded as JSON
                first; an empty value sends no body.
            timeout: Timeout passed through to the client.

        Returns:
            ``Data`` holding ``source``, ``headers``, ``status_code`` and either
            ``result`` (decoded JSON, or the raw text when the response is not
            JSON) or ``error``. A timeout is reported with status code 408 and
            any other request failure with status code 500.

        Raises:
            ValueError: If ``method`` is not supported or a string ``body`` is
                not valid JSON.
        """
        method = method.upper()
        if method not in ["GET", "POST", "PATCH", "PUT", "DELETE"]:
            raise ValueError(f"Unsupported method: {method}")

        if isinstance(body, str) and body:
            try:
                body = json.loads(body)
            except Exception as e:
                logger.error(f"Error decoding JSON data: {e}")
                raise ValueError(f"Error decoding JSON data: {e}") from e

        # An empty body ({} / "" / None) is sent as "no body" rather than as JSON.
        data = body or None

        try:
            response = await client.request(method, url, headers=headers, json=data, timeout=timeout)
            try:
                result = response.json()
            except Exception:
                # Best effort: fall back to the raw text for non-JSON responses.
                result = response.text
            return Data(
                data={
                    "source": url,
                    "headers": headers,
                    "status_code": response.status_code,
                    "result": result,
                },
            )
        except httpx.TimeoutException:
            return Data(
                data={
                    "source": url,
                    "headers": headers,
                    "status_code": 408,
                    "error": "Request timed out",
                },
            )
        except Exception as exc:
            return Data(
                data={
                    "source": url,
                    "headers": headers,
                    "status_code": 500,
                    "error": str(exc),
                },
            )

    def add_query_params(self, url: str, params: dict) -> str:
        """Return ``url`` with ``params`` merged into its query string.

        Parameters already present in the URL are kept, including blank-valued
        and repeated ones, unless ``params`` supplies the same key; in that
        case the first occurrence is replaced by the new value and further
        occurrences are dropped. New keys are appended at the end.

        Args:
            url: The URL to extend.
            params: Mapping of query parameter names to values. When empty or
                ``None`` the URL is returned unchanged.

        Returns:
            The URL with the merged query string.
        """
        if not params:
            return url

        url_parts = list(urlparse(url))
        merged: list[tuple] = []
        replaced = set()
        # keep_blank_values=True so parameters such as "?flag=" survive the round trip.
        for key, value in parse_qsl(url_parts[4], keep_blank_values=True):
            if key in params:
                if key not in replaced:
                    merged.append((key, params[key]))
                    replaced.add(key)
            else:
                merged.append((key, value))
        merged.extend((key, value) for key, value in params.items() if key not in replaced)

        url_parts[4] = urlencode(merged)
        return urlunparse(url_parts)

    async def make_requests(self) -> List[Data]:
        """Send the configured request to every URL and collect the results.

        Reads all settings from the component inputs, unwraps ``Data`` inputs
        for headers and body, appends the query parameters to each URL and
        awaits all requests together.

        Returns:
            One ``Data`` per non-blank URL; the list is also stored in
            ``self.status``.
        """
        method = self.method
        # Tolerate an unset URL list and skip blank entries.
        urls = [url.strip() for url in (self.urls or []) if url.strip()]
        curl = self.curl
        headers = self.headers or {}
        body = self.body or {}
        timeout = self.timeout
        query_params = self.query_params.data if self.query_params else {}

        if curl:
            # NOTE(review): parse_curl indexes build_config["urls"]["value"] etc.
            # A freshly created dotdict() has no such entries, so unless dotdict
            # supplies missing keys this raises ValueError whenever curl is set.
            # The parsed values are also not used for the request below — confirm
            # whether this call is still needed.
            self._build_config = self.parse_curl(curl, dotdict())

        if isinstance(headers, Data):
            headers = headers.data

        if isinstance(body, Data):
            body = body.data

        urls = [self.add_query_params(url, query_params) for url in urls]

        async with httpx.AsyncClient() as client:
            # Every URL receives the same method, headers, body and timeout.
            results = await asyncio.gather(
                *[self.make_request(client, method, u, headers, body, timeout) for u in urls]
            )
        self.status = results
        return results

File

The FileComponent is a class that loads and parses text files of various supported formats, converting the content into a Data object. It supports multiple file types and provides an option for silent error handling.

Parameters

Inputs
Name Display Name Info

path

Path

File path to load.

silent_errors

Silent Errors

If true, errors will not raise an exception

Outputs
Name Display Name Info

data

Data

Parsed content of the file as a Data object

Supported file extensions

The following file types are supported for processing:

Supported file extensions
Document formats
  • txt - Plain Text

  • md - Markdown

  • mdx - MDX (Markdown with JSX)

  • pdf - Portable Document Format

  • docx - Microsoft Word Document

Data formats
  • csv - Comma-Separated Values

  • json - JavaScript Object Notation

  • yaml, yml - YAML Ain’t Markup Language

Markup Languages
  • xml - Extensible Markup Language

  • html, htm - HyperText Markup Language

Programming Languages
  • py - Python

  • js - JavaScript

  • ts - TypeScript

  • tsx - TypeScript with JSX

  • sql - Structured Query Language

  • sh - Shell Script

Image Formats
  • jpg, jpeg - Joint Photographic Experts Group

  • png - Portable Network Graphics

  • gif - Graphics Interchange Format

  • bmp - Bitmap Image File

  • svg - Scalable Vector Graphics

Component code

File.py
from pathlib import Path

from langflow.base.data.utils import TEXT_FILE_TYPES, parse_text_file_to_data
from langflow.custom import Component
from langflow.io import BoolInput, FileInput, Output
from langflow.schema import Data


class FileComponent(Component):
    """Component that parses one uploaded file into a ``Data`` object."""

    display_name = "File"
    description = "A generic file loader."
    icon = "file-text"
    name = "File"

    inputs = [
        FileInput(
            name="path",
            display_name="Path",
            file_types=TEXT_FILE_TYPES,
            info=f"Supported file types: {', '.join(TEXT_FILE_TYPES)}",
        ),
        BoolInput(
            name="silent_errors",
            display_name="Silent Errors",
            advanced=True,
            info="If true, errors will not raise an exception.",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="load_file"),
    ]

    def load_file(self) -> Data:
        """Parse the file referenced by the ``path`` input.

        Returns:
            The parsed ``Data``, or an empty ``Data`` when the parser yields
            nothing. ``self.status`` mirrors the result, or is set to
            ``"No data"`` in the empty case.

        Raises:
            ValueError: If no file was provided, the file is a legacy ``.doc``
                document, or its extension is not in ``TEXT_FILE_TYPES``.
        """
        if not self.path:
            raise ValueError("Please, upload a file to use this component.")

        file_path = self.resolve_path(self.path)
        ignore_errors = self.silent_errors

        # Extension without the leading dot, lower-cased for comparison.
        suffix = Path(file_path).suffix[1:].lower()

        # .doc gets a dedicated message pointing the user to .docx.
        if suffix == "doc":
            raise ValueError("doc files are not supported. Please save as .docx")
        if suffix not in TEXT_FILE_TYPES:
            raise ValueError(f"Unsupported file type: {suffix}")

        parsed = parse_text_file_to_data(file_path, ignore_errors)
        if parsed:
            self.status = parsed
        else:
            self.status = "No data"
        if parsed:
            return parsed
        return Data()

URL

The URLComponent is a class that fetches content from one or more URLs, processes the content, and returns it as a list of Data objects. It ensures that the provided URLs are valid and uses WebBaseLoader to fetch the content.

Parameters

Inputs
Name Display Name Info

urls

URLs

Enter one or more URLs

Outputs
Name Display Name Info

data

Data

List of Data objects containing fetched content and metadata

Component code

URL.py
import re

from langchain_community.document_loaders.web_base import WebBaseLoader

from langflow.helpers.data import data_to_text
from langflow.custom import Component
from langflow.io import MessageTextInput, Output
from langflow.schema import Data
from langflow.schema.message import Message


class URLComponent(Component):
    """Component that fetches the content of one or more URLs.

    Offers two outputs: ``fetch_content`` returns the loaded documents as
    ``Data`` objects, ``fetch_content_text`` returns them rendered into a
    single ``Message``.
    """

    display_name = "URL"
    description = "Fetch content from one or more URLs."
    icon = "layout-template"
    name = "URL"

    inputs = [
        MessageTextInput(
            name="urls",
            display_name="URLs",
            info="Enter one or more URLs, by clicking the '+' button.",
            is_list=True,
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="fetch_content"),
        Output(display_name="Text", name="text", method="fetch_content_text"),
    ]

    def ensure_url(self, string: str) -> str:
        """
        Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.
        The scheme check is case-insensitive, so 'HTTP://host' is left as is.
        Raises an error if the string is not a valid URL.

        Parameters:
            string (str): The string to be checked and possibly modified.

        Returns:
            str: The modified string that is ensured to be a URL.

        Raises:
            ValueError: If the string is not a valid URL.
        """
        # URL schemes are case-insensitive; compare on a lower-cased copy so an
        # upper-case scheme does not get a second "http://" prepended.
        if not string.lower().startswith(("http://", "https://")):
            string = "http://" + string

        # Basic URL validation regex
        url_regex = re.compile(
            r"^(https?:\/\/)?"  # optional protocol
            r"(www\.)?"  # optional www
            r"([a-zA-Z0-9.-]+)"  # domain
            r"(\.[a-zA-Z]{2,})?"  # top-level domain
            r"(:\d+)?"  # optional port
            # optional path, query or fragment; a query or fragment may follow
            # the host directly (e.g. "http://example.com?x=1")
            r"([\/?#][^\s]*)?$",
            re.IGNORECASE,
        )

        if not url_regex.match(string):
            raise ValueError(f"Invalid URL: {string}")

        return string

    def fetch_content(self) -> list[Data]:
        """Load every non-blank URL and return one ``Data`` per loaded document.

        Each ``Data`` is built from the document's page content (as ``text``)
        plus the document's metadata. The list is also stored in ``self.status``.

        Raises:
            ValueError: If any entry is not a valid URL (see ``ensure_url``).
        """
        # Tolerate an unset URL list and skip blank entries.
        urls = [self.ensure_url(url.strip()) for url in (self.urls or []) if url.strip()]
        loader = WebBaseLoader(web_paths=urls, encoding="utf-8")
        docs = loader.load()
        data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]
        self.status = data
        return data

    def fetch_content_text(self) -> Message:
        """Fetch the URLs and return their content as a single ``Message``.

        Calls ``fetch_content`` and renders its result with the ``"{text}"``
        template via ``data_to_text``; the rendered string is also stored in
        ``self.status``.
        """
        data = self.fetch_content()

        result_string = data_to_text("{text}", data)
        self.status = result_string
        return Message(text=result_string)

Webhook Input

This component defines a webhook input for the flow. The flow can be triggered by an external HTTP POST request (webhook) sending a JSON payload.

If the input is not valid JSON, the component will wrap it in a "payload" field. The component’s status will reflect any errors or the processed data.

Parameters

Inputs
Name Type Description

data

String

JSON payload for testing the webhook component

Outputs
Name Type Description

output_data

Data

Processed data from the webhook input

Component code

Webhook.py
import json

from langflow.custom import Component
from langflow.io import MultilineInput, Output
from langflow.schema import Data


class WebhookComponent(Component):
    """Component that turns a webhook payload string into a ``Data`` object."""

    display_name = "Webhook Input"
    description = "Defines a webhook input for the flow."
    name = "Webhook"

    inputs = [
        MultilineInput(
            name="data",
            display_name="Data",
            info="Use this field to quickly test the webhook component by providing a JSON payload.",
        )
    ]
    outputs = [
        Output(display_name="Data", name="output_data", method="build_data"),
    ]

    def build_data(self) -> Data:
        """Parse the ``data`` input as JSON and wrap it in a ``Data`` object.

        Returns:
            - An empty ``Data`` when no payload was provided.
            - ``Data`` holding the decoded object when the payload is a JSON
              object.
            - ``Data`` holding ``{"payload": ...}`` when the payload is not
              valid JSON (the raw string is wrapped) or is valid JSON but not
              an object (the decoded value is wrapped).

        ``self.status`` is set to an explanatory message for missing or invalid
        payloads, and to the resulting ``Data`` otherwise.
        """
        if not self.data:
            self.status = "No data provided."
            return Data(data={})

        try:
            body = json.loads(self.data)
        except json.JSONDecodeError:
            data = Data(data={"payload": self.data})
            self.status = f"Invalid JSON payload. Please check the format.\n\n{self.data}"
            return data

        # Data is constructed with a mapping everywhere in this file; wrap JSON
        # arrays and scalars the same way invalid payloads are wrapped so they
        # are not passed to Data(data=...) as-is.
        if not isinstance(body, dict):
            body = {"payload": body}

        data = Data(data=body)
        self.status = data
        return data

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com