Data
This Langflow feature is currently in public preview. Development is ongoing, and the features and functionality are subject to change. Langflow, and the use of such, is subject to the DataStax Preview Terms.
Data components load data from a source into your flow.
They may perform some processing or type checking, like converting raw HTML data into text, or ensuring your loaded file is of an acceptable type.
Use a data component in a flow
The URL data component loads content from a list of URLs.
In the component’s URLs field, enter a comma-separated list of URLs you want to load. Alternatively, connect a component that outputs the Message
type, like the Chat Input component, to supply your URLs with a component.
To output a Data
type, in the Output Format dropdown, select Raw HTML.
To output a Message
type, in the Output Format dropdown, select Text. This option applies postprocessing with the data_to_text
helper function.
In this example of a document ingestion pipeline, the URL component outputs raw HTML to a text splitter, which splits the raw content into chunks for a vector database to ingest.

API Request
This component makes HTTP requests using URLs or cURL commands.
Parameters
Name | Display Name | Info |
---|---|---|
urls |
URLs |
Enter one or more URLs, separated by commas. |
curl |
cURL |
Paste a curl command to populate the fields. This completes the dictionary fields for headers and body. |
method |
Method |
The HTTP method to use. |
use_curl |
Use cURL |
Enable cURL mode to populate fields from a cURL command. |
query_params |
Query Parameters |
The query parameters to append to the URL. |
body |
Body |
The body sent with the request as a dictionary (for POST, PATCH, PUT). |
headers |
Headers |
The headers sent with the request as a dictionary. |
timeout |
Timeout |
The timeout specified for the request. |
follow_redirects |
Follow Redirects |
Whether to follow HTTP redirects. |
include_httpx_metadata |
Include HTTPx Metadata |
Include properties such as headers, status_code, response_headers, and redirection_history in the output. |
Name | Display Name | Info |
---|---|---|
data |
Data |
The result of the API requests. |
Component code
api_request.py
import json
import re
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
import aiofiles
import aiofiles.os as aiofiles_os
import httpx
import validators
from langflow.base.curl.parse import parse_context
from langflow.custom.custom_component.component import Component
from langflow.inputs.inputs import TabInput
from langflow.io import (
BoolInput,
DataInput,
DropdownInput,
IntInput,
MessageTextInput,
MultilineInput,
Output,
TableInput,
)
from langflow.schema.data import Data
from langflow.schema.dotdict import dotdict
from langflow.services.deps import get_settings_service
from langflow.utils.component_utils import set_current_fields, set_field_advanced, set_field_display
# Input names associated with each request mode, keyed by the option value of
# the "mode" tab input.
MODE_FIELDS = {
    "URL": ["url_input", "method"],
    "cURL": ["curl_input"],
}

