Data

Data components load data from a source into your flow.

They may perform some processing or type checking, like converting raw HTML data into text, or ensuring your loaded file is of an acceptable type.

Use a data component in a flow

The URL data component loads content from a list of URLs.

In the component’s URLs field, enter a comma-separated list of the URLs you want to load. Alternatively, connect a component that outputs the Message type, such as the Chat Input component, to supply the URLs from another component.
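As a rough illustration of what a comma-separated URL field implies, the sketch below splits such a string and applies a loose validity check. The helper names `split_urls` and `looks_like_http_url` are hypothetical and are not part of Langflow; the component may validate URLs differently.

```python
from urllib.parse import urlparse


def split_urls(raw: str) -> list[str]:
    """Split a comma-separated string into trimmed, non-empty entries."""
    return [part.strip() for part in raw.split(",") if part.strip()]


def looks_like_http_url(url: str) -> bool:
    """Loose check (an assumption): an http(s) scheme and a host are present."""
    parsed = urlparse(url)
    return parsed.scheme in {"http", "https"} and bool(parsed.netloc)


urls = split_urls("https://example.com, https://example.org/docs ,")
valid = [u for u in urls if looks_like_http_url(u)]
print(valid)  # ['https://example.com', 'https://example.org/docs']
```

Trailing commas and stray whitespace are dropped before validation, so an accidental empty entry does not turn into a request.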

To output a Data type, select Raw HTML in the Output Format dropdown. To output a Message type, select Text. The Text option applies postprocessing with the data_to_text helper function.

In this example of a document ingestion pipeline, the URL component outputs raw HTML to a text splitter, which splits the raw content into chunks for a vector database to ingest.
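To make the splitting step in that pipeline concrete, here is a minimal sketch of a fixed-size character splitter with overlap. It is a naive stand-in under stated assumptions, not the text splitter component itself: the function name `split_text` and the `chunk_size` and `overlap` parameters are hypothetical.

```python
def split_text(text: str, chunk_size: int = 200, overlap: int = 20) -> list[str]:
    """Cut text into chunks of at most chunk_size characters.

    Consecutive chunks share `overlap` characters so that content near a
    boundary appears in both neighbours.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be at least 0 and smaller than chunk_size")

    step = chunk_size - overlap
    return [text[start : start + chunk_size] for start in range(0, len(text), step)]


chunks = split_text("abcdefghij", chunk_size=4, overlap=1)
print(chunks)  # ['abcd', 'defg', 'ghij', 'j']
```

Each chunk would then be embedded and written to the vector database; real splitters usually prefer to break on separators such as paragraphs or sentences rather than at arbitrary character offsets.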

API Request

This component makes HTTP requests using URLs or cURL commands.
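cURL mode depends on pulling the method, URL, headers, and body out of a pasted command. The sketch below shows the general idea with the standard library only. It is a simplified illustration, not Langflow's parser: `parse_curl_sketch` is a hypothetical name, it understands only a handful of flags, and it treats any token that does not start with `-` as the URL.

```python
import shlex


def parse_curl_sketch(command: str) -> dict:
    """Extract method, URL, headers, and data from a simple curl command."""
    tokens = shlex.split(command)
    if not tokens or tokens[0] != "curl":
        raise ValueError("Expected a command starting with 'curl'")

    parsed = {"method": None, "url": None, "headers": {}, "data": None}
    i = 1
    while i < len(tokens):
        token = tokens[i]
        has_arg = i + 1 < len(tokens)
        if token in ("-X", "--request") and has_arg:
            parsed["method"] = tokens[i + 1].upper()
            i += 2
        elif token in ("-H", "--header") and has_arg:
            name, _, value = tokens[i + 1].partition(":")
            parsed["headers"][name.strip()] = value.strip()
            i += 2
        elif token in ("-d", "--data", "--data-raw") and has_arg:
            parsed["data"] = tokens[i + 1]
            i += 2
        elif not token.startswith("-"):
            parsed["url"] = token
            i += 1
        else:
            i += 1  # Ignore flags this sketch does not understand

    if parsed["method"] is None:
        # curl defaults to POST when a body is supplied, otherwise GET
        parsed["method"] = "POST" if parsed["data"] is not None else "GET"
    return parsed


example = """curl -X POST https://example.com/api -H "Content-Type: application/json" -d '{"a": 1}'"""
parsed = parse_curl_sketch(example)
print(parsed["method"], parsed["url"], parsed["headers"], parsed["data"])
```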

Parameters

Inputs

| Name | Display Name | Info |
|------|--------------|------|
| urls | URLs | Enter one or more URLs, separated by commas. |
| curl | cURL | Paste a curl command to populate the fields. This fills in the dictionary fields for headers and body. |
| method | Method | The HTTP method to use: GET, POST, PATCH, PUT, or DELETE. |
| use_curl | Use cURL | Enable cURL mode to populate fields from a cURL command. |
| query_params | Query Parameters | The query parameters to append to the URL. |
| body | Body | The body to send with the request as a dictionary (for POST, PATCH, PUT). |
| headers | Headers | The headers to send with the request as a dictionary. |
| timeout | Timeout | The timeout for the request, in seconds. The default is 5. |
| follow_redirects | Follow Redirects | Whether to follow HTTP redirects. |
| save_to_file | Save to File | Save the API response to a temporary file. |
| include_httpx_metadata | Include HTTPx Metadata | Include properties such as headers, status_code, response_headers, and redirection_history in the output. |

Outputs

| Name | Display Name | Info |
|------|--------------|------|
| data | Data | The result of the API requests. |
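The Query Parameters input is merged into whatever query string the URL already carries. The standalone sketch below mirrors the `add_query_params` method in the component code that follows, using only `urllib.parse`. The example URLs and parameter values are made up for illustration.

```python
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse


def add_query_params(url: str, params: dict) -> str:
    """Merge params into the URL's existing query string."""
    parts = urlparse(url)
    query = dict(parse_qsl(parts.query))  # existing parameters
    query.update(params)  # new values override existing keys
    return urlunparse(parts._replace(query=urlencode(query)))


print(add_query_params("https://example.com/search?q=a", {"page": "2"}))
# https://example.com/search?q=a&page=2
print(add_query_params("https://example.com/?q=a", {"q": "b"}))
# https://example.com/?q=b
```

Because the existing query is loaded into a plain dictionary, a key that is repeated in the original URL keeps only its last value.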

Component code

api_request.py
import asyncio
import json
import re
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

import aiofiles
import aiofiles.os as aiofiles_os
import httpx
import validators

from langflow.base.curl.parse import parse_context
from langflow.custom import Component
from langflow.io import (
    BoolInput,
    DataInput,
    DropdownInput,
    FloatInput,
    IntInput,
    MessageTextInput,
    MultilineInput,
    Output,
    StrInput,
    TableInput,
)
from langflow.schema import Data
from langflow.schema.dotdict import dotdict


class APIRequestComponent(Component):
    display_name = "API Request"
    description = "Make HTTP requests using URLs or cURL commands."
    icon = "Globe"
    name = "APIRequest"

    default_keys = ["urls", "method", "query_params"]

    inputs = [
        MessageTextInput(
            name="urls",
            display_name="URLs",
            list=True,
            info="Enter one or more URLs, separated by commas.",
            advanced=False,
            tool_mode=True,
        ),
        MultilineInput(
            name="curl",
            display_name="cURL",
            info=(
                "Paste a curl command to populate the fields. "
                "This will fill in the dictionary fields for headers and body."
            ),
            advanced=True,
            real_time_refresh=True,
            tool_mode=True,
        ),
        DropdownInput(
            name="method",
            display_name="Method",
            options=["GET", "POST", "PATCH", "PUT", "DELETE"],
            info="The HTTP method to use.",
            real_time_refresh=True,
        ),
        BoolInput(
            name="use_curl",
            display_name="Use cURL",
            value=False,
            info="Enable cURL mode to populate fields from a cURL command.",
            real_time_refresh=True,
        ),
        DataInput(
            name="query_params",
            display_name="Query Parameters",
            info="The query parameters to append to the URL.",
            advanced=True,
        ),
        TableInput(
            name="body",
            display_name="Body",
            info="The body to send with the request as a dictionary (for POST, PATCH, PUT).",
            table_schema=[
                {
                    "name": "key",
                    "display_name": "Key",
                    "type": "str",
                    "description": "Parameter name",
                },
                {
                    "name": "value",
                    "display_name": "Value",
                    "description": "Parameter value",
                },
            ],
            value=[],
            input_types=["Data"],
            advanced=True,
        ),
        TableInput(
            name="headers",
            display_name="Headers",
            info="The headers to send with the request as a dictionary.",
            table_schema=[
                {
                    "name": "key",
                    "display_name": "Header",
                    "type": "str",
                    "description": "Header name",
                },
                {
                    "name": "value",
                    "display_name": "Value",
                    "type": "str",
                    "description": "Header value",
                },
            ],
            value=[],
            advanced=True,
            input_types=["Data"],
        ),
        IntInput(
            name="timeout",
            display_name="Timeout",
            value=5,
            info="The timeout to use for the request.",
            advanced=True,
        ),
        BoolInput(
            name="follow_redirects",
            display_name="Follow Redirects",
            value=True,
            info="Whether to follow http redirects.",
            advanced=True,
        ),
        BoolInput(
            name="save_to_file",
            display_name="Save to File",
            value=False,
            info="Save the API response to a temporary file",
            advanced=True,
        ),
        BoolInput(
            name="include_httpx_metadata",
            display_name="Include HTTPx Metadata",
            value=False,
            info=(
                "Include properties such as headers, status_code, response_headers, "
                "and redirection_history in the output."
            ),
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="make_requests"),
    ]

    def _parse_json_value(self, value: Any) -> Any:
        """Parse a value that might be a JSON string."""
        if not isinstance(value, str):
            return value

        try:
            parsed = json.loads(value)
        except json.JSONDecodeError:
            return value
        else:
            return parsed

    def _process_body(self, body: Any) -> dict:
        """Process the body input into a valid dictionary.

        Args:
            body: The body to process, can be dict, str, or list
        Returns:
            Processed dictionary
        """
        if body is None:
            return {}
        if isinstance(body, dict):
            return self._process_dict_body(body)
        if isinstance(body, str):
            return self._process_string_body(body)
        if isinstance(body, list):
            return self._process_list_body(body)

        return {}

    def _process_dict_body(self, body: dict) -> dict:
        """Process dictionary body by parsing JSON values."""
        return {k: self._parse_json_value(v) for k, v in body.items()}

    def _process_string_body(self, body: str) -> dict:
        """Process string body by attempting JSON parse."""
        try:
            return self._process_body(json.loads(body))
        except json.JSONDecodeError:
            return {"data": body}

    def _process_list_body(self, body: list) -> dict:
        """Process list body by converting to key-value dictionary."""
        processed_dict = {}

        try:
            for item in body:
                if not self._is_valid_key_value_item(item):
                    continue

                key = item["key"]
                value = self._parse_json_value(item["value"])
                processed_dict[key] = value

        except (KeyError, TypeError, ValueError) as e:
            self.log(f"Failed to process body list: {e}")
            return {}  # Return empty dictionary instead of None

        return processed_dict

    def _is_valid_key_value_item(self, item: Any) -> bool:
        """Check if an item is a valid key-value dictionary."""
        return isinstance(item, dict) and "key" in item and "value" in item

    def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:
        """Parse a cURL command and update build configuration.

        Args:
            curl: The cURL command to parse
            build_config: The build configuration to update
        Returns:
            Updated build configuration
        """
        try:
            parsed = parse_context(curl)

            # Update basic configuration
            build_config["urls"]["value"] = [parsed.url]
            build_config["method"]["value"] = parsed.method.upper()
            build_config["headers"]["advanced"] = True
            build_config["body"]["advanced"] = True

            # Process headers
            headers_list = [{"key": k, "value": v} for k, v in parsed.headers.items()]
            build_config["headers"]["value"] = headers_list

            if headers_list:
                build_config["headers"]["advanced"] = False

            # Process body data
            if not parsed.data:
                build_config["body"]["value"] = []
            else:
                try:
                    json_data = json.loads(parsed.data)
                    if isinstance(json_data, dict):
                        body_list = [
                            {"key": k, "value": json.dumps(v) if isinstance(v, dict | list) else str(v)}
                            for k, v in json_data.items()
                        ]
                        build_config["body"]["value"] = body_list
                        build_config["body"]["advanced"] = False
                    else:
                        build_config["body"]["value"] = [{"key": "data", "value": json.dumps(json_data)}]
                        build_config["body"]["advanced"] = False
                except json.JSONDecodeError:
                    build_config["body"]["value"] = [{"key": "data", "value": parsed.data}]
                    build_config["body"]["advanced"] = False

        except Exception as exc:
            msg = f"Error parsing curl: {exc}"
            self.log(msg)
            raise ValueError(msg) from exc

        return build_config

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict:
        if field_name == "use_curl":
            build_config = self._update_curl_mode(build_config, use_curl=field_value)

            # Fields that should not be reset
            preserve_fields = {"timeout", "follow_redirects", "save_to_file", "include_httpx_metadata", "use_curl"}

            # Mapping between input types and their reset values
            type_reset_mapping = {
                TableInput: [],
                BoolInput: False,
                IntInput: 0,
                FloatInput: 0.0,
                MessageTextInput: "",
                StrInput: "",
                MultilineInput: "",
                DropdownInput: "GET",
                DataInput: {},
            }

            for input_field in self.inputs:
                # Only reset if field is not in preserve list
                if input_field.name not in preserve_fields:
                    reset_value = type_reset_mapping.get(type(input_field), None)
                    build_config[input_field.name]["value"] = reset_value
                    self.log(f"Reset field {input_field.name} to {reset_value}")
        elif field_name == "method" and not self.use_curl:
            build_config = self._update_method_fields(build_config, field_value)
        elif field_name == "curl" and self.use_curl and field_value:
            build_config = self.parse_curl(field_value, build_config)
        return build_config

    def _update_curl_mode(self, build_config: dotdict, *, use_curl: bool) -> dotdict:
        always_visible = ["method", "use_curl"]

        for field in self.inputs:
            field_name = field.name
            field_config = build_config.get(field_name)
            if isinstance(field_config, dict):
                if field_name in always_visible:
                    field_config["advanced"] = False
                elif field_name == "urls":
                    field_config["advanced"] = use_curl
                elif field_name == "curl":
                    field_config["advanced"] = not use_curl
                    field_config["real_time_refresh"] = use_curl
                elif field_name in ["body", "headers"]:
                    # Keep body and headers under advanced in both modes; parse_curl
                    # (and, for body, _update_method_fields) can reveal them later.
                    field_config["advanced"] = True
                else:
                    field_config["advanced"] = use_curl
            else:
                self.log(f"Expected dict for build_config[{field_name}], got {type(field_config).__name__}")

        if not use_curl:
            current_method = build_config.get("method", {}).get("value", "GET")
            build_config = self._update_method_fields(build_config, current_method)

        return build_config

    def _update_method_fields(self, build_config: dotdict, method: str) -> dotdict:
        common_fields = [
            "urls",
            "method",
            "use_curl",
        ]

        always_advanced_fields = [
            "body",
            "headers",
            "timeout",
            "follow_redirects",
            "save_to_file",
            "include_httpx_metadata",
        ]

        body_fields = ["body"]

        for field in self.inputs:
            field_name = field.name
            field_config = build_config.get(field_name)
            if isinstance(field_config, dict):
                if field_name in common_fields:
                    field_config["advanced"] = False
                elif field_name in body_fields:
                    field_config["advanced"] = method not in ["POST", "PUT", "PATCH"]
                elif field_name in always_advanced_fields:
                    field_config["advanced"] = True
                else:
                    field_config["advanced"] = True
            else:
                self.log(f"Expected dict for build_config[{field_name}], got {type(field_config).__name__}")

        return build_config

    async def make_request(
        self,
        client: httpx.AsyncClient,
        method: str,
        url: str,
        headers: dict | None = None,
        body: Any = None,
        timeout: int = 5,
        *,
        follow_redirects: bool = True,
        save_to_file: bool = False,
        include_httpx_metadata: bool = False,
    ) -> Data:
        method = method.upper()
        if method not in {"GET", "POST", "PATCH", "PUT", "DELETE"}:
            msg = f"Unsupported method: {method}"
            raise ValueError(msg)

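        # Normalize the body into a dictionary before sending it
        processed_body = self._process_body(body)

        # Initialize before the try block so the generic except handler below
        # can reference it even when the request itself fails.
        redirection_history: list[dict[str, Any]] = []

        try: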
            response = await client.request(
                method,
                url,
                headers=headers,
                json=processed_body,
                timeout=timeout,
                follow_redirects=follow_redirects,
            )

            redirection_history = [
                {
                    "url": redirect.headers.get("Location", str(redirect.url)),
                    "status_code": redirect.status_code,
                }
                for redirect in response.history
            ]

            is_binary, file_path = await self._response_info(response, with_file_path=save_to_file)
            response_headers = self._headers_to_dict(response.headers)

            metadata: dict[str, Any] = {
                "source": url,
            }

            if save_to_file:
                mode = "wb" if is_binary else "w"
                encoding = response.encoding if mode == "w" else None
                if file_path:
                    # Ensure parent directory exists
                    await aiofiles_os.makedirs(file_path.parent, exist_ok=True)
                    if is_binary:
                        async with aiofiles.open(file_path, "wb") as f:
                            await f.write(response.content)
                            await f.flush()
                    else:
                        async with aiofiles.open(file_path, "w", encoding=encoding) as f:
                            await f.write(response.text)
                            await f.flush()
                    metadata["file_path"] = str(file_path)

                if include_httpx_metadata:
                    metadata.update(
                        {
                            "headers": headers,
                            "status_code": response.status_code,
                            "response_headers": response_headers,
                            **({"redirection_history": redirection_history} if redirection_history else {}),
                        }
                    )
                return Data(data=metadata)

            if is_binary:
                result = response.content
            else:
                try:
                    result = response.json()
                except json.JSONDecodeError:
                    self.log("Failed to decode JSON response")
                    result = response.text.encode("utf-8")

            metadata.update({"result": result})

            if include_httpx_metadata:
                metadata.update(
                    {
                        "headers": headers,
                        "status_code": response.status_code,
                        "response_headers": response_headers,
                        **({"redirection_history": redirection_history} if redirection_history else {}),
                    }
                )
            return Data(data=metadata)
        except httpx.TimeoutException:
            return Data(
                data={
                    "source": url,
                    "headers": headers,
                    "status_code": 408,
                    "error": "Request timed out",
                },
            )
        except Exception as exc:  # noqa: BLE001
            self.log(f"Error making request to {url}")
            return Data(
                data={
                    "source": url,
                    "headers": headers,
                    "status_code": 500,
                    "error": str(exc),
                    **({"redirection_history": redirection_history} if redirection_history else {}),
                },
            )

    def add_query_params(self, url: str, params: dict) -> str:
        url_parts = list(urlparse(url))
        query = dict(parse_qsl(url_parts[4]))
        query.update(params)
        url_parts[4] = urlencode(query)
        return urlunparse(url_parts)

    async def make_requests(self) -> list[Data]:
        method = self.method
        urls = [url.strip() for url in self.urls if url.strip()]
        headers = self.headers or {}
        body = self.body or {}
        timeout = self.timeout
        follow_redirects = self.follow_redirects
        save_to_file = self.save_to_file
        include_httpx_metadata = self.include_httpx_metadata

        if self.use_curl and self.curl:
            self._build_config = self.parse_curl(self.curl, dotdict())

        invalid_urls = [url for url in urls if not validators.url(url)]
        if invalid_urls:
            msg = f"Invalid URLs provided: {invalid_urls}"
            raise ValueError(msg)

        if isinstance(self.query_params, str):
            query_params = dict(parse_qsl(self.query_params))
        else:
            query_params = self.query_params.data if self.query_params else {}

        # Process headers here
        headers = self._process_headers(headers)

        # Process body
        body = self._process_body(body)

        bodies = [body] * len(urls)

        urls = [self.add_query_params(url, query_params) for url in urls]

        async with httpx.AsyncClient() as client:
            results = await asyncio.gather(
                *[
                    self.make_request(
                        client,
                        method,
                        u,
                        headers,
                        rec,
                        timeout,
                        follow_redirects=follow_redirects,
                        save_to_file=save_to_file,
                        include_httpx_metadata=include_httpx_metadata,
                    )
                    for u, rec in zip(urls, bodies, strict=False)
                ]
            )
        self.status = results
        return results

    async def _response_info(
        self, response: httpx.Response, *, with_file_path: bool = False
    ) -> tuple[bool, Path | None]:
        """Determine the file path and whether the response content is binary.

        Args:
            response (Response): The HTTP response object.
            with_file_path (bool): Whether to save the response content to a file.

        Returns:
            Tuple[bool, Path | None]:
                A tuple containing a boolean indicating if the content is binary and the full file path (if applicable).
        """
        content_type = response.headers.get("Content-Type", "")
        is_binary = "application/octet-stream" in content_type or "application/binary" in content_type

        if not with_file_path:
            return is_binary, None

        component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__

        # Create directory asynchronously
        await aiofiles_os.makedirs(component_temp_dir, exist_ok=True)

        filename = None
        if "Content-Disposition" in response.headers:
            content_disposition = response.headers["Content-Disposition"]
            filename_match = re.search(r'filename="(.+?)"', content_disposition)
            if filename_match:
                extracted_filename = filename_match.group(1)
                filename = extracted_filename

        # If no filename was found, build one from the request URL and an inferred extension
        if not filename:
            # Extract the last segment of the URL path
            url_path = urlparse(str(response.request.url) if response.request else "").path
            base_name = Path(url_path).name  # Get the last segment of the path
            if not base_name:  # If the path ends with a slash or is empty
                base_name = "response"

            # Infer file extension
            content_type_to_extension = {
                "text/plain": ".txt",
                "application/json": ".json",
                "image/jpeg": ".jpg",
                "image/png": ".png",
                "application/octet-stream": ".bin",
            }
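            # Drop parameters such as "; charset=utf-8" so the lookup matches the bare media type
            media_type = content_type.split(";", 1)[0].strip().lower()
            extension = content_type_to_extension.get(media_type, ".bin" if is_binary else ".txt")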
            filename = f"{base_name}{extension}"

        # Define the full file path
        file_path = component_temp_dir / filename

        # Check whether the file already exists and handle name collisions
        try:
            # Try to create the file exclusively (x mode) to check existence
            async with aiofiles.open(file_path, "x") as _:
                pass  # File created successfully, we can use this path
        except FileExistsError:
            # If file exists, append a timestamp to the filename
            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S%f")
            file_path = component_temp_dir / f"{timestamp}-{filename}"

        return is_binary, file_path

    def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:
        """Convert HTTP headers to a dictionary with lowercased keys."""
        return {k.lower(): v for k, v in headers.items()}

    def _process_headers(self, headers: Any) -> dict:
        """Process the headers input into a valid dictionary.

        Args:
            headers: The headers to process, can be dict, str, or list
        Returns:
            Processed dictionary
        """
        if headers is None:
            return {}
        if isinstance(headers, dict):
            return headers
        if isinstance(headers, list):
            processed_headers = {}
            try:
                for item in headers:
                    if not self._is_valid_key_value_item(item):
                        continue
                    key = item["key"]
                    value = item["value"]
                    processed_headers[key] = value
            except (KeyError, TypeError, ValueError) as e:
                self.log(f"Failed to process headers list: {e}")
                return {}  # Return empty dictionary instead of None
            return processed_headers
        return {}
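The Body and Headers inputs arrive as lists of key/value rows, which the helpers in the listing above fold into a dictionary. The standalone sketch below reproduces that idea outside the component so it can be run on its own. The names `parse_json_value` and `rows_to_dict` and the sample rows are illustrative.

```python
import json
from typing import Any


def parse_json_value(value: Any) -> Any:
    """Return the decoded value if it is a JSON string, otherwise the value unchanged."""
    if not isinstance(value, str):
        return value
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return value


def rows_to_dict(rows: list) -> dict:
    """Fold [{'key': ..., 'value': ...}, ...] rows into one dictionary."""
    result = {}
    for row in rows:
        # Skip anything that is not a well-formed key/value row
        if isinstance(row, dict) and "key" in row and "value" in row:
            result[row["key"]] = parse_json_value(row["value"])
    return result


rows = [
    {"key": "name", "value": "Ada"},
    {"key": "tags", "value": '["a", "b"]'},
    {"key": "count", "value": "3"},
    {"note": "rows without key and value are skipped"},
]
body = rows_to_dict(rows)
print(body)  # {'name': 'Ada', 'tags': ['a', 'b'], 'count': 3}
```

Note that a value such as "3" is decoded as JSON and becomes the integer 3, while a plain word such as "Ada" is not valid JSON and stays a string.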

File

This component loads and parses text files of various supported formats, converting the content into a Data object. It supports multiple file types and provides an option for silent error handling.

The maximum supported file size is 100 MB.
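A flow that gathers file paths programmatically could check sizes before handing files to the component. The sketch below is one way to do that. It assumes 100 MB means 100 * 1024 * 1024 bytes, which the text above does not specify, and `is_within_size_limit` is a hypothetical helper rather than a Langflow function.

```python
import tempfile
from pathlib import Path

# Assumption: the limit is interpreted as 100 * 1024 * 1024 bytes
MAX_FILE_SIZE_BYTES = 100 * 1024 * 1024


def is_within_size_limit(path: Path, limit_bytes: int = MAX_FILE_SIZE_BYTES) -> bool:
    """Return True if the file at path is no larger than limit_bytes."""
    return Path(path).stat().st_size <= limit_bytes


# Demonstrate with a small temporary file
with tempfile.TemporaryDirectory() as tmp_dir:
    sample = Path(tmp_dir) / "sample.txt"
    sample.write_text("hello")  # 5 bytes
    small_ok = is_within_size_limit(sample)
    too_big = not is_within_size_limit(sample, limit_bytes=4)

print(small_ok, too_big)  # True True
```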

Parameters

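Inputs

| Name | Display Name | Info |
|------|--------------|------|
| path | Path | The file path to load. |
| silent_errors | Silent Errors | If true, errors do not raise an exception. |

Outputs

| Name | Display Name | Info |
|------|--------------|------|
| data | Data | The parsed content of the file as a Data object. |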

Supported file extensions

The following file types are supported for processing:

Document formats
  • txt - Plain Text

  • md - Markdown

  • mdx - MDX (Markdown with JSX)

  • pdf - Portable Document Format

  • docx - Microsoft Word Document

Data formats
  • csv - Comma-Separated Values

  • json - JavaScript Object Notation

  • yaml, yml - YAML Ain’t Markup Language

Markup languages
  • xml - Extensible Markup Language

  • html, htm - HyperText Markup Language

Programming languages
  • py - Python

  • js - JavaScript

  • ts - TypeScript

  • tsx - TypeScript with JSX

  • sql - Structured Query Language

  • sh - Shell Script

Image formats
  • jpg, jpeg - Joint Photographic Experts Group

  • png - Portable Network Graphics

  • gif - Graphics Interchange Format

  • bmp - Bitmap Image File

  • svg - Scalable Vector Graphics
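The extension list and the Silent Errors option interact in a simple way: an unsupported file either raises an error or is quietly skipped. The sketch below illustrates that behavior under stated assumptions. `SUPPORTED_EXTENSIONS` is copied from the list above, while `check_extension` is a hypothetical helper and not the component's actual validation code.

```python
from pathlib import Path

# Extensions copied from the list above
SUPPORTED_EXTENSIONS = {
    "txt", "md", "mdx", "pdf", "docx",
    "csv", "json", "yaml", "yml",
    "xml", "html", "htm",
    "py", "js", "ts", "tsx", "sql", "sh",
    "jpg", "jpeg", "png", "gif", "bmp", "svg",
}


def check_extension(path: str, *, silent_errors: bool = False) -> bool:
    """Return True for supported files; raise or return False otherwise."""
    extension = Path(path).suffix.lstrip(".").lower()
    if extension in SUPPORTED_EXTENSIONS:
        return True
    if silent_errors:
        return False  # Caller skips the file instead of failing
    raise ValueError(f"Unsupported file type: {path}")


print(check_extension("notes.MD"))  # True
print(check_extension("archive.rar", silent_errors=True))  # False
```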

Component code

file.py
from langflow.base.data import BaseFileComponent
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data
from langflow.io import BoolInput, IntInput
from langflow.schema import Data


class FileComponent(BaseFileComponent):
    """Handles loading and processing of individual or zipped text files.

    This component supports processing multiple valid files within a zip archive,
    resolving paths, validating file types, and optionally using multithreading for processing.
    """

    display_name = "File"
    description = "Load a file to be used in your project."
    icon = "file-text"
    name = "File"

    VALID_EXTENSIONS = TEXT_FILE_TYPES

    inputs = [
        *BaseFileComponent._base_inputs,
        BoolInput(
            name="use_multithreading",
            display_name="[Deprecated] Use Multithreading",
            advanced=True,
            value=True,
            info="Set 'Processing Concurrency' greater than 1 to enable multithreading.",
        ),
        IntInput(
            name="concurrency_multithreading",
            display_name="Processing Concurrency",
            advanced=True,
            info="When multiple files are being processed, the number of files to process concurrently.",
            value=1,
        ),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
        """Processes files either sequentially or in parallel, depending on concurrency settings.

        Args:
            file_list (list[BaseFileComponent.BaseFile]): List of files to process.

        Returns:
            list[BaseFileComponent.BaseFile]: Updated list of files with merged data.
        """

        def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:
            """Processes a single file and returns its Data object."""
            try:
                return parse_text_file_to_data(file_path, silent_errors=silent_errors)
            except FileNotFoundError as e:
                msg = f"File not found: {file_path}. Error: {e}"
                self.log(msg)
                if not silent_errors:
                    raise
                return None
            except Exception as e:
                msg = f"Unexpected error processing {file_path}: {e}"
                self.log(msg)
                if not silent_errors:
                    raise
                return None

        if not file_list:
            msg = "No files to process."
            raise ValueError(msg)

        concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)
        file_count = len(file_list)

        parallel_processing_threshold = 2
        if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:
            if file_count > 1:
                self.log(f"Processing {file_count} files sequentially.")
            processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]
        else:
            self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.")
            file_paths = [str(file.path) for file in file_list]
            processed_data = parallel_load_data(
                file_paths,
                silent_errors=self.silent_errors,
                load_function=process_file,
                max_concurrency=concurrency,
            )

        # Use rollup_data to merge processed data with BaseFile objects
        return self.rollup_data(file_list, processed_data)
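The listing above chooses between sequential and parallel processing based on the concurrency setting and the number of files. The sketch below shows the same decision using a thread pool from the standard library. It is an assumption about how such loading could be done, since the internals of `parallel_load_data` are not shown here, and `load_all` and `load_one` are hypothetical names.

```python
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor


def load_all(paths: list[str], load_one: Callable[[str], str], concurrency: int = 1) -> list[str]:
    """Load every path, in parallel only when it can actually help."""
    concurrency = max(1, concurrency)
    threshold = 2  # Below this, threads add overhead without benefit
    if concurrency < threshold or len(paths) < threshold:
        return [load_one(path) for path in paths]

    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        # Executor.map yields results in the same order as the inputs
        return list(pool.map(load_one, paths))


# str.upper stands in for a real file loader
print(load_all(["a", "b", "c"], str.upper, concurrency=3))  # ['A', 'B', 'C']
```

Keeping results in input order matters here because the processed data is later matched back to the original file list by position.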

Gmail loader

This component loads emails from Gmail using provided credentials and filters. For more information about creating a service account JSON, see Service Account JSON.

Parameters

| Input | Type | Description |
|-------|------|-------------|
| json_string | SecretStrInput | JSON string containing OAuth 2.0 access token information for service account access. |
| label_ids | MessageTextInput | Comma-separated list of label IDs to filter emails. |
| max_results | MessageTextInput | Maximum number of emails to load. |

| Output | Type | Description |
|--------|------|-------------|
| data | Data | Loaded email data. |
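Both label_ids and max_results are text inputs, so their values have to be converted before they can be used as a list of labels and a numeric limit. The sketch below shows one plausible conversion. The helper names and the fallback behavior are assumptions, and the fallback of 10 simply mirrors the field's default value in the component code.

```python
def parse_label_ids(raw: str) -> list[str]:
    """Turn 'INBOX, SENT' into ['INBOX', 'SENT'], dropping empty entries."""
    return [label.strip() for label in raw.split(",") if label.strip()]


def parse_max_results(raw: str, default: int = 10) -> int:
    """Convert the text input to a positive integer, falling back to default."""
    try:
        value = int(raw)
    except (TypeError, ValueError):
        return default
    return value if value > 0 else default


print(parse_label_ids("INBOX,SENT,UNREAD,IMPORTANT"))
# ['INBOX', 'SENT', 'UNREAD', 'IMPORTANT']
print(parse_max_results("25"))  # 25
print(parse_max_results("many"))  # 10
```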

Component code

gmail.py
import base64
import json
import re
from collections.abc import Iterator
from json.decoder import JSONDecodeError
from typing import Any

from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import HumanMessage
from langchain_google_community.gmail.loader import GMailLoader
from loguru import logger

from langflow.custom import Component
from langflow.inputs import MessageTextInput
from langflow.io import SecretStrInput
from langflow.schema import Data
from langflow.template import Output


class GmailLoaderComponent(Component):
    display_name = "Gmail Loader"
    description = "Loads emails from Gmail using provided credentials."
    icon = "Google"

    inputs = [
        SecretStrInput(
            name="json_string",
            display_name="JSON String of the Service Account Token",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
            value="""{
                "account": "",
                "client_id": "",
                "client_secret": "",
                "expiry": "",
                "refresh_token": "",
                "scopes": [
                    "https://www.googleapis.com/auth/gmail.readonly"
                ],
                "token": "",
                "token_uri": "https://oauth2.googleapis.com/token",
                "universe_domain": "googleapis.com"
            }""",
        ),
        MessageTextInput(
            name="label_ids",
            display_name="Label IDs",
            info="Comma-separated list of label IDs to filter emails.",
            required=True,
            value="INBOX,SENT,UNREAD,IMPORTANT",
        ),
        MessageTextInput(
            name="max_results",
            display_name="Max Results",
            info="Maximum number of emails to load.",
            required=True,
            value="10",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="load_emails"),
    ]

    def load_emails(self) -> Data:
        class CustomGMailLoader(GMailLoader):
            def __init__(
                self, creds: Any, *, n: int = 100, label_ids: list[str] | None = None, raise_error: bool = False
            ) -> None:
                super().__init__(creds, n, raise_error)
                self.label_ids = label_ids if label_ids is not None else ["SENT"]

            def clean_message_content(self, message):
                # Remove URLs
                message = re.sub(r"http\S+|www\S+|https\S+", "", message, flags=re.MULTILINE)

                # Remove email addresses
                message = re.sub(r"\S+@\S+", "", message)

                # Remove special characters and excessive whitespace
                message = re.sub(r"[^A-Za-z0-9\s]+", " ", message)
                message = re.sub(r"\s{2,}", " ", message)

                # Trim leading and trailing whitespace
                return message.strip()

            def _extract_email_content(self, msg: Any) -> HumanMessage:
                from_email = None
                for values in msg["payload"]["headers"]:
                    name = values["name"]
                    if name == "From":
                        from_email = values["value"]
                if from_email is None:
                    msg = "From email not found."
                    raise ValueError(msg)

                parts = msg["payload"]["parts"] if "parts" in msg["payload"] else [msg["payload"]]

                for part in parts:
                    if part["mimeType"] == "text/plain":
                        data = part["body"]["data"]
                        data = base64.urlsafe_b64decode(data).decode("utf-8")
                        pattern = re.compile(r"\r\nOn .+(\r\n)*wrote:\r\n")
                        newest_response = re.split(pattern, data)[0]
                        return HumanMessage(
                            content=self.clean_message_content(newest_response),
                            additional_kwargs={"sender": from_email},
                        )
                msg = "No plain text part found in the email."
                raise ValueError(msg)

            def _get_message_data(self, service: Any, message: Any) -> ChatSession:
                msg = service.users().messages().get(userId="me", id=message["id"]).execute()
                message_content = self._extract_email_content(msg)

                in_reply_to = None
                email_data = msg["payload"]["headers"]
                for values in email_data:
                    name = values["name"]
                    if name == "In-Reply-To":
                        in_reply_to = values["value"]

                thread_id = msg["threadId"]

                if in_reply_to:
                    thread = service.users().threads().get(userId="me", id=thread_id).execute()
                    messages = thread["messages"]

                    response_email = None
                    for _message in messages:
                        email_data = _message["payload"]["headers"]
                        for values in email_data:
                            if values["name"] == "Message-ID":
                                message_id = values["value"]
                                if message_id == in_reply_to:
                                    response_email = _message
                    if response_email is None:
                        msg = "Response email not found in the thread."
                        raise ValueError(msg)
                    starter_content = self._extract_email_content(response_email)
                    return ChatSession(messages=[starter_content, message_content])
                return ChatSession(messages=[message_content])

            def lazy_load(self) -> Iterator[ChatSession]:
                service = build("gmail", "v1", credentials=self.creds)
                results = (
                    service.users().messages().list(userId="me", labelIds=self.label_ids, maxResults=self.n).execute()
                )
                messages = results.get("messages", [])
                if not messages:
                    logger.warning("No messages found with the specified labels.")
                for message in messages:
                    try:
                        yield self._get_message_data(service, message)
                    except Exception:
                        if self.raise_error:
                            raise
                        else:
                            logger.exception(f"Error processing message {message['id']}")

        json_string = self.json_string
        label_ids = self.label_ids.split(",") if self.label_ids else ["INBOX"]
        max_results = int(self.max_results) if self.max_results else 100

        # Load the token information from the JSON string
        try:
            token_info = json.loads(json_string)
        except JSONDecodeError as e:
            msg = "Invalid JSON string"
            raise ValueError(msg) from e

        creds = Credentials.from_authorized_user_info(token_info)

        # Initialize the custom loader with the provided credentials
        loader = CustomGMailLoader(creds=creds, n=max_results, label_ids=label_ids)

        try:
            docs = loader.load()
        except RefreshError as e:
            msg = "Authentication error: Unable to refresh authentication token. Please try to reauthenticate."
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Error loading documents: {e}"
            raise ValueError(msg) from e

        # Return the loaded documents
        self.status = docs
        return Data(data={"text": docs})
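
To see what the loader does to a message body, the following sketch repeats the decode, reply-splitting, and cleaning steps from the component code as standalone functions. The function name `extract_newest_text` and the sample message are made up for illustration; the regular expressions are copied from the code above.

```python
import base64
import re

# Matches the "On <date> <sender> wrote:" line that precedes quoted replies.
REPLY_HEADER = re.compile(r"\r\nOn .+(\r\n)*wrote:\r\n")


def clean_message_content(message: str) -> str:
    # Remove URLs, then email addresses.
    message = re.sub(r"http\S+|www\S+|https\S+", "", message, flags=re.MULTILINE)
    message = re.sub(r"\S+@\S+", "", message)
    # Replace runs of special characters with a space, then collapse whitespace.
    message = re.sub(r"[^A-Za-z0-9\s]+", " ", message)
    message = re.sub(r"\s{2,}", " ", message)
    return message.strip()


def extract_newest_text(encoded_body: str) -> str:
    # Gmail returns the body as URL-safe base64.
    decoded = base64.urlsafe_b64decode(encoded_body).decode("utf-8")
    # Keep only the text before the first quoted reply.
    newest = re.split(REPLY_HEADER, decoded)[0]
    return clean_message_content(newest)


raw = (
    "Thanks, see https://example.com or mail bob@example.com!"
    "\r\nOn Mon, Jan 1 Alice wrote:\r\n> earlier text"
)
encoded = base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii")
print(extract_newest_text(encoded))
```

Note that the cleaning step also strips punctuation, so the loaded text is a simplified version of the original message.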

Google Drive loader

This component loads documents from Google Drive using provided credentials and a single document ID. For more information about creating a service account JSON, see Service Account JSON.

Parameters

Input Type Description

json_string

SecretStrInput

JSON string containing OAuth 2.0 access token information for service account access

document_id

MessageTextInput

Single Google Drive document ID

Output Type Description

docs

Data

Loaded document data
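
Before pasting a value into json_string, it can help to check it locally. The sketch below only uses the standard library. It assumes that the credentials loader needs at least the client_id, client_secret, and refresh_token keys; treat that key list, and the helper name `check_token_json`, as assumptions rather than a documented contract.

```python
import json

# Assumption: the minimum keys needed to build refreshable user credentials.
REQUIRED_KEYS = {"client_id", "client_secret", "refresh_token"}


def check_token_json(json_string: str) -> list[str]:
    """Return a list of problems found in the token JSON (empty if none)."""
    try:
        token_info = json.loads(json_string)
    except json.JSONDecodeError as exc:
        return [f"Invalid JSON string: {exc.msg} at line {exc.lineno}"]
    if not isinstance(token_info, dict):
        return ["Token JSON must be an object."]
    missing = sorted(REQUIRED_KEYS - token_info.keys())
    return [f"Missing key: {key}" for key in missing]


print(check_token_json('{"client_id": "x"}'))
print(check_token_json('{"scopes": ["a",]}'))
```

A trailing comma, as in the second call, is a common reason for the component's "Invalid JSON string" error.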

Component code

google_drive.py
import json
from json.decoder import JSONDecodeError

from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials
from langchain_google_community import GoogleDriveLoader

from langflow.custom import Component
from langflow.helpers.data import docs_to_data
from langflow.inputs import MessageTextInput
from langflow.io import SecretStrInput
from langflow.schema import Data
from langflow.template import Output


class GoogleDriveComponent(Component):
    display_name = "Google Drive Loader"
    description = "Loads documents from Google Drive using provided credentials."
    icon = "Google"

    inputs = [
        SecretStrInput(
            name="json_string",
            display_name="JSON String of the Service Account Token",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
        ),
        MessageTextInput(
            name="document_id", display_name="Document ID", info="Single Google Drive document ID", required=True
        ),
    ]

    outputs = [
        Output(display_name="Loaded Documents", name="docs", method="load_documents"),
    ]

    def load_documents(self) -> Data:
        class CustomGoogleDriveLoader(GoogleDriveLoader):
            creds: Credentials | None = None
            """Credentials object to be passed directly."""

            def _load_credentials(self):
                """Return the provided creds attribute, or raise ValueError if none was given."""
                if self.creds:
                    return self.creds
                msg = "No credentials provided."
                raise ValueError(msg)

            class Config:
                arbitrary_types_allowed = True

        json_string = self.json_string

        document_ids = [self.document_id]
        if len(document_ids) != 1:
            msg = "Expected a single document ID"
            raise ValueError(msg)

        # TODO: Add validation to check if the document ID is valid

        # Load the token information from the JSON string
        try:
            token_info = json.loads(json_string)
        except JSONDecodeError as e:
            msg = "Invalid JSON string"
            raise ValueError(msg) from e

        # Initialize the custom loader with the provided credentials and document IDs
        loader = CustomGoogleDriveLoader(
            creds=Credentials.from_authorized_user_info(token_info), document_ids=document_ids
        )

        # Load the documents
        try:
            docs = loader.load()
        # catch google.auth.exceptions.RefreshError
        except RefreshError as e:
            msg = "Authentication error: Unable to refresh authentication token. Please try to reauthenticate."
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Error loading documents: {e}"
            raise ValueError(msg) from e

        if len(docs) != 1:
            msg = "Expected a single document to be loaded."
            raise ValueError(msg)

        data = docs_to_data(docs)
        # Return the loaded documents
        self.status = data
        return Data(data={"text": data})

Google Drive search

This component searches Google Drive files using provided credentials and query parameters. For more information about creating a service account JSON, see Service Account JSON.

Parameters

Input Type Description

token_string

SecretStrInput

JSON string containing OAuth 2.0 access token information for service account access

query_item

DropdownInput

The field to query

valid_operator

DropdownInput

Operator to use in the query

search_term

MessageTextInput

The value to search for in the specified query item

query_string

MessageTextInput

The query string used for searching (can be edited manually)

Output Type Description

doc_urls

List[str]

URLs of the found documents

doc_ids

List[str]

IDs of the found documents

doc_titles

List[str]

Titles of the found documents

Data

Data

Document titles and URLs in a structured format
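
The query string is assembled from query_item, valid_operator, and search_term. The sketch below shows the resulting format. It also escapes backslashes and single quotes in the search term, which the component itself does not do; that escaping step, and the helper name `build_drive_query`, are additions for illustration and rest on the assumption that Drive query strings use a backslash as the escape character.

```python
def build_drive_query(query_item: str, operator: str, search_term: str) -> str:
    """Build a Drive search query such as: name contains 'report'."""
    # Escape backslashes first, then single quotes (not done by the component).
    escaped = search_term.replace("\\", "\\\\").replace("'", "\\'")
    return f"{query_item} {operator} '{escaped}'"


print(build_drive_query("name", "contains", "report"))
print(build_drive_query("fullText", "contains", "Valentine's Day"))
```

If a search term contains an apostrophe, entering the escaped form directly in query_string is one way to avoid a malformed query. The component requests a page size of 5, so each output lists at most five files.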

Component code

google_drive_search.py
import json

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

from langflow.custom import Component
from langflow.inputs import DropdownInput, MessageTextInput
from langflow.io import SecretStrInput
from langflow.schema import Data
from langflow.template import Output


class GoogleDriveSearchComponent(Component):
    display_name = "Google Drive Search"
    description = "Searches Google Drive files using provided credentials and query parameters."
    icon = "Google"

    inputs = [
        SecretStrInput(
            name="token_string",
            display_name="Token String",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
        ),
        DropdownInput(
            name="query_item",
            display_name="Query Item",
            options=[
                "name",
                "fullText",
                "mimeType",
                "modifiedTime",
                "viewedByMeTime",
                "trashed",
                "starred",
                "parents",
                "owners",
                "writers",
                "readers",
                "sharedWithMe",
                "createdTime",
                "properties",
                "appProperties",
                "visibility",
                "shortcutDetails.targetId",
            ],
            info="The field to query.",
            required=True,
        ),
        DropdownInput(
            name="valid_operator",
            display_name="Valid Operator",
            options=["contains", "=", "!=", "<=", "<", ">", ">=", "in", "has"],
            info="Operator to use in the query.",
            required=True,
        ),
        MessageTextInput(
            name="search_term",
            display_name="Search Term",
            info="The value to search for in the specified query item.",
            required=True,
        ),
        MessageTextInput(
            name="query_string",
            display_name="Query String",
            info="The query string used for searching. You can edit this manually.",
            value="",  # This will be updated with the generated query string
        ),
    ]

    outputs = [
        Output(display_name="Document URLs", name="doc_urls", method="search_doc_urls"),
        Output(display_name="Document IDs", name="doc_ids", method="search_doc_ids"),
        Output(display_name="Document Titles", name="doc_titles", method="search_doc_titles"),
        Output(display_name="Data", name="Data", method="search_data"),
    ]

    def generate_query_string(self) -> str:
        query_item = self.query_item
        valid_operator = self.valid_operator
        search_term = self.search_term

        # Construct the query string
        query = f"{query_item} {valid_operator} '{search_term}'"

        # Update the editable query string input with the generated query
        self.query_string = query

        return query

    def on_inputs_changed(self) -> None:
        # Automatically regenerate the query string when inputs change
        self.generate_query_string()

    def generate_file_url(self, file_id: str, mime_type: str) -> str:
        """Generates the appropriate Google Drive URL for a file based on its MIME type."""
        return {
            "application/vnd.google-apps.document": f"https://docs.google.com/document/d/{file_id}/edit",
            "application/vnd.google-apps.spreadsheet": f"https://docs.google.com/spreadsheets/d/{file_id}/edit",
            "application/vnd.google-apps.presentation": f"https://docs.google.com/presentation/d/{file_id}/edit",
            "application/vnd.google-apps.drawing": f"https://docs.google.com/drawings/d/{file_id}/edit",
            "application/pdf": f"https://drive.google.com/file/d/{file_id}/view?usp=drivesdk",
        }.get(mime_type, f"https://drive.google.com/file/d/{file_id}/view?usp=drivesdk")

    def search_files(self) -> dict:
        # Load the token information from the JSON string
        token_info = json.loads(self.token_string)
        creds = Credentials.from_authorized_user_info(token_info)

        # Use the query string from the input (which might have been edited by the user)
        query = self.query_string or self.generate_query_string()

        # Initialize the Google Drive API service
        service = build("drive", "v3", credentials=creds)

        # Perform the search
        results = service.files().list(q=query, pageSize=5, fields="nextPageToken, files(id, name, mimeType)").execute()
        items = results.get("files", [])

        doc_urls = []
        doc_ids = []
        doc_titles_urls = []
        doc_titles = []

        if items:
            for item in items:
                # Directly use the file ID, title, and MIME type to generate the URL
                file_id = item["id"]
                file_title = item["name"]
                mime_type = item["mimeType"]
                file_url = self.generate_file_url(file_id, mime_type)

                # Store the URL, ID, and title+URL in their respective lists
                doc_urls.append(file_url)
                doc_ids.append(file_id)
                doc_titles.append(file_title)
                doc_titles_urls.append({"title": file_title, "url": file_url})

        return {"doc_urls": doc_urls, "doc_ids": doc_ids, "doc_titles_urls": doc_titles_urls, "doc_titles": doc_titles}

    def search_doc_ids(self) -> list[str]:
        return self.search_files()["doc_ids"]

    def search_doc_urls(self) -> list[str]:
        return self.search_files()["doc_urls"]

    def search_doc_titles(self) -> list[str]:
        return self.search_files()["doc_titles"]

    def search_data(self) -> Data:
        return Data(data={"text": self.search_files()["doc_titles_urls"]})

SQL Query

This component executes SQL queries on a specified database.

Parameters

Inputs
Name Display Name Info

query

Query

The SQL query to execute.

database_url

Database URL

The URL of the database.

include_columns

Include Columns

Include columns in the result.

passthrough

Passthrough

If an error occurs, return the query instead of raising an exception.

add_error

Add Error

Add the error to the result.

Outputs
Name Display Name Info

result

Result

The result of the SQL query execution.
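
The passthrough and add_error options interact, so a small sketch may help. It reproduces the result-shaping logic from the component code, but swaps in the standard library's sqlite3 module for the LangChain SQLDatabase tool so that it runs without extra dependencies. The function name `run_query` and the sample table are illustrative only.

```python
import sqlite3


def run_query(conn, query: str, *, passthrough: bool = False, add_error: bool = False) -> str:
    error = None
    try:
        result = str(conn.execute(query).fetchall())
    except sqlite3.Error as exc:
        result = str(exc)
        if not passthrough:
            # Default behavior: the exception propagates to the caller.
            raise
        error = repr(exc)

    if add_error and error is not None:
        # Passthrough with add_error: return the message, the error, and the query.
        result = f"{result}\n\nError: {error}\n\nQuery: {query}"
    elif error is not None:
        # Passthrough without add_error: return only the original query.
        result = query
    return result


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'Ada')")

print(run_query(conn, "SELECT id, name FROM users"))
print(run_query(conn, "SELECT * FROM missing", passthrough=True))
print(run_query(conn, "SELECT * FROM missing", passthrough=True, add_error=True))
```

Note that add_error has no effect unless passthrough is also enabled, because without passthrough the error is raised before the result is assembled.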

Component code

sql_executor.py
from langchain_community.tools.sql_database.tool import QuerySQLDataBaseTool
from langchain_community.utilities import SQLDatabase

from langflow.custom import CustomComponent
from langflow.field_typing import Text


class SQLExecutorComponent(CustomComponent):
    display_name = "SQL Query"
    description = "Execute SQL query."
    name = "SQLExecutor"
    beta: bool = True

    def build_config(self):
        return {
            "database_url": {
                "display_name": "Database URL",
                "info": "The URL of the database.",
            },
            "include_columns": {
                "display_name": "Include Columns",
                "info": "Include columns in the result.",
            },
            "passthrough": {
                "display_name": "Passthrough",
                "info": "If an error occurs, return the query instead of raising an exception.",
            },
            "add_error": {
                "display_name": "Add Error",
                "info": "Add the error to the result.",
            },
        }

    def clean_up_uri(self, uri: str) -> str:
        if uri.startswith("postgresql://"):
            uri = uri.replace("postgresql://", "postgres://")
        return uri.strip()

    def build(
        self,
        query: str,
        database_url: str,
        *,
        include_columns: bool = False,
        passthrough: bool = False,
        add_error: bool = False,
        **kwargs,
    ) -> Text:
        _ = kwargs
        error = None
        try:
            database = SQLDatabase.from_uri(database_url)
        except Exception as e:
            msg = f"An error occurred while connecting to the database: {e}"
            raise ValueError(msg) from e
        try:
            tool = QuerySQLDataBaseTool(db=database)
            result = tool.run(query, include_columns=include_columns)
            self.status = result
        except Exception as e:
            result = str(e)
            self.status = result
            if not passthrough:
                raise
            error = repr(e)

        if add_error and error is not None:
            result = f"{result}\n\nError: {error}\n\nQuery: {query}"
        elif error is not None:
            # Then we won't add the error to the result
            # but since we are in passthrough mode, we will return the query
            result = query

        return result

URL

The URLComponent fetches content from one or more URLs, processes the content, and returns it as a list of Data objects. It checks that each URL is well formed, then fetches the content with WebBaseLoader for text output or AsyncHtmlLoader for raw HTML output.

Parameters

Inputs
Name Display Name Info

urls

URLs

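Enter one or more URLs

format

Output Format

Select Text to extract the text from the HTML, or Raw HTML to return the raw HTML content.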

Outputs
Name Display Name Info

data

Data

List of Data objects containing fetched content and metadata
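
The URL check is easy to try in isolation. The sketch below copies the ensure_url logic from the component code into a standalone function so you can see which inputs are accepted; the sample inputs are made up.

```python
import re

# Same basic validation pattern as the component's ensure_url method.
URL_REGEX = re.compile(
    r"^(https?:\/\/)?"  # optional protocol
    r"(www\.)?"  # optional www
    r"([a-zA-Z0-9.-]+)"  # domain
    r"(\.[a-zA-Z]{2,})?"  # top-level domain
    r"(:\d+)?"  # optional port
    r"(\/[^\s]*)?$",  # optional path
    re.IGNORECASE,
)


def ensure_url(string: str) -> str:
    # Add a scheme when the user omitted one.
    if not string.startswith(("http://", "https://")):
        string = "http://" + string
    if not URL_REGEX.match(string):
        raise ValueError(f"Invalid URL: {string}")
    return string


for candidate in ["example.com", "https://example.com/docs", "not a url"]:
    try:
        print(ensure_url(candidate))
    except ValueError as exc:
        print(exc)
```

The pattern is deliberately loose: it rejects strings that contain whitespace, but it does not confirm that the host exists or that the page can be fetched.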

Component code

url.py
import re

from langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader

from langflow.custom import Component
from langflow.helpers.data import data_to_text
from langflow.io import DropdownInput, MessageTextInput, Output
from langflow.schema import Data
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message


class URLComponent(Component):
    display_name = "URL"
    description = "Load and retrieve data from specified URLs."
    icon = "layout-template"
    name = "URL"

    inputs = [
        MessageTextInput(
            name="urls",
            display_name="URLs",
            is_list=True,
            tool_mode=True,
            placeholder="Enter a URL...",
            list_add_label="Add URL",
        ),
        DropdownInput(
            name="format",
            display_name="Output Format",
            info="Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.",
            options=["Text", "Raw HTML"],
            value="Text",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="fetch_content"),
        Output(display_name="Message", name="text", method="fetch_content_text"),
        Output(display_name="DataFrame", name="dataframe", method="as_dataframe"),
    ]

    def ensure_url(self, string: str) -> str:
        """Ensures the given string is a URL by adding 'http://' if it doesn't start with 'http://' or 'https://'.

        Raises an error if the string is not a valid URL.

        Parameters:
            string (str): The string to be checked and possibly modified.

        Returns:
            str: The modified string that is ensured to be a URL.

        Raises:
            ValueError: If the string is not a valid URL.
        """
        if not string.startswith(("http://", "https://")):
            string = "http://" + string

        # Basic URL validation regex
        url_regex = re.compile(
            r"^(https?:\/\/)?"  # optional protocol
            r"(www\.)?"  # optional www
            r"([a-zA-Z0-9.-]+)"  # domain
            r"(\.[a-zA-Z]{2,})?"  # top-level domain
            r"(:\d+)?"  # optional port
            r"(\/[^\s]*)?$",  # optional path
            re.IGNORECASE,
        )

        if not url_regex.match(string):
            msg = f"Invalid URL: {string}"
            raise ValueError(msg)

        return string

    def fetch_content(self) -> list[Data]:
        urls = [self.ensure_url(url.strip()) for url in self.urls if url.strip()]
        if self.format == "Raw HTML":
            loader = AsyncHtmlLoader(web_path=urls, encoding="utf-8")
        else:
            loader = WebBaseLoader(web_paths=urls, encoding="utf-8")
        docs = loader.load()
        data = [Data(text=doc.page_content, **doc.metadata) for doc in docs]
        self.status = data
        return data

    def fetch_content_text(self) -> Message:
        data = self.fetch_content()

        result_string = data_to_text("{text}", data)
        self.status = result_string
        return Message(text=result_string)

    def as_dataframe(self) -> DataFrame:
        return DataFrame(self.fetch_content())

Webhook

This component defines a webhook trigger that runs a flow when it receives an HTTP POST request.

If the input is not valid JSON, the component wraps it in a payload object so that it can be processed and still trigger the flow. The component does not require an API key.
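
As a minimal sketch of that fallback, the function below follows the same steps as the component's build_data method, without the Langflow Data wrapper. The name `parse_webhook_payload` is hypothetical.

```python
import json
from typing import Any


def parse_webhook_payload(raw: str) -> Any:
    """Return the parsed body, or wrap non-JSON input in a payload object."""
    if not raw:
        # Nothing was posted: the component returns empty data.
        return {}
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Invalid JSON still triggers the flow, wrapped under "payload".
        return {"payload": raw}


print(parse_webhook_payload('{"any": "data"}'))
print(parse_webhook_payload("plain text body"))
```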

When a Webhook component is added to the workspace, a new Webhook cURL tab becomes available in the API pane that contains an HTTP POST request for triggering the webhook component. For example:

curl -X POST \
    "https://api.langflow.astra.datastax.com/lf/YOUR_LANGFLOW_ID/api/v1/run/YOUR_FLOW_ID" \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer YOUR_APPLICATION_TOKEN' \
  -d '{"any": "data"}'
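
If you prefer to trigger the webhook from Python, the same request can be built with the standard library. This is a sketch under the assumption that the endpoint accepts exactly what the cURL command sends; `build_webhook_request` is a hypothetical helper, and the URL and token are the same placeholders as above.

```python
import json
import urllib.request


def build_webhook_request(url: str, token: str, payload: dict) -> urllib.request.Request:
    """Build the POST request that the cURL command above would send."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


request = build_webhook_request(
    "https://api.langflow.astra.datastax.com/lf/YOUR_LANGFLOW_ID/api/v1/run/YOUR_FLOW_ID",
    "YOUR_APPLICATION_TOKEN",
    {"any": "data"},
)
print(request.get_method(), request.full_url)

# To send it, replace the placeholders with real values and run:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```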

To test the webhook component:

  1. Add a Webhook component to the flow.

  2. Connect the Webhook component’s Data output to the Data input of a Data to Message component.

  3. Connect the Data to Message component’s Message output to the Text input of a Chat Output component.

  4. To send a POST request, copy the code from the Webhook cURL tab in the API pane and paste it into a terminal.

  5. Send the POST request.

  6. Open the Playground. Your JSON data is posted to the Chat Output component, which indicates that the Webhook component is correctly triggering the flow.

Parameters

Inputs
Name Type Description

data

String

JSON payload for testing the webhook component

Outputs
Name Type Description

output_data

Data

Processed data from the webhook input

Component code

webhook.py
import json

from langflow.custom import Component
from langflow.io import MultilineInput, Output
from langflow.schema import Data


class WebhookComponent(Component):
    display_name = "Webhook"
    description = "Defines a webhook input for the flow."
    name = "Webhook"
    icon = "webhook"

    inputs = [
        MultilineInput(
            name="data",
            display_name="Payload",
            info="Receives a payload from external systems via HTTP POST.",
        )
    ]
    outputs = [
        Output(display_name="Data", name="output_data", method="build_data"),
    ]

    def build_data(self) -> Data:
        message: str | Data = ""
        if not self.data:
            self.status = "No data provided."
            return Data(data={})
        try:
            body = json.loads(self.data or "{}")
        except json.JSONDecodeError:
            body = {"payload": self.data}
            message = f"Invalid JSON payload. Please check the format.\n\n{self.data}"
        data = Data(data=body)
        if not message:
            message = data
        self.status = message
        return data

© 2025 DataStax