# Input names that are handled as defaults regardless of the selected mode.
DEFAULT_FIELDS = ["mode"]
class APIRequestComponent(Component):
    """Component that issues one HTTP request and returns the outcome as ``Data``.

    The request is configured either field by field ("URL" mode) or by pasting
    a cURL command ("cURL" mode); in the latter case ``parse_curl`` copies the
    parsed URL, method, headers and body into the regular input fields.
    """

    display_name = "API Request"
    description = "Make HTTP requests using URL or cURL commands."
    icon = "Globe"
    name = "APIRequest"

    inputs = [
        MessageTextInput(
            name="url_input",
            display_name="URL",
            info="Enter the URL for the request.",
            advanced=False,
            tool_mode=True,
        ),
        MultilineInput(
            name="curl_input",
            display_name="cURL",
            info=(
                "Paste a curl command to populate the fields. "
                "This will fill in the dictionary fields for headers and body."
            ),
            real_time_refresh=True,
            tool_mode=True,
            advanced=True,
            show=False,
        ),
        DropdownInput(
            name="method",
            display_name="Method",
            options=["GET", "POST", "PATCH", "PUT", "DELETE"],
            value="GET",
            info="The HTTP method to use.",
            real_time_refresh=True,
        ),
        TabInput(
            name="mode",
            display_name="Mode",
            options=["URL", "cURL"],
            value="URL",
            info="Enable cURL mode to populate fields from a cURL command.",
            real_time_refresh=True,
        ),
        DataInput(
            name="query_params",
            display_name="Query Parameters",
            info="The query parameters to append to the URL.",
            advanced=True,
        ),
        TableInput(
            name="body",
            display_name="Body",
            info="The body to send with the request as a dictionary (for POST, PATCH, PUT).",
            table_schema=[
                {
                    "name": "key",
                    "display_name": "Key",
                    "type": "str",
                    "description": "Parameter name",
                },
                {
                    "name": "value",
                    "display_name": "Value",
                    "description": "Parameter value",
                },
            ],
            value=[],
            input_types=["Data"],
            advanced=True,
            real_time_refresh=True,
        ),
        TableInput(
            name="headers",
            display_name="Headers",
            info="The headers to send with the request",
            table_schema=[
                {
                    "name": "key",
                    "display_name": "Header",
                    "type": "str",
                    "description": "Header name",
                },
                {
                    "name": "value",
                    "display_name": "Value",
                    "type": "str",
                    "description": "Header value",
                },
            ],
            value=[{"key": "User-Agent", "value": get_settings_service().settings.user_agent}],
            advanced=True,
            input_types=["Data"],
            real_time_refresh=True,
        ),
        IntInput(
            name="timeout",
            display_name="Timeout",
            value=30,
            info="The timeout to use for the request.",
            advanced=True,
        ),
        BoolInput(
            name="follow_redirects",
            display_name="Follow Redirects",
            value=True,
            info="Whether to follow http redirects.",
            advanced=True,
        ),
        BoolInput(
            name="save_to_file",
            display_name="Save to File",
            value=False,
            info="Save the API response to a temporary file",
            advanced=True,
        ),
        BoolInput(
            name="include_httpx_metadata",
            display_name="Include HTTPx Metadata",
            value=False,
            info=(
                "Include properties such as headers, status_code, response_headers, "
                "and redirection_history in the output."
            ),
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="API Response", name="data", method="make_api_request"),
    ]

    def _parse_json_value(self, value: Any) -> Any:
        """Decode ``value`` as JSON when it is a string holding valid JSON.

        Non-string values, and strings that are not valid JSON, are returned
        unchanged.
        """
        if not isinstance(value, str):
            return value
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return value

    def _process_body(self, body: Any) -> dict:
        """Convert the body input into a dictionary.

        Accepts ``None``, a ``Data`` object, a dict, a JSON/plain string, or a
        list of ``{"key": ..., "value": ...}`` rows. Any other type yields an
        empty dict.
        """
        if body is None:
            return {}
        if isinstance(body, Data):
            # The Body input declares input_types=["Data"]; unwrap the payload
            # so a connected Data object is not silently discarded.
            body = body.data
        if isinstance(body, dict):
            return self._process_dict_body(body)
        if isinstance(body, str):
            return self._process_string_body(body)
        if isinstance(body, list):
            return self._process_list_body(body)
        return {}

    def _process_dict_body(self, body: dict) -> dict:
        """Return a copy of ``body`` with JSON-encoded string values decoded."""
        return {k: self._parse_json_value(v) for k, v in body.items()}

    def _process_string_body(self, body: str) -> dict:
        """Parse a string body as JSON, falling back to ``{"data": body}``."""
        try:
            return self._process_body(json.loads(body))
        except json.JSONDecodeError:
            return {"data": body}

    def _process_list_body(self, body: list) -> dict:
        """Build a dictionary from a list of key/value rows.

        Rows that are not dicts containing both "key" and "value" are skipped.
        If processing fails, the failure is logged and an empty dict returned.
        """
        processed_dict = {}
        try:
            for item in body:
                if not self._is_valid_key_value_item(item):
                    continue
                processed_dict[item["key"]] = self._parse_json_value(item["value"])
        except (KeyError, TypeError, ValueError) as e:
            self.log(f"Failed to process body list: {e}")
            return {}
        return processed_dict

    def _is_valid_key_value_item(self, item: Any) -> bool:
        """Return True when ``item`` is a dict with both "key" and "value"."""
        return isinstance(item, dict) and "key" in item and "value" in item

    def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:
        """Parse a cURL command and copy its parts into ``build_config``.

        Sets the values of the "url_input", "method", "headers" and "body"
        fields. A JSON object body becomes one row per key; any other body is
        stored in a single row keyed "data".

        Raises:
            ValueError: If the command cannot be parsed or applied.
        """
        try:
            parsed = parse_context(curl)

            # Normalize the URL before storing it so it carries a scheme.
            build_config["url_input"]["value"] = self._normalize_url(parsed.url)
            build_config["method"]["value"] = parsed.method.upper()
            build_config["headers"]["value"] = [{"key": k, "value": v} for k, v in parsed.headers.items()]

            if not parsed.data:
                build_config["body"]["value"] = []
            else:
                try:
                    json_data = json.loads(parsed.data)
                    if isinstance(json_data, dict):
                        # Nested containers are re-serialized so each table
                        # cell holds a string.
                        build_config["body"]["value"] = [
                            {"key": k, "value": json.dumps(v) if isinstance(v, dict | list) else str(v)}
                            for k, v in json_data.items()
                        ]
                    else:
                        build_config["body"]["value"] = [{"key": "data", "value": json.dumps(json_data)}]
                except json.JSONDecodeError:
                    build_config["body"]["value"] = [{"key": "data", "value": parsed.data}]
        except Exception as exc:
            msg = f"Error parsing curl: {exc}"
            self.log(msg)
            raise ValueError(msg) from exc
        return build_config

    def _normalize_url(self, url: str) -> str:
        """Strip ``url`` and prefix ``https://`` when it has no http(s) scheme.

        Raises:
            ValueError: If ``url`` is empty or not a string.
        """
        if not url or not isinstance(url, str):
            msg = "URL cannot be empty"
            raise ValueError(msg)
        url = url.strip()
        if url.startswith(("http://", "https://")):
            return url
        return f"https://{url}"

    async def make_request(
        self,
        client: httpx.AsyncClient,
        method: str,
        url: str,
        headers: dict | None = None,
        body: Any = None,
        timeout: int = 5,
        *,
        follow_redirects: bool = True,
        save_to_file: bool = False,
        include_httpx_metadata: bool = False,
    ) -> Data:
        """Send one HTTP request and wrap the outcome in a ``Data`` object.

        Args:
            client: The httpx client used to send the request.
            method: HTTP method; one of GET, POST, PATCH, PUT, DELETE
                (case-insensitive).
            url: Target URL.
            headers: Request headers.
            body: Raw body input; converted with ``_process_body`` and sent as
                JSON.
            timeout: Timeout passed to httpx.
            follow_redirects: Whether httpx should follow redirects.
            save_to_file: Write the response to a temporary file and return
                its path instead of the response content.
            include_httpx_metadata: Add the request headers to the output
                under "headers".

        Returns:
            ``Data`` whose payload holds "source", "status_code" and
            "response_headers", plus "redirection_history" when redirects
            occurred and either "file_path" or "result". httpx errors are not
            raised; they are reported as a payload with status_code 500 and
            an "error" message.

        Raises:
            ValueError: If ``method`` is not supported.
        """
        method = method.upper()
        if method not in {"GET", "POST", "PATCH", "PUT", "DELETE"}:
            msg = f"Unsupported method: {method}"
            raise ValueError(msg)

        processed_body = self._process_body(body)
        redirection_history = []

        try:
            request_params: dict[str, Any] = {
                "method": method,
                "url": url,
                "headers": headers,
                "timeout": timeout,
                "follow_redirects": follow_redirects,
            }
            # Passing json={} makes httpx send a "{}" payload, so a JSON body
            # is attached only when there is content or the method normally
            # carries one. GET/DELETE without a body stay body-less.
            if processed_body or method in {"POST", "PATCH", "PUT"}:
                request_params["json"] = processed_body

            response = await client.request(**request_params)

            redirection_history = [
                {
                    "url": redirect.headers.get("Location", str(redirect.url)),
                    "status_code": redirect.status_code,
                }
                for redirect in response.history
            ]

            is_binary, file_path = await self._response_info(response, with_file_path=save_to_file)
            response_headers = self._headers_to_dict(response.headers)

            # Base metadata
            metadata = {
                "source": url,
                "status_code": response.status_code,
                "response_headers": response_headers,
            }
            if redirection_history:
                metadata["redirection_history"] = redirection_history

            if save_to_file:
                if file_path:
                    await aiofiles_os.makedirs(file_path.parent, exist_ok=True)
                    if is_binary:
                        async with aiofiles.open(file_path, "wb") as f:
                            await f.write(response.content)
                            await f.flush()
                    else:
                        async with aiofiles.open(file_path, "w", encoding=response.encoding) as f:
                            await f.write(response.text)
                            await f.flush()
                    metadata["file_path"] = str(file_path)
                if include_httpx_metadata:
                    metadata["headers"] = headers
                return Data(data=metadata)

            # Handle response content
            if is_binary:
                result = response.content
            else:
                try:
                    result = response.json()
                except json.JSONDecodeError:
                    self.log("Failed to decode JSON response")
                    result = response.text.encode("utf-8")
            metadata["result"] = result

            if include_httpx_metadata:
                metadata["headers"] = headers
            return Data(data=metadata)
        except httpx.HTTPError as exc:
            # httpx.RequestError and httpx.TimeoutException derive from
            # httpx.HTTPError, so this single clause covers all three.
            self.log(f"Error making request to {url}")
            return Data(
                data={
                    "source": url,
                    "headers": headers,
                    "status_code": 500,
                    "error": str(exc),
                    **({"redirection_history": redirection_history} if redirection_history else {}),
                },
            )

    def add_query_params(self, url: str, params: dict) -> str:
        """Return ``url`` with ``params`` merged into its query string.

        A key in ``params`` replaces every existing occurrence of that key,
        keeping the position of the first one; new keys are appended. Existing
        parameters that are not overridden are preserved as-is, including
        blank values and repeated keys.
        """
        if not params:
            return url

        url_parts = urlparse(url)
        merged: list[tuple[Any, Any]] = []
        replaced = set()
        for key, value in parse_qsl(url_parts.query, keep_blank_values=True):
            if key not in params:
                merged.append((key, value))
            elif key not in replaced:
                merged.append((key, params[key]))
                replaced.add(key)
        merged.extend((key, value) for key, value in params.items() if key not in replaced)

        return urlunparse(url_parts._replace(query=urlencode(merged)))

    def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:
        """Convert HTTP headers to a dictionary with lowercased keys."""
        return {k.lower(): v for k, v in headers.items()}

    def _process_headers(self, headers: Any) -> dict:
        """Convert the headers input into a dictionary.

        Accepts ``None``, a ``Data`` object, a dict (returned as-is), or a list
        of ``{"key": ..., "value": ...}`` rows. Any other type yields an empty
        dict.
        """
        if headers is None:
            return {}
        if isinstance(headers, Data):
            # The Headers input declares input_types=["Data"]; unwrap it.
            headers = headers.data
        if isinstance(headers, dict):
            return headers
        if isinstance(headers, list):
            return {item["key"]: item["value"] for item in headers if self._is_valid_key_value_item(item)}
        return {}

    async def make_api_request(self) -> Data:
        """Build the request from the component inputs and execute it.

        In cURL mode the URL, method, headers and body are expected to have
        been copied into the regular inputs by ``update_build_config`` /
        ``parse_curl``, so they are read from those inputs here.

        Raises:
            ValueError: If the URL is empty or fails validation.
        """
        url = self.url_input.strip() if isinstance(self.url_input, str) else ""

        # Normalize first so scheme-less input such as "example.com" validates.
        url = self._normalize_url(url)
        if not validators.url(url):
            msg = f"Invalid URL provided: {url}"
            raise ValueError(msg)

        raw_params = self.query_params
        if isinstance(raw_params, str):
            query_params = dict(parse_qsl(raw_params))
        elif isinstance(raw_params, dict):
            query_params = raw_params
        else:
            query_params = raw_params.data if raw_params else {}

        headers = self._process_headers(self.headers or {})
        url = self.add_query_params(url, query_params)

        async with httpx.AsyncClient() as client:
            result = await self.make_request(
                client,
                self.method,
                url,
                headers,
                # Passed unprocessed: make_request runs _process_body itself,
                # and processing twice would JSON-decode string values twice
                # (e.g. the JSON string '"123"' would end up as the int 123).
                self.body or {},
                self.timeout,
                follow_redirects=self.follow_redirects,
                save_to_file=self.save_to_file,
                include_httpx_metadata=self.include_httpx_metadata,
            )
        self.status = result
        return result

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict:
        """Update the build config when the mode or the cURL text changes.

        A change to "curl_input" while in cURL mode re-parses the command.
        A change to "mode" shows or hides the cURL field, re-parses any
        existing command when switching to cURL, and then delegates to
        ``set_current_fields`` with ``MODE_FIELDS``. Other fields leave the
        config untouched.
        """
        if field_name != "mode":
            if field_name == "curl_input" and self.mode == "cURL" and self.curl_input:
                return self.parse_curl(self.curl_input, build_config)
            return build_config

        if field_value == "cURL":
            set_field_display(build_config, "curl_input", value=True)
            if build_config["curl_input"]["value"]:
                build_config = self.parse_curl(build_config["curl_input"]["value"], build_config)
        else:
            set_field_display(build_config, "curl_input", value=False)

        return set_current_fields(
            build_config=build_config,
            action_fields=MODE_FIELDS,
            selected_action=field_value,
            default_fields=DEFAULT_FIELDS,
            func=set_field_advanced,
            default_value=True,
        )

    async def _response_info(
        self, response: httpx.Response, *, with_file_path: bool = False
    ) -> tuple[bool, Path | None]:
        """Determine whether the response is binary and where to save it.

        Args:
            response (Response): The HTTP response object.
            with_file_path (bool): Whether a target file path should be chosen.

        Returns:
            Tuple[bool, Path | None]:
                Whether the content type marks the body as binary, and the
                file path to write to (``None`` unless ``with_file_path``).
        """
        content_type = response.headers.get("Content-Type", "")
        is_binary = "application/octet-stream" in content_type or "application/binary" in content_type

        if not with_file_path:
            return is_binary, None

        component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__
        await aiofiles_os.makedirs(component_temp_dir, exist_ok=True)

        filename = None
        if "Content-Disposition" in response.headers:
            content_disposition = response.headers["Content-Disposition"]
            filename_match = re.search(r'filename="(.+?)"', content_disposition)
            if filename_match:
                # The header comes from the remote server. Keep only the final
                # path component so a value such as "../../x" cannot place the
                # file outside the component's temp directory.
                candidate = Path(filename_match.group(1).replace("\\", "/")).name
                if candidate not in {"", ".", ".."}:
                    filename = candidate

        # No usable filename: derive one from the request URL and content type.
        if not filename:
            url_path = urlparse(str(response.request.url) if response.request else "").path
            base_name = Path(url_path).name or "response"

            content_type_to_extension = {
                "text/plain": ".txt",
                "application/json": ".json",
                "image/jpeg": ".jpg",
                "image/png": ".png",
                "application/octet-stream": ".bin",
            }
            # Drop parameters such as "; charset=utf-8" so the lookup matches.
            media_type = content_type.split(";", 1)[0].strip().lower()
            extension = content_type_to_extension.get(media_type, ".bin" if is_binary else ".txt")
            filename = f"{base_name}{extension}"

        file_path = component_temp_dir / filename

        try:
            # Exclusive-create mode fails if the file already exists.
            async with aiofiles.open(file_path, "x") as _:
                pass
        except FileExistsError:
            # Name already taken: prefix a timestamp instead of overwriting.
            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S%f")
            file_path = component_temp_dir / f"{timestamp}-{filename}"

        return is_binary, file_path
Directory
This component recursively loads files from a directory, with options for file types, depth, and concurrency.
Parameters
Name | Display Name | Info |
---|---|---|
path |
Path |
The path to the directory to load files from. |
types |
Types |
The file types to load. Leave empty to load all types. |
depth |
Depth |
Controls how many directory levels deep to search for files. 0 means only search current directory, 1 means one level down, and so on. |
max_concurrency |
Max Concurrency |
Maximum concurrency for loading files. The default is 2 files. |
load_hidden |
Load Hidden |
If true, hidden files are loaded. |
recursive |
Recursive |
If true, subdirectories are included in the search. |
silent_errors |
Silent Errors |
When true, errors do not raise an exception. |
use_multithreading |
Use Multithreading |
When true, multithreading is used to load files. |
Name | Display Name | Info |
---|---|---|
data |
Data |
Loaded file data from the directory. Type: |
Component code
directory.py
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data, retrieve_file_paths
from langflow.custom.custom_component.component import Component
from langflow.io import BoolInput, IntInput, MessageTextInput, MultiselectInput
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.template.field.base import Output
class DirectoryComponent(Component):
    """Component that loads the files found in a directory.

    Files are located with ``retrieve_file_paths`` and parsed either one by
    one or through ``parallel_load_data``; the result is exposed as a
    ``DataFrame`` output.
    """

    display_name = "Directory"
    description = "Recursively load files from a directory."
    icon = "folder"
    name = "Directory"

    inputs = [
        MessageTextInput(
            name="path",
            display_name="Path",
            info="Path to the directory to load files from. Defaults to current directory ('.')",
            value=".",
            tool_mode=True,
        ),
        MultiselectInput(
            name="types",
            display_name="File Types",
            info="File types to load. Select one or more types or leave empty to load all supported types.",
            options=TEXT_FILE_TYPES,
            value=[],
        ),
        IntInput(
            name="depth",
            display_name="Depth",
            info="Depth to search for files.",
            value=0,
        ),
        IntInput(
            name="max_concurrency",
            display_name="Max Concurrency",
            advanced=True,
            info="Maximum concurrency for loading files.",
            value=2,
        ),
        BoolInput(
            name="load_hidden",
            display_name="Load Hidden",
            advanced=True,
            info="If true, hidden files will be loaded.",
        ),
        BoolInput(
            name="recursive",
            display_name="Recursive",
            advanced=True,
            info="If true, the search will be recursive.",
        ),
        BoolInput(
            name="silent_errors",
            display_name="Silent Errors",
            advanced=True,
            info="If true, errors will not raise an exception.",
        ),
        BoolInput(
            name="use_multithreading",
            display_name="Use Multithreading",
            advanced=True,
            info="If true, multithreading will be used.",
        ),
    ]

    outputs = [
        Output(display_name="Loaded Files", name="dataframe", method="as_dataframe"),
    ]

    def load_directory(self) -> list[Data]:
        """Load every matching file under the configured path.

        Returns:
            The loaded items that are ``Data`` instances; anything else the
            loaders return (such as ``None``) is dropped.

        Raises:
            ValueError: If a requested file type is not in ``TEXT_FILE_TYPES``.
        """
        silent = self.silent_errors
        directory = self.resolve_path(self.path)

        # An empty selection means "every supported type".
        wanted_types = self.types or TEXT_FILE_TYPES
        unsupported = [file_type for file_type in wanted_types if file_type not in TEXT_FILE_TYPES]
        if unsupported:
            msg = f"Invalid file types specified: {unsupported}. Valid types are: {TEXT_FILE_TYPES}"
            raise ValueError(msg)

        paths = retrieve_file_paths(
            directory,
            load_hidden=self.load_hidden,
            recursive=self.recursive,
            depth=self.depth,
            types=wanted_types,
        )

        if self.use_multithreading:
            candidates = parallel_load_data(paths, silent_errors=silent, max_concurrency=self.max_concurrency)
        else:
            candidates = [parse_text_file_to_data(path, silent_errors=silent) for path in paths]

        # isinstance() already rejects None, so one check covers both cases.
        records = [candidate for candidate in candidates if isinstance(candidate, Data)]
        self.status = records
        return records

    def as_dataframe(self) -> DataFrame:
        """Return the loaded files wrapped in a ``DataFrame``."""
        records = self.load_directory()
        return DataFrame(records)
File
This component loads and parses text files of various supported formats, converting the content into a Data object. It supports multiple file types and provides an option for silent error handling.
The maximum supported file size is 100 MB.
Parameters
Name | Display Name | Info |
---|---|---|
path |
Path |
File path to load. |
silent_errors |
Silent Errors |
If true, errors will not raise an exception |
Name | Display Name | Info |
---|---|---|
data |
Data |
Parsed content of the file as a Data object |
Supported file extensions
The following file types are supported for processing:
Supported file extensions
Document formats |
|
Data formats |
|
Markup Languages |
|
Programming Languages |
|
Image Formats |
|
Component code
file.py
from langflow.base.data.base_file import BaseFileComponent
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data
from langflow.io import BoolInput, IntInput
from langflow.schema.data import Data
class FileComponent(BaseFileComponent):
    """Handles loading and processing of individual or zipped text files.

    This component supports processing multiple valid files within a zip archive,
    resolving paths, validating file types, and optionally using multithreading for processing.
    """

    display_name = "File"
    description = "Loads content from one or more files as a DataFrame."
    icon = "file-text"
    name = "File"

    VALID_EXTENSIONS = TEXT_FILE_TYPES

    inputs = [
        *BaseFileComponent._base_inputs,
        BoolInput(
            name="use_multithreading",
            display_name="[Deprecated] Use Multithreading",
            advanced=True,
            value=True,
            info="Set 'Processing Concurrency' greater than 1 to enable multithreading.",
        ),
        IntInput(
            name="concurrency_multithreading",
            display_name="Processing Concurrency",
            advanced=True,
            info="When multiple files are being processed, the number of files to process concurrently.",
            value=1,
        ),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
        """Parse the given files, sequentially or in parallel, and merge the results.

        Parallel loading is used only when both the effective concurrency and
        the number of files are at least two.

        Args:
            file_list (list[BaseFileComponent.BaseFile]): List of files to process.

        Returns:
            list[BaseFileComponent.BaseFile]: The value returned by
            ``rollup_data`` for the files and their parsed data.

        Raises:
            ValueError: If ``file_list`` is empty.
        """

        def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:
            """Parse one file; log failures and re-raise unless silenced."""
            try:
                return parse_text_file_to_data(file_path, silent_errors=silent_errors)
            except Exception as e:
                if isinstance(e, FileNotFoundError):
                    msg = f"File not found: {file_path}. Error: {e}"
                else:
                    msg = f"Unexpected error processing {file_path}: {e}"
                self.log(msg)
                if not silent_errors:
                    raise
                return None

        if not file_list:
            msg = "No files to process."
            raise ValueError(msg)

        # The deprecated switch forces sequential processing when turned off.
        concurrency = max(1, self.concurrency_multithreading) if self.use_multithreading else 1
        file_count = len(file_list)
        parallel_processing_threshold = 2
        run_in_parallel = (
            concurrency >= parallel_processing_threshold and file_count >= parallel_processing_threshold
        )

        if run_in_parallel:
            self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.")
            processed_data = parallel_load_data(
                [str(file.path) for file in file_list],
                silent_errors=self.silent_errors,
                load_function=process_file,
                max_concurrency=concurrency,
            )
        else:
            if file_count > 1:
                self.log(f"Processing {file_count} files sequentially.")
            processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]

        # Hand the parsed data to rollup_data together with the BaseFile objects.
        return self.rollup_data(file_list, processed_data)
Gmail loader
Google components are available in the Components menu under Bundles.
This component loads emails from Gmail using provided credentials and filters. For more information about creating a service account JSON, see Service Account JSON.
Parameters
Input | Type | Description |
---|---|---|
json_string |
SecretStrInput |
JSON string containing OAuth 2.0 access token information for service account access |
label_ids |
MessageTextInput |
Comma-separated list of label IDs to filter emails |
max_results |
MessageTextInput |
Maximum number of emails to load |
Output | Type | Description |
---|---|---|
data |
Data |
Loaded email data |
Component code
gmail.py
import base64
import json
import re
from collections.abc import Iterator
from json.decoder import JSONDecodeError
from typing import Any
from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import HumanMessage
from langchain_google_community.gmail.loader import GMailLoader
from loguru import logger
from langflow.custom.custom_component.component import Component
from langflow.inputs.inputs import MessageTextInput
from langflow.io import SecretStrInput
from langflow.schema.data import Data
from langflow.template.field.base import Output
class GmailLoaderComponent(Component):
    """Legacy component that loads Gmail messages as chat sessions.

    Credentials are supplied as a JSON string of authorized-user token
    information; messages are filtered by label and capped at a maximum count.
    """

    display_name = "Gmail Loader"
    description = "Loads emails from Gmail using provided credentials."
    icon = "Google"
    legacy: bool = True

    inputs = [
        SecretStrInput(
            name="json_string",
            display_name="JSON String of the Service Account Token",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
            # NOTE: this template must itself be valid JSON (no trailing
            # commas), otherwise filling it in always fails to parse.
            value="""{
                "account": "",
                "client_id": "",
                "client_secret": "",
                "expiry": "",
                "refresh_token": "",
                "scopes": [
                    "https://www.googleapis.com/auth/gmail.readonly"
                ],
                "token": "",
                "token_uri": "https://oauth2.googleapis.com/token",
                "universe_domain": "googleapis.com"
            }""",
        ),
        MessageTextInput(
            name="label_ids",
            display_name="Label IDs",
            info="Comma-separated list of label IDs to filter emails.",
            required=True,
            value="INBOX,SENT,UNREAD,IMPORTANT",
        ),
        MessageTextInput(
            name="max_results",
            display_name="Max Results",
            info="Maximum number of emails to load.",
            required=True,
            value="10",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="load_emails"),
    ]

    def load_emails(self) -> Data:
        """Load emails matching the configured labels.

        Returns:
            ``Data`` whose payload stores the loaded documents under "text".

        Raises:
            ValueError: If the JSON string or Max Results value is invalid,
                the token cannot be refreshed, or loading fails.
        """

        class CustomGMailLoader(GMailLoader):
            """``GMailLoader`` variant that filters messages by label IDs."""

            def __init__(
                self, creds: Any, *, n: int = 100, label_ids: list[str] | None = None, raise_error: bool = False
            ) -> None:
                super().__init__(creds, n, raise_error)
                self.label_ids = label_ids if label_ids is not None else ["SENT"]

            @staticmethod
            def _last_header_value(gmail_message: Any, header_name: str) -> str | None:
                """Return the value of the last header named ``header_name``.

                Header names are compared case-insensitively, since email
                header field names are not case-sensitive. Returns ``None``
                when the header is absent.
                """
                wanted = header_name.lower()
                found = None
                for header in gmail_message["payload"]["headers"]:
                    if header["name"].lower() == wanted:
                        found = header["value"]
                return found

            def clean_message_content(self, message):
                """Strip URLs, email addresses, punctuation and extra whitespace."""
                # Remove URLs
                message = re.sub(r"http\S+|www\S+|https\S+", "", message, flags=re.MULTILINE)
                # Remove email addresses
                message = re.sub(r"\S+@\S+", "", message)
                # Remove special characters and excessive whitespace
                message = re.sub(r"[^A-Za-z0-9\s]+", " ", message)
                message = re.sub(r"\s{2,}", " ", message)
                # Trim leading and trailing whitespace
                return message.strip()

            def _extract_email_content(self, msg: Any) -> HumanMessage:
                """Build a ``HumanMessage`` from the first text/plain part.

                Raises:
                    ValueError: If there is no From header or no plain-text part.
                """
                from_email = self._last_header_value(msg, "From")
                if from_email is None:
                    error_msg = "From email not found."
                    raise ValueError(error_msg)

                payload = msg["payload"]
                parts = payload["parts"] if "parts" in payload else [payload]

                for part in parts:
                    if part["mimeType"] == "text/plain":
                        data = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8")
                        # Keep only the text before the quoted "On ... wrote:" reply marker.
                        pattern = re.compile(r"\r\nOn .+(\r\n)*wrote:\r\n")
                        newest_response = re.split(pattern, data)[0]
                        return HumanMessage(
                            content=self.clean_message_content(newest_response),
                            additional_kwargs={"sender": from_email},
                        )
                error_msg = "No plain text part found in the email."
                raise ValueError(error_msg)

            def _get_message_data(self, service: Any, message: Any) -> ChatSession:
                """Fetch a message and, for replies, the message it answers.

                Raises:
                    ValueError: If the message replies to an email that is not
                        found in its thread.
                """
                msg = service.users().messages().get(userId="me", id=message["id"]).execute()
                message_content = self._extract_email_content(msg)

                in_reply_to = self._last_header_value(msg, "In-Reply-To")
                if not in_reply_to:
                    return ChatSession(messages=[message_content])

                thread = service.users().threads().get(userId="me", id=msg["threadId"]).execute()
                response_email = None
                for thread_message in thread["messages"]:
                    # Looked up per message, so a message without a Message-ID
                    # header can neither crash the loop nor reuse the ID of
                    # the previous message.
                    if self._last_header_value(thread_message, "Message-ID") == in_reply_to:
                        response_email = thread_message
                if response_email is None:
                    error_msg = "Response email not found in the thread."
                    raise ValueError(error_msg)
                starter_content = self._extract_email_content(response_email)
                return ChatSession(messages=[starter_content, message_content])

            def lazy_load(self) -> Iterator[ChatSession]:
                """Yield one ``ChatSession`` per message matching the labels.

                Per-message failures are logged and skipped unless
                ``raise_error`` is set.
                """
                service = build("gmail", "v1", credentials=self.creds)
                results = (
                    service.users().messages().list(userId="me", labelIds=self.label_ids, maxResults=self.n).execute()
                )
                messages = results.get("messages", [])

                if not messages:
                    logger.warning("No messages found with the specified labels.")
                for message in messages:
                    try:
                        yield self._get_message_data(service, message)
                    except Exception:
                        if self.raise_error:
                            raise
                        else:
                            logger.exception(f"Error processing message {message['id']}")

        json_string = self.json_string

        # Tolerate spaces around commas ("INBOX, SENT") and ignore empty entries.
        label_ids = [label.strip() for label in (self.label_ids or "").split(",") if label.strip()] or ["INBOX"]

        try:
            max_results = int(self.max_results) if self.max_results else 100
        except (TypeError, ValueError) as e:
            msg = f"Max Results must be an integer, got: {self.max_results!r}"
            raise ValueError(msg) from e

        # Load the token information from the JSON string
        try:
            token_info = json.loads(json_string)
        except JSONDecodeError as e:
            msg = "Invalid JSON string"
            raise ValueError(msg) from e

        creds = Credentials.from_authorized_user_info(token_info)

        # Initialize the custom loader with the provided credentials
        loader = CustomGMailLoader(creds=creds, n=max_results, label_ids=label_ids)

        try:
            docs = loader.load()
        except RefreshError as e:
            msg = "Authentication error: Unable to refresh authentication token. Please try to reauthenticate."
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Error loading documents: {e}"
            raise ValueError(msg) from e

        # Return the loaded documents
        self.status = docs
        return Data(data={"text": docs})
Google Drive loader
Google components are available in the Components menu under Bundles.
This component loads documents from Google Drive using provided credentials and a single document ID. For more information about creating a service account JSON, see Service Account JSON.
Parameters
Input | Type | Description |
---|---|---|
json_string |
SecretStrInput |
JSON string containing OAuth 2.0 access token information for service account access |
document_id |
MessageTextInput |
Single Google Drive document ID |
Output | Type | Description |
---|---|---|
docs |
Data |
Loaded document data |
Component code
google_drive.py
import json
from json.decoder import JSONDecodeError
from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials
from langchain_google_community import GoogleDriveLoader
from langflow.custom.custom_component.component import Component
from langflow.helpers.data import docs_to_data
from langflow.inputs.inputs import MessageTextInput
from langflow.io import SecretStrInput
from langflow.schema.data import Data
from langflow.template.field.base import Output
class GoogleDriveComponent(Component):
    """Legacy component that loads a single Google Drive document.

    Credentials are supplied as a JSON string of authorized-user token
    information and passed directly to the loader.
    """

    display_name = "Google Drive Loader"
    description = "Loads documents from Google Drive using provided credentials."
    icon = "Google"
    legacy: bool = True

    inputs = [
        SecretStrInput(
            name="json_string",
            display_name="JSON String of the Service Account Token",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
        ),
        MessageTextInput(
            name="document_id", display_name="Document ID", info="Single Google Drive document ID", required=True
        ),
    ]

    outputs = [
        Output(display_name="Loaded Documents", name="docs", method="load_documents"),
    ]

    def load_documents(self) -> Data:
        """Load the configured document from Google Drive.

        Returns:
            ``Data`` whose payload stores the converted document under "text".

        Raises:
            ValueError: If the document ID is empty, the JSON string is
                invalid, the token cannot be refreshed, loading fails, or the
                loader does not return exactly one document.
        """

        class CustomGoogleDriveLoader(GoogleDriveLoader):
            """``GoogleDriveLoader`` that uses credentials passed in directly."""

            # Credentials object to be passed directly.
            creds: Credentials | None = None

            def _load_credentials(self):
                """Return the supplied credentials.

                Raises:
                    ValueError: If no credentials were provided.
                """
                if self.creds:
                    return self.creds
                msg = "No credentials provided."
                raise ValueError(msg)

            class Config:
                arbitrary_types_allowed = True

        json_string = self.json_string

        # Reject a missing or blank ID up front. The previous length check ran
        # on a list that always held exactly one element, so it never fired.
        document_id = self.document_id
        if isinstance(document_id, str):
            document_id = document_id.strip()
        if not document_id:
            msg = "Expected a single document ID"
            raise ValueError(msg)
        document_ids = [document_id]

        # TODO: Add validation to check if the document ID is valid

        # Load the token information from the JSON string
        try:
            token_info = json.loads(json_string)
        except JSONDecodeError as e:
            msg = "Invalid JSON string"
            raise ValueError(msg) from e

        # Initialize the custom loader with the provided credentials and document IDs
        loader = CustomGoogleDriveLoader(
            creds=Credentials.from_authorized_user_info(token_info), document_ids=document_ids
        )

        # Load the documents
        try:
            docs = loader.load()
        except RefreshError as e:
            msg = "Authentication error: Unable to refresh authentication token. Please try to reauthenticate."
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Error loading documents: {e}"
            raise ValueError(msg) from e

        if len(docs) != 1:
            msg = "Expected a single document to be loaded."
            raise ValueError(msg)

        data = docs_to_data(docs)
        # Return the loaded documents
        self.status = data
        return Data(data={"text": data})
Google Drive search
Google components are available in the Components menu under Bundles.
This component searches Google Drive files using provided credentials and query parameters. For more information about creating a service account JSON, see Service Account JSON.
Parameters
Input | Type | Description |
---|---|---|
token_string |
SecretStrInput |
JSON string containing OAuth 2.0 access token information for service account access |
query_item |
DropdownInput |
The field to query |
valid_operator |
DropdownInput |
Operator to use in the query |
search_term |
MessageTextInput |
The value to search for in the specified query item |
query_string |
MessageTextInput |
The query string used for searching (can be edited manually) |
Output | Type | Description |
---|---|---|
doc_urls |
List[str] |
URLs of the found documents |
doc_ids |
List[str] |
IDs of the found documents |
doc_titles |
List[str] |
Titles of the found documents |
Data |
Data |
Document titles and URLs in a structured format |
Component code
google_drive_search.py
import json
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langflow.custom.custom_component.component import Component
from langflow.inputs.inputs import DropdownInput, MessageTextInput
from langflow.io import SecretStrInput
from langflow.schema.data import Data
from langflow.template.field.base import Output
class GoogleDriveSearchComponent(Component):
    """Search Google Drive files and expose the matches as URLs, IDs, titles, or structured data."""

    display_name = "Google Drive Search"
    description = "Searches Google Drive files using provided credentials and query parameters."
    icon = "Google"
    legacy: bool = True

    inputs = [
        SecretStrInput(
            name="token_string",
            display_name="Token String",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
        ),
        DropdownInput(
            name="query_item",
            display_name="Query Item",
            options=[
                "name",
                "fullText",
                "mimeType",
                "modifiedTime",
                "viewedByMeTime",
                "trashed",
                "starred",
                "parents",
                "owners",
                "writers",
                "readers",
                "sharedWithMe",
                "createdTime",
                "properties",
                "appProperties",
                "visibility",
                "shortcutDetails.targetId",
            ],
            info="The field to query.",
            required=True,
        ),
        DropdownInput(
            name="valid_operator",
            display_name="Valid Operator",
            options=["contains", "=", "!=", "<=", "<", ">", ">=", "in", "has"],
            info="Operator to use in the query.",
            required=True,
        ),
        MessageTextInput(
            name="search_term",
            display_name="Search Term",
            info="The value to search for in the specified query item.",
            required=True,
        ),
        MessageTextInput(
            name="query_string",
            display_name="Query String",
            info="The query string used for searching. You can edit this manually.",
            value="",  # This will be updated with the generated query string
        ),
    ]

    outputs = [
        Output(display_name="Document URLs", name="doc_urls", method="search_doc_urls"),
        Output(display_name="Document IDs", name="doc_ids", method="search_doc_ids"),
        Output(display_name="Document Titles", name="doc_titles", method="search_doc_titles"),
        Output(display_name="Data", name="Data", method="search_data"),
    ]

    def generate_query_string(self) -> str:
        """Build a Drive query from the selected field, operator, and search term.

        The generated query is also stored in ``self.query_string`` so the editable
        input reflects it.

        Returns:
            str: The generated query string.
        """
        query_item = self.query_item
        valid_operator = self.valid_operator
        # The term is embedded in a single-quoted literal, so backslashes and single
        # quotes must be escaped or a value such as "Valentine's Day" breaks the query.
        search_term = str(self.search_term).replace("\\", "\\\\").replace("'", "\\'")

        # Drive's `in` operator takes the value on the left and the collection field
        # on the right (for example: '1234567' in parents).
        if valid_operator == "in":
            query = f"'{search_term}' in {query_item}"
        else:
            query = f"{query_item} {valid_operator} '{search_term}'"

        # Update the editable query string input with the generated query
        self.query_string = query
        return query

    def on_inputs_changed(self) -> None:
        """Regenerate the query string when inputs change."""
        self.generate_query_string()

    def generate_file_url(self, file_id: str, mime_type: str) -> str:
        """Generates the appropriate Google Drive URL for a file based on its MIME type."""
        return {
            "application/vnd.google-apps.document": f"https://docs.google.com/document/d/{file_id}/edit",
            "application/vnd.google-apps.spreadsheet": f"https://docs.google.com/spreadsheets/d/{file_id}/edit",
            "application/vnd.google-apps.presentation": f"https://docs.google.com/presentation/d/{file_id}/edit",
            "application/vnd.google-apps.drawing": f"https://docs.google.com/drawings/d/{file_id}/edit",
            "application/pdf": f"https://drive.google.com/file/d/{file_id}/view?usp=drivesdk",
        }.get(mime_type, f"https://drive.google.com/file/d/{file_id}/view?usp=drivesdk")

    def search_files(self) -> dict:
        """Run the search and collect the matching files.

        Returns:
            dict: Lists under the keys ``doc_urls``, ``doc_ids``, ``doc_titles`` and
            ``doc_titles_urls`` (the latter holding ``{"title", "url"}`` dicts).

        Raises:
            ValueError: If ``token_string`` is not valid JSON.
        """
        # Load the token information from the JSON string; report malformed input the
        # same way the Google Drive loader component does.
        try:
            token_info = json.loads(self.token_string)
        except json.JSONDecodeError as e:
            msg = "Invalid JSON string"
            raise ValueError(msg) from e
        creds = Credentials.from_authorized_user_info(token_info)

        # Use the query string from the input (which might have been edited by the user)
        query = self.query_string or self.generate_query_string()

        # Initialize the Google Drive API service
        service = build("drive", "v3", credentials=creds)

        # Perform the search
        results = service.files().list(q=query, pageSize=5, fields="nextPageToken, files(id, name, mimeType)").execute()
        items = results.get("files", [])

        doc_urls = []
        doc_ids = []
        doc_titles_urls = []
        doc_titles = []
        for item in items:
            # Directly use the file ID, title, and MIME type to generate the URL
            file_id = item["id"]
            file_title = item["name"]
            mime_type = item["mimeType"]
            file_url = self.generate_file_url(file_id, mime_type)

            # Store the URL, ID, and title+URL in their respective lists
            doc_urls.append(file_url)
            doc_ids.append(file_id)
            doc_titles.append(file_title)
            doc_titles_urls.append({"title": file_title, "url": file_url})

        return {"doc_urls": doc_urls, "doc_ids": doc_ids, "doc_titles_urls": doc_titles_urls, "doc_titles": doc_titles}

    def search_doc_ids(self) -> list[str]:
        """Return the IDs of the files found by ``search_files``."""
        return self.search_files()["doc_ids"]

    def search_doc_urls(self) -> list[str]:
        """Return the URLs of the files found by ``search_files``."""
        return self.search_files()["doc_urls"]

    def search_doc_titles(self) -> list[str]:
        """Return the titles of the files found by ``search_files``."""
        return self.search_files()["doc_titles"]

    def search_data(self) -> Data:
        """Return the title/URL pairs of the found files wrapped in a ``Data`` object."""
        return Data(data={"text": self.search_files()["doc_titles_urls"]})
SQL Query
This component executes SQL queries on a specified database.
Parameters
Name | Display Name | Info |
---|---|---|
query |
Query |
The SQL query to execute. |
database_url |
Database URL |
The URL of the database. |
include_columns |
Include Columns |
Include columns in the result. |
passthrough |
Passthrough |
If an error occurs, return the query instead of raising an exception. |
add_error |
Add Error |
Add the error to the result. |
Name | Display Name | Info |
---|---|---|
result |
Result |
The result of the SQL query execution. |
Component code
sql_executor.py
from typing import TYPE_CHECKING, Any
from langchain_community.utilities import SQLDatabase
from sqlalchemy.exc import SQLAlchemyError
from langflow.custom.custom_component.component_with_cache import ComponentWithCache
from langflow.io import BoolInput, MessageTextInput, MultilineInput, Output
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
from langflow.services.cache.utils import CacheMiss
if TYPE_CHECKING:
from sqlalchemy.engine import Result
class SQLComponent(ComponentWithCache):
    """Execute SQL queries through a ``SQLDatabase`` connection shared via the component cache."""

    display_name = "SQL Database"
    description = "Executes SQL queries on SQLAlchemy-compatible databases."
    icon = "database"
    name = "SQLComponent"
    metadata = {"keywords": ["sql", "database", "query", "db", "fetch"]}

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        # Stays None until maybe_create_db() connects or finds a cached connection.
        self.db: SQLDatabase | None = None

    def maybe_create_db(self):
        """Populate ``self.db`` from the shared cache, or connect and cache the connection.

        Does nothing when ``database_url`` is an empty string.

        Raises:
            ValueError: If connecting to the database fails.
        """
        if self.database_url != "":
            cached_db = self._shared_component_cache.get(self.database_url)
            if not isinstance(cached_db, CacheMiss):
                self.db = cached_db
                return
            self.log("Connecting to database")
            try:
                self.db = SQLDatabase.from_uri(self.database_url)
            except Exception as e:
                msg = f"An error occurred while connecting to the database: {e}"
                raise ValueError(msg) from e
            self._shared_component_cache.set(self.database_url, self.db)

    def _get_db(self) -> SQLDatabase:
        """Return a ready database handle, failing clearly when none could be created."""
        self.maybe_create_db()
        if self.db is None:
            # maybe_create_db() is a no-op for an empty URL; without this guard the
            # caller would hit an opaque AttributeError on `None.run`.
            msg = "No database connection available. Please provide a database URL."
            raise ValueError(msg)
        return self.db

    inputs = [
        MessageTextInput(name="database_url", display_name="Database URL", required=True),
        MultilineInput(name="query", display_name="SQL Query", tool_mode=True, required=True),
        BoolInput(name="include_columns", display_name="Include Columns", value=True, tool_mode=True, advanced=True),
        BoolInput(
            name="add_error",
            display_name="Add Error",
            value=False,
            tool_mode=True,
            info="If True, the error will be added to the result",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Result Table", name="run_sql_query", method="run_sql_query"),
    ]

    def build_component(
        self,
    ) -> Message:
        """Run the query and return the outcome as a ``Message``.

        On a ``SQLAlchemyError`` the message text is the error text plus the error and
        query when ``add_error`` is set, otherwise the original query.

        Raises:
            ValueError: If no database connection is available.
        """
        error = None
        db = self._get_db()
        try:
            result = db.run(self.query, include_columns=self.include_columns)
            self.status = result
        except SQLAlchemyError as e:
            msg = f"An error occurred while running the SQL Query: {e}"
            self.log(msg)
            result = str(e)
            self.status = result
            error = repr(e)

        if self.add_error and error is not None:
            result = f"{result}\n\nError: {error}\n\nQuery: {self.query}"
        elif error is not None:
            # Then we won't add the error to the result
            result = self.query

        return Message(text=result)

    def __execute_query(self) -> list[dict[str, Any]]:
        """Run the query and return every fetched row as a dictionary.

        Raises:
            ValueError: If no database connection is available or the query fails
                with a ``SQLAlchemyError``.
        """
        db = self._get_db()
        try:
            cursor: Result[Any] = db.run(self.query, fetch="cursor")
            return [x._asdict() for x in cursor.fetchall()]
        except SQLAlchemyError as e:
            msg = f"An error occurred while running the SQL Query: {e}"
            self.log(msg)
            raise ValueError(msg) from e

    def run_sql_query(self) -> DataFrame:
        """Execute the query and return the rows as a ``DataFrame`` (also stored in ``status``)."""
        result = self.__execute_query()
        df_result = DataFrame(result)
        self.status = df_result
        return df_result
URL
The URLComponent fetches content from one or more URLs, processes the content, and returns it in various formats. It supports output in plain text, raw HTML, or JSON, with options for cleaning and separating multiple outputs.
Parameters
Name | Display Name | Info |
---|---|---|
urls |
URLs |
Enter one or more URLs. URLs are automatically validated and cleaned. |
format |
Output Format |
Output Format. Use Text to extract text from the HTML, Raw HTML for the raw HTML content, or JSON to extract JSON from the HTML. |
separator |
Separator |
Specify the separator to use between multiple outputs. The default separator for Text is `\n\n`. |
clean_extra_whitespace |
Clean Extra Whitespace |
Controls whether to clean excessive blank lines in the text output. This option only applies to the Text format. |
Name | Display Name | Info |
---|---|---|
data |
Data |
List of data objects containing fetched content and metadata. |
text |
Text |
Fetched content as formatted text, with applied separators and cleaning. |
dataframe |
DataFrame |
Content formatted as a data object. |
Component code
url.py
import re
import requests
from bs4 import BeautifulSoup
from langchain_community.document_loaders import RecursiveUrlLoader
from loguru import logger
from langflow.custom.custom_component.component import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.helpers.data import safe_convert
from langflow.io import BoolInput, DropdownInput, IntInput, MessageTextInput, Output, SliderInput, TableInput
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
from langflow.services.deps import get_settings_service
# Constants
# Default value of the component's "timeout" input (request timeout in seconds).
DEFAULT_TIMEOUT = 30
# Default value of the "max_depth" input.
DEFAULT_MAX_DEPTH = 1
# Default value of the "format" (output format) input.
DEFAULT_FORMAT = "Text"
# Loose, case-insensitive shape check used by URLComponent.validate_url.
# Every group except the host is optional.
URL_REGEX = re.compile(
# optional http:// or https:// scheme
r"^(https?:\/\/)?"
# optional "www." prefix
r"(www\.)?"
# host: letters, digits, dots and hyphens
r"([a-zA-Z0-9.-]+)"
# optional dot followed by at least two letters
r"(\.[a-zA-Z]{2,})?"
# optional :port
r"(:\d+)?"
# optional path starting with "/" and containing no whitespace
r"(\/[^\s]*)?$",
re.IGNORECASE,
)
class URLComponent(Component):
    """A component that loads and parses content from web pages recursively.

    This component allows fetching content from one or more URLs, with options to:
    - Control crawl depth
    - Prevent crawling outside the root domain
    - Use async loading for better performance
    - Extract either raw HTML or clean text
    - Configure request headers and timeouts
    """

    display_name = "URL"
    description = "Fetch content from one or more web pages, following links recursively."
    icon = "layout-template"
    name = "URLComponent"

    inputs = [
        MessageTextInput(
            name="urls",
            display_name="URLs",
            info="Enter one or more URLs to crawl recursively, by clicking the '+' button.",
            is_list=True,
            tool_mode=True,
            placeholder="Enter a URL...",
            list_add_label="Add URL",
            input_types=[],
        ),
        SliderInput(
            name="max_depth",
            display_name="Depth",
            info=(
                "Controls how many 'clicks' away from the initial page the crawler will go:\n"
                "- depth 1: only the initial page\n"
                "- depth 2: initial page + all pages linked directly from it\n"
                "- depth 3: initial page + direct links + links found on those direct link pages\n"
                "Note: This is about link traversal, not URL path depth."
            ),
            value=DEFAULT_MAX_DEPTH,
            range_spec=RangeSpec(min=1, max=5, step=1),
            required=False,
            min_label=" ",
            max_label=" ",
            min_label_icon="None",
            max_label_icon="None",
            # slider_input=True
        ),
        BoolInput(
            name="prevent_outside",
            display_name="Prevent Outside",
            info=(
                "If enabled, only crawls URLs within the same domain as the root URL. "
                "This helps prevent the crawler from going to external websites."
            ),
            value=True,
            required=False,
            advanced=True,
        ),
        BoolInput(
            name="use_async",
            display_name="Use Async",
            info=(
                "If enabled, uses asynchronous loading which can be significantly faster "
                "but might use more system resources."
            ),
            value=True,
            required=False,
            advanced=True,
        ),
        DropdownInput(
            name="format",
            display_name="Output Format",
            info="Output Format. Use 'Text' to extract the text from the HTML or 'HTML' for the raw HTML content.",
            options=["Text", "HTML"],
            value=DEFAULT_FORMAT,
            advanced=True,
        ),
        IntInput(
            name="timeout",
            display_name="Timeout",
            info="Timeout for the request in seconds.",
            value=DEFAULT_TIMEOUT,
            required=False,
            advanced=True,
        ),
        TableInput(
            name="headers",
            display_name="Headers",
            info="The headers to send with the request",
            table_schema=[
                {
                    "name": "key",
                    "display_name": "Header",
                    "type": "str",
                    "description": "Header name",
                },
                {
                    "name": "value",
                    "display_name": "Value",
                    "type": "str",
                    "description": "Header value",
                },
            ],
            value=[{"key": "User-Agent", "value": get_settings_service().settings.user_agent}],
            advanced=True,
            input_types=["DataFrame"],
        ),
        BoolInput(
            name="filter_text_html",
            display_name="Filter Text/HTML",
            info="If enabled, filters out text/css content type from the results.",
            value=True,
            required=False,
            advanced=True,
        ),
        BoolInput(
            name="continue_on_failure",
            display_name="Continue on Failure",
            info="If enabled, continues crawling even if some requests fail.",
            value=True,
            required=False,
            advanced=True,
        ),
        BoolInput(
            name="check_response_status",
            display_name="Check Response Status",
            info="If enabled, checks the response status of the request.",
            value=False,
            required=False,
            advanced=True,
        ),
        BoolInput(
            name="autoset_encoding",
            display_name="Autoset Encoding",
            info="If enabled, automatically sets the encoding of the request.",
            value=True,
            required=False,
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Extracted Pages", name="page_results", method="fetch_content"),
        Output(display_name="Raw Content", name="raw_results", method="fetch_content_as_message", tool_mode=False),
    ]

    @staticmethod
    def validate_url(url: str) -> bool:
        """Validates if the given string matches URL pattern.

        Args:
            url: The URL string to validate

        Returns:
            bool: True if the URL is valid, False otherwise
        """
        return bool(URL_REGEX.match(url))

    def ensure_url(self, url: str) -> str:
        """Ensures the given string is a valid URL.

        Surrounding whitespace is stripped and ``https://`` is prepended when the
        string has no http/https scheme.

        Args:
            url: The URL string to validate and normalize

        Returns:
            str: The normalized URL

        Raises:
            ValueError: If the URL is invalid
        """
        url = url.strip()
        # URL schemes are case-insensitive; compare in lowercase so an input such as
        # "HTTP://example.com" is not given a second scheme and then rejected.
        if not url.lower().startswith(("http://", "https://")):
            url = "https://" + url

        if not self.validate_url(url):
            msg = f"Invalid URL: {url}"
            raise ValueError(msg)

        return url

    def _create_loader(self, url: str) -> RecursiveUrlLoader:
        """Creates a RecursiveUrlLoader instance with the configured settings.

        Args:
            url: The URL to load

        Returns:
            RecursiveUrlLoader: Configured loader instance
        """
        headers_dict = {header["key"]: header["value"] for header in self.headers}
        # "HTML" keeps the raw markup; any other format extracts the page text.
        extractor = (lambda x: x) if self.format == "HTML" else (lambda x: BeautifulSoup(x, "lxml").get_text())

        return RecursiveUrlLoader(
            url=url,
            max_depth=self.max_depth,
            prevent_outside=self.prevent_outside,
            use_async=self.use_async,
            extractor=extractor,
            timeout=self.timeout,
            headers=headers_dict,
            check_response_status=self.check_response_status,
            continue_on_failure=self.continue_on_failure,
            base_url=url,  # Add base_url to ensure consistent domain crawling
            autoset_encoding=self.autoset_encoding,  # Enable automatic encoding detection
            exclude_dirs=[],  # Allow customization of excluded directories
            link_regex=None,  # Allow customization of link filtering
        )

    def fetch_url_contents(self) -> list[dict]:
        """Load documents from the configured URLs.

        Returns:
            list[dict]: One dictionary per loaded document with the keys ``text``,
            ``url``, ``title``, ``description``, ``content_type`` and ``language``.

        Raises:
            ValueError: If no valid URLs are provided or if there's an error loading documents
        """
        try:
            # The set comprehension de-duplicates URLs after normalization.
            urls = list({self.ensure_url(url) for url in self.urls if url.strip()})
            logger.info(f"URLs: {urls}")
            if not urls:
                msg = "No valid URLs provided."
                raise ValueError(msg)

            all_docs = []
            for url in urls:
                logger.info(f"Loading documents from {url}")

                try:
                    loader = self._create_loader(url)
                    docs = loader.load()

                    if not docs:
                        logger.warning(f"No documents found for {url}")
                        continue

                    logger.info(f"Found {len(docs)} documents from {url}")
                    all_docs.extend(docs)

                except requests.exceptions.RequestException as e:
                    # A request failure for one URL is logged and the remaining URLs are still tried.
                    logger.exception(f"Error loading documents from {url}: {e}")
                    continue

            if not all_docs:
                msg = "No documents were successfully loaded from any URL"
                raise ValueError(msg)

            data = [
                {
                    "text": safe_convert(doc.page_content, clean_data=True),
                    "url": doc.metadata.get("source", ""),
                    "title": doc.metadata.get("title", ""),
                    "description": doc.metadata.get("description", ""),
                    "content_type": doc.metadata.get("content_type", ""),
                    "language": doc.metadata.get("language", ""),
                }
                for doc in all_docs
            ]
        except Exception as e:
            # Any failure above, including the ValueErrors raised in this method, is
            # logged and re-raised as a ValueError with a common prefix.
            error_msg = getattr(e, "message", e)
            msg = f"Error loading documents: {error_msg!s}"
            logger.exception(msg)
            raise ValueError(msg) from e
        return data

    def fetch_content(self) -> DataFrame:
        """Convert the documents to a DataFrame."""
        return DataFrame(data=self.fetch_url_contents())

    def fetch_content_as_message(self) -> Message:
        """Convert the documents to a Message whose text joins the page texts with blank lines."""
        url_contents = self.fetch_url_contents()
        return Message(text="\n\n".join([x["text"] for x in url_contents]), data={"data": url_contents})
Webhook
This component defines a webhook trigger that runs a flow when it receives an HTTP POST request.
If the input is not valid JSON, the component wraps it in a payload
object so that it can be processed and still trigger the flow. The component does not require an API key.
When a Webhook component is added to the workspace, the Endpoint field in the Webhook component tab contains an endpoint for triggering the webhook component.
To use the webhook component, copy the endpoint URL and paste it into a POST request. The request must include your Astra DB application token in the header.
curl -X POST \
"http://127.0.0.1:7860/api/v1/webhook/**YOUR_FLOW_ID**" \
-H 'Content-Type: application/json'\
-H 'Authorization: Bearer AstraCS:...' \
-d '{"any": "data"}'
To test the webhook component:
-
Add a Webhook component to the flow.
-
Connect the Webhook component’s Data output to the Data input of a Parser component.
-
Connect the Parser component’s Parsed Text output to the Text input of a Chat output component.
-
To send a POST request, copy the URL from the Endpoint field and paste it into a POST request. The request must include your Astra DB application token in the header.
-
Send the POST request.
-
Open the Playground. Your JSON data is posted to the Chat output component, which indicates that the webhook component is correctly triggering the flow.
Parameters
Name | Type | Description |
---|---|---|
data |
Payload |
Receives a payload through HTTP POST requests. |
curl |
cURL |
The cURL command template for making requests to this webhook. |
endpoint |
Endpoint |
The endpoint URL where the webhook component receives requests. |
Name | Type | Description |
---|---|---|
output_data |
Data |
Outputs processed data from the webhook input, and returns an empty data object if no input is provided.
If the input is not valid JSON, the component wraps it in a `payload` object. |
Component code
webhook.py
import json
from langflow.custom.custom_component.component import Component
from langflow.io import MultilineInput, Output
from langflow.schema.data import Data
class WebhookComponent(Component):
    """Expose a payload received over HTTP POST as a ``Data`` output."""

    display_name = "Webhook"
    name = "Webhook"
    icon = "webhook"

    inputs = [
        MultilineInput(
            name="data",
            display_name="Payload",
            info="Receives a payload from external systems via HTTP POST.",
            advanced=True,
        ),
        MultilineInput(
            name="curl",
            display_name="cURL",
            value="CURL_WEBHOOK",
            advanced=True,
            input_types=[],
        ),
        MultilineInput(
            name="endpoint",
            display_name="Endpoint",
            value="BACKEND_URL",
            advanced=False,
            copy_field=True,
            input_types=[],
        ),
    ]

    outputs = [
        Output(display_name="Data", name="output_data", method="build_data"),
    ]

    def build_data(self) -> Data:
        """Parse the received payload into a ``Data`` object.

        Returns:
            Data: An empty ``Data`` when no payload was received; otherwise the decoded
            JSON body, or ``{"payload": <raw text>}`` when the payload is not valid JSON.
        """
        raw_payload = self.data
        if not raw_payload:
            self.status = "No data provided."
            return Data(data={})

        status: str | Data = ""
        try:
            # Rewrite a quoted literal newline ("<newline>") as the escaped form "\n"
            # before decoding.
            normalized = raw_payload.replace('"\n"', '"\\n"')
            parsed = json.loads(normalized or "{}")
        except json.JSONDecodeError:
            # Keep the flow running: wrap the undecodable text and report it in the status.
            parsed = {"payload": raw_payload}
            status = f"Invalid JSON payload. Please check the format.\n\n{raw_payload}"

        result = Data(data=parsed)
        # Show the error text when decoding failed, otherwise the resulting data.
        self.status = status or result
        return result