Data
Data components define how data is processed in your flow. They can be used to fetch data from external sources, process data, or store data in memory.
API request
This component sends HTTP requests to the specified URLs.
Use this component to interact with external APIs or services and retrieve data. Ensure that the URLs are valid and that you configure the method, headers, body, and timeout correctly.
Parameters
Name | Display Name | Info |
---|---|---|
URLs |
URLs |
The URLs to target |
curl |
curl |
Paste a curl command to fill in the dictionary fields for headers and body |
Method |
HTTP Method |
The HTTP method to use, such as GET or POST |
Headers |
Headers |
The headers to include with the request |
Body |
Request Body |
The data to send with the request (for methods like POST, PATCH, PUT) |
Timeout |
Timeout |
The maximum time to wait for a response |
Component code
api_request.py
import asyncio
import json
import mimetypes
import re
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from zoneinfo import ZoneInfo
import httpx
import validators
from aiofile import async_open
from langflow.base.curl.parse import parse_context
from langflow.custom import Component
from langflow.io import BoolInput, DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict
class APIRequestComponent(Component):
    # Component that sends HTTP requests to one or more URLs and returns one
    # Data object per URL (see the "make_requests" output below).
    display_name = "API Request"
    description = (
        "This component allows you to make HTTP requests to one or more URLs. "
        "You can provide headers and body as either dictionaries or Data objects. "
        "Additionally, you can append query parameters to the URLs.\n\n"
        "**Note:** Check advanced options for more settings."
    )
    icon = "Globe"
    name = "APIRequest"

    inputs = [
        # Each list entry is used as one URL; entries are stripped but never
        # split on commas, so the help text must not suggest comma separation.
        MessageTextInput(
            name="urls",
            display_name="URLs",
            list=True,
            info="Enter one or more URLs, by clicking the '+' button.",
        ),
        MessageTextInput(
            name="curl",
            display_name="cURL",
            info="Paste a curl command to populate the fields. "
            "This will fill in the dictionary fields for headers and body.",
            advanced=False,
            refresh_button=True,
            real_time_refresh=True,
            tool_mode=True,
        ),
        # DELETE is listed because make_request already accepts it; without it
        # the method was reachable only through a pasted curl command.
        DropdownInput(
            name="method",
            display_name="Method",
            options=["GET", "POST", "PATCH", "PUT", "DELETE"],
            value="GET",
            info="The HTTP method to use (GET, POST, PATCH, PUT, DELETE).",
        ),
        NestedDictInput(
            name="headers",
            display_name="Headers",
            info="The headers to send with the request as a dictionary. This is populated when using the CURL field.",
            input_types=["Data"],
        ),
        NestedDictInput(
            name="body",
            display_name="Body",
            info="The body to send with the request as a dictionary (for POST, PATCH, PUT). "
            "This is populated when using the CURL field.",
            input_types=["Data"],
        ),
        DataInput(
            name="query_params",
            display_name="Query Parameters",
            info="The query parameters to append to the URL.",
            tool_mode=True,
        ),
        IntInput(
            name="timeout",
            display_name="Timeout",
            value=5,
            info="The timeout to use for the request.",
        ),
        BoolInput(
            name="follow_redirects",
            display_name="Follow Redirects",
            value=True,
            info="Whether to follow http redirects.",
            advanced=True,
        ),
        BoolInput(
            name="save_to_file",
            display_name="Save to File",
            value=False,
            info="Save the API response to a temporary file",
            advanced=True,
        ),
        BoolInput(
            name="include_httpx_metadata",
            display_name="Include HTTPx Metadata",
            value=False,
            info=(
                "Include properties such as headers, status_code, response_headers, "
                "and redirection_history in the output."
            ),
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="make_requests"),
    ]
def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:
    """Fill the URL, method, header and body fields from a curl command.

    Args:
        curl: The curl command line to parse.
        build_config: Field configuration; the ``value`` entries of the
            ``urls``, ``method``, ``headers`` and ``body`` fields are overwritten.

    Returns:
        The same ``build_config`` object, updated in place.

    Raises:
        ValueError: If the command cannot be parsed or a field cannot be set.
    """
    try:
        command = parse_context(curl)
        build_config["urls"]["value"] = [command.url]
        build_config["method"]["value"] = command.method.upper()
        build_config["headers"]["value"] = dict(command.headers)
        if not command.data:
            build_config["body"]["value"] = {}
        else:
            # A non-JSON payload is only logged; the body field keeps its previous value.
            try:
                build_config["body"]["value"] = json.loads(command.data)
            except json.JSONDecodeError:
                self.log("Error decoding JSON data")
    except Exception as exc:
        msg = f"Error parsing curl: {exc}"
        self.log(msg)
        raise ValueError(msg) from exc
    return build_config
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
    """Re-derive the request fields when the curl field receives a non-empty value.

    Any other field change, or an empty curl value, returns ``build_config`` untouched.
    """
    curl_was_edited = field_name == "curl" and bool(field_value)
    if not curl_was_edited:
        return build_config
    return self.parse_curl(field_value, build_config)
async def make_request(
    self,
    client: httpx.AsyncClient,
    method: str,
    url: str,
    headers: dict | None = None,
    body: dict | None = None,
    timeout: int = 5,
    *,
    follow_redirects: bool = True,
    save_to_file: bool = False,
    include_httpx_metadata: bool = False,
) -> Data:
    """Send one HTTP request and wrap the outcome in a Data object.

    Args:
        client: Shared async HTTP client used to send the request.
        method: HTTP verb; upper-cased and checked against GET/POST/PATCH/PUT/DELETE.
        url: Target URL, also reported back under the ``source`` key.
        headers: Request headers.
        body: JSON body. A non-empty string is decoded as JSON first.
        timeout: Timeout passed to the client for this request.
        follow_redirects: Whether the client follows redirects.
        save_to_file: Write the response to a temporary file instead of
            returning its content under ``result``.
        include_httpx_metadata: Add request headers, status code, response
            headers and (when non-empty) the redirection history to the output.

    Returns:
        Data whose payload always contains ``source``. On success it also has
        ``result`` (or ``file_path`` when saving to a file). Timeouts are
        reported as status 408 and any other request failure as status 500,
        both with an ``error`` message instead of raising.

    Raises:
        ValueError: For an unsupported method or a string body that is not valid JSON.
    """
    method = method.upper()
    if method not in {"GET", "POST", "PATCH", "PUT", "DELETE"}:
        msg = f"Unsupported method: {method}"
        raise ValueError(msg)

    if isinstance(body, str) and body:
        try:
            body = json.loads(body)
        except Exception as e:
            msg = f"Error decoding JSON data: {e}"
            # self.log is called as a plain function everywhere else in this
            # component; the old self.log.exception(...) attribute access
            # raised AttributeError here and masked the intended ValueError.
            self.log(msg)
            raise ValueError(msg) from e

    # An empty body ({} or "") is sent as no body at all.
    data = body or None
    redirection_history = []

    try:
        response = await client.request(
            method,
            url,
            headers=headers,
            json=data,
            timeout=timeout,
            follow_redirects=follow_redirects,
        )

        redirection_history = [
            {"url": str(redirect.url), "status_code": redirect.status_code} for redirect in response.history
        ]
        # A final response that is itself a redirect (redirects not followed)
        # is recorded as the last hop.
        if response.is_redirect:
            redirection_history.append({"url": str(response.url), "status_code": response.status_code})

        is_binary, file_path = self._response_info(response, with_file_path=save_to_file)
        response_headers = self._headers_to_dict(response.headers)

        metadata: dict[str, Any] = {
            "source": url,
        }

        # Transport details are built once and attached only on request.
        httpx_metadata: dict[str, Any] = {}
        if include_httpx_metadata:
            httpx_metadata = {
                "headers": headers,
                "status_code": response.status_code,
                "response_headers": response_headers,
                **({"redirection_history": redirection_history} if redirection_history else {}),
            }

        if save_to_file:
            mode = "wb" if is_binary else "w"
            encoding = response.encoding if mode == "w" else None
            if file_path:
                async with async_open(file_path, mode, encoding=encoding) as f:
                    await f.write(response.content if is_binary else response.text)
                # Always report where the response was written; previously the
                # path was only exposed together with the httpx metadata, so a
                # plain "save to file" run gave the caller no way to find it.
                metadata["file_path"] = str(file_path)
            metadata.update(httpx_metadata)
            return Data(data=metadata)

        # Populate result when not saving to a file
        if is_binary:
            result = response.content
        else:
            try:
                result = response.json()
            except Exception:  # noqa: BLE001
                self.log("Error decoding JSON response")
                result = response.text.encode("utf-8")

        metadata.update({"result": result})
        metadata.update(httpx_metadata)
        return Data(data=metadata)
    except httpx.TimeoutException:
        return Data(
            data={
                "source": url,
                "headers": headers,
                "status_code": 408,
                "error": "Request timed out",
            },
        )
    except Exception as exc:  # noqa: BLE001
        self.log(f"Error making request to {url}")
        return Data(
            data={
                "source": url,
                "headers": headers,
                "status_code": 500,
                "error": str(exc),
                **({"redirection_history": redirection_history} if redirection_history else {}),
            },
        )
def add_query_params(self, url: str, params: dict) -> str:
    """Return ``url`` with ``params`` merged into its query string.

    Parameters already present in the URL are kept, including repeated keys
    and blank values, which the previous dict-based merge silently dropped.
    A key supplied in ``params`` replaces every existing occurrence of that
    key at the position of the first one; new keys are appended.

    Args:
        url: The URL to extend.
        params: Mapping of query parameter names to values; may be empty or None.

    Returns:
        The rebuilt URL with the merged, re-encoded query string.
    """
    overrides = dict(params or {})
    parts = urlparse(url)

    merged: list[tuple[str, Any]] = []
    replaced: set[str] = set()
    for key, value in parse_qsl(parts.query, keep_blank_values=True):
        if key not in overrides:
            merged.append((key, value))
        elif key not in replaced:
            # Substitute in place so overridden keys keep their original position.
            merged.append((key, overrides[key]))
            replaced.add(key)
    merged.extend((key, value) for key, value in overrides.items() if key not in replaced)

    return urlunparse(parts._replace(query=urlencode(merged)))
async def make_requests(self) -> list[Data]:
    """Send the configured request to every URL concurrently.

    Reads the component inputs, validates the URLs, appends the query
    parameters to each URL and issues all requests through one shared
    client. The same body and headers are used for every URL.

    Returns:
        One Data object per URL, in input order; also stored in ``self.status``.

    Raises:
        ValueError: If any URL fails validation or the curl command cannot be parsed.
    """
    method = self.method
    # Tolerate an unset URL list instead of failing on iteration over None.
    urls = [url.strip() for url in (self.urls or []) if url.strip()]
    curl = self.curl
    headers = self.headers or {}
    body = self.body or {}
    timeout = self.timeout
    follow_redirects = self.follow_redirects
    save_to_file = self.save_to_file
    include_httpx_metadata = self.include_httpx_metadata

    invalid_urls = [url for url in urls if not validators.url(url)]
    if invalid_urls:
        msg = f"Invalid URLs provided: {invalid_urls}"
        raise ValueError(msg)

    # Query parameters arrive either as a raw query string or as a Data object.
    if isinstance(self.query_params, str):
        query_params = dict(parse_qsl(self.query_params))
    else:
        query_params = self.query_params.data if self.query_params else {}

    if curl:
        # parse_curl assigns build_config[<field>]["value"], so the mapping must
        # already hold an entry per field; a bare dotdict() made this fail with
        # "Error parsing curl: 'urls'" on every run that had a curl value.
        # NOTE(review): the parsed config is only stored, not applied to this
        # request - confirm that is intended.
        seeded_config = dotdict({field: {} for field in ("urls", "method", "headers", "body")})
        self._build_config = self.parse_curl(curl, seeded_config)

    if isinstance(headers, Data):
        headers = headers.data

    if isinstance(body, Data):
        body = body.data

    bodies = [body] * len(urls)
    urls = [self.add_query_params(url, query_params) for url in urls]

    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(
            *[
                self.make_request(
                    client,
                    method,
                    u,
                    headers,
                    rec,
                    timeout,
                    follow_redirects=follow_redirects,
                    save_to_file=save_to_file,
                    include_httpx_metadata=include_httpx_metadata,
                )
                for u, rec in zip(urls, bodies, strict=True)
            ]
        )
    self.status = results
    return results
def _response_info(self, response: httpx.Response, *, with_file_path: bool = False) -> tuple[bool, Path | None]:
"""Determine the file path and whether the response content is binary.
Args:
response (Response): The HTTP response object.
with_file_path (bool): Whether to save the response content to a file.
Returns:
Tuple[bool, Path | None]:
A tuple containing a boolean indicating if the content is binary and the full file path (if applicable).
"""
# Determine if the content is binary
content_type = response.headers.get("Content-Type", "")
is_binary = "application/octet-stream" in content_type or "application/binary" in content_type
if not with_file_path:
return is_binary, None
# Step 1: Set up a subdirectory for the component in the OS temp directory
component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__
component_temp_dir.mkdir(parents=True, exist_ok=True)
# Step 2: Extract filename from Content-Disposition
filename = None
if "Content-Disposition" in response.headers:
content_disposition = response.headers["Content-Disposition"]
filename_match = re.search(r'filename="(.+?)"', content_disposition)
if not filename_match: # Try to match RFC 5987 style
filename_match = re.search(r"filename\*=(?:UTF-8'')?(.+)", content_disposition)
if filename_match:
extracted_filename = filename_match.group(1)
# Ensure the filename is unique
if (component_temp_dir / extracted_filename).exists():
timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
filename = f"{timestamp}-{extracted_filename}"
else:
filename = extracted_filename
# Step 3: Infer file extension or use part of the request URL if no filename
if not filename:
# Extract the last segment of the URL path
url_path = urlparse(str(response.request.url)).path
base_name = Path(url_path).name # Get the last segment of the path
if not base_name: # If the path ends with a slash or is empty
base_name = "response"
# Infer file extension
extension = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None
if not extension:
extension = ".bin" if is_binary else ".txt" # Default extensions
# Combine the base name with timestamp and extension
timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
filename = f"{timestamp}-{base_name}{extension}"
# Step 4: Define the full file path
file_path = component_temp_dir / filename
return is_binary, file_path
def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:
"""Convert HTTP headers to a dictionary with lowercased keys."""
return {k.lower(): v for k, v in headers.items()}
File
This component loads and parses text files of various supported formats, converting the content into a Data object. It supports multiple file types and provides an option for silent error handling.
The maximum supported file size is 100 MB.
Parameters
Name | Display Name | Info |
---|---|---|
path |
Path |
File path to load. |
silent_errors |
Silent Errors |
If true, errors will not raise an exception |
Name | Display Name | Info |
---|---|---|
data |
Data |
Parsed content of the file as a Data object |
Supported file extensions
The following file types are supported for processing:
Supported file extensions
Document formats |
|
Data formats |
|
Markup Languages |
|
Programming Languages |
|
Image Formats |
|
Component code
file.py
from langflow.base.data import BaseFileComponent
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data
from langflow.io import BoolInput, IntInput
from langflow.schema import Data
class FileComponent(BaseFileComponent):
    """Loads text files and turns their content into Data objects.

    Files are parsed one by one, or concurrently when both the configured
    concurrency and the number of files are at least two.
    """

    display_name = "File"
    description = "Load a file to be used in your project."
    icon = "file-text"
    name = "File"

    VALID_EXTENSIONS = TEXT_FILE_TYPES

    inputs = [
        *BaseFileComponent._base_inputs,
        BoolInput(
            name="use_multithreading",
            display_name="[Deprecated] Use Multithreading",
            advanced=True,
            value=True,
            info="Set 'Processing Concurrency' greater than 1 to enable multithreading.",
        ),
        IntInput(
            name="concurrency_multithreading",
            display_name="Processing Concurrency",
            advanced=False,
            info="When multiple files are being processed, the number of files to process concurrently.",
            value=1,
        ),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
        """Parse every file in ``file_list`` and merge the results back into it.

        Args:
            file_list (list[BaseFileComponent.BaseFile]): Files to process.

        Returns:
            list[BaseFileComponent.BaseFile]: The result of ``rollup_data`` for
            the files and their parsed data.

        Raises:
            ValueError: If ``file_list`` is empty.
        """

        def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:
            """Parse one file; on failure log it, then re-raise unless errors are silenced."""
            try:
                return parse_text_file_to_data(file_path, silent_errors=silent_errors)
            except FileNotFoundError as e:
                self.log(f"File not found: {file_path}. Error: {e}")
                if silent_errors:
                    return None
                raise
            except Exception as e:
                self.log(f"Unexpected error processing {file_path}: {e}")
                if silent_errors:
                    return None
                raise

        if not file_list:
            msg = "No files to process."
            raise ValueError(msg)

        # The deprecated switch still forces sequential processing when turned off.
        concurrency = max(1, self.concurrency_multithreading) if self.use_multithreading else 1
        file_count = len(file_list)

        parallel_processing_threshold = 2
        run_in_parallel = (
            concurrency >= parallel_processing_threshold and file_count >= parallel_processing_threshold
        )

        if run_in_parallel:
            self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.")
            processed_data = parallel_load_data(
                [str(file.path) for file in file_list],
                silent_errors=self.silent_errors,
                load_function=process_file,
                max_concurrency=concurrency,
            )
        else:
            if file_count > 1:
                self.log(f"Processing {file_count} files sequentially.")
            processed_data = []
            for file in file_list:
                processed_data.append(process_file(str(file.path), silent_errors=self.silent_errors))

        # Merge the parsed data back onto the BaseFile objects.
        return self.rollup_data(file_list, processed_data)
Gmail loader
This component loads emails from Gmail using provided credentials and filters. For more information about creating a service account JSON, see Service Account JSON.
Parameters
Input | Type | Description |
---|---|---|
json_string |
SecretStrInput |
JSON string containing OAuth 2.0 access token information for service account access |
label_ids |
MessageTextInput |
Comma-separated list of label IDs to filter emails |
max_results |
MessageTextInput |
Maximum number of emails to load |
Output | Type | Description |
---|---|---|
data |
Data |
Loaded email data |
Component code
gmail.py
import base64
import json
import re
from collections.abc import Iterator
from json.decoder import JSONDecodeError
from typing import Any
from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import HumanMessage
from langchain_google_community.gmail.loader import GMailLoader
from loguru import logger
from langflow.custom import Component
from langflow.inputs import MessageTextInput
from langflow.io import SecretStrInput
from langflow.schema import Data
from langflow.template import Output
class GmailLoaderComponent(Component):
    # Component that loads Gmail messages through the "load_emails" output method.
    display_name = "Gmail Loader"
    description = "Loads emails from Gmail using provided credentials."
    icon = "Google"

    inputs = [
        # The default is a template to fill in. It must itself be valid JSON:
        # the previous template had a trailing comma inside "scopes", so even a
        # fully filled-in copy was rejected by json.loads in load_emails.
        SecretStrInput(
            name="json_string",
            display_name="JSON String of the Service Account Token",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
            value="""{
                "account": "",
                "client_id": "",
                "client_secret": "",
                "expiry": "",
                "refresh_token": "",
                "scopes": [
                    "https://www.googleapis.com/auth/gmail.readonly"
                ],
                "token": "",
                "token_uri": "https://oauth2.googleapis.com/token",
                "universe_domain": "googleapis.com"
            }""",
        ),
        MessageTextInput(
            name="label_ids",
            display_name="Label IDs",
            info="Comma-separated list of label IDs to filter emails.",
            required=True,
            value="INBOX,SENT,UNREAD,IMPORTANT",
        ),
        MessageTextInput(
            name="max_results",
            display_name="Max Results",
            info="Maximum number of emails to load.",
            required=True,
            value="10",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="load_emails"),
    ]
def load_emails(self) -> Data:
    """Load Gmail messages matching the configured labels.

    Reads ``json_string`` (authorized-user token info), ``label_ids``
    (comma-separated, default ``INBOX``) and ``max_results`` (default 100)
    from the component inputs.

    Returns:
        Data whose ``text`` entry holds the loaded chat sessions; the same
        list is stored in ``self.status``.

    Raises:
        ValueError: If the token JSON is invalid, ``max_results`` is not an
            integer, the token cannot be refreshed, or loading fails.
    """

    class CustomGMailLoader(GMailLoader):
        """GMailLoader variant that filters by label and strips noise from message text."""

        def __init__(
            self, creds: Any, *, n: int = 100, label_ids: list[str] | None = None, raise_error: bool = False
        ) -> None:
            super().__init__(creds, n, raise_error)
            self.label_ids = label_ids if label_ids is not None else ["SENT"]

        def clean_message_content(self, message):
            """Remove URLs, email addresses and special characters, then collapse whitespace."""
            # Remove URLs
            message = re.sub(r"http\S+|www\S+|https\S+", "", message, flags=re.MULTILINE)
            # Remove email addresses
            message = re.sub(r"\S+@\S+", "", message)
            # Remove special characters and excessive whitespace
            message = re.sub(r"[^A-Za-z0-9\s]+", " ", message)
            message = re.sub(r"\s{2,}", " ", message)
            # Trim leading and trailing whitespace
            return message.strip()

        def _extract_email_content(self, msg: Any) -> HumanMessage:
            """Return the first text/plain part of ``msg`` as a HumanMessage tagged with its sender."""
            from_email = None
            for values in msg["payload"]["headers"]:
                # Header field names are case-insensitive, so compare lower-cased.
                if values["name"].lower() == "from":
                    from_email = values["value"]
            if from_email is None:
                error = "From email not found."
                raise ValueError(error)

            parts = msg["payload"]["parts"] if "parts" in msg["payload"] else [msg["payload"]]

            for part in parts:
                if part["mimeType"] == "text/plain":
                    data = part["body"]["data"]
                    data = base64.urlsafe_b64decode(data).decode("utf-8")
                    # Keep only the newest reply: cut at the quoted "On ... wrote:" marker.
                    pattern = re.compile(r"\r\nOn .+(\r\n)*wrote:\r\n")
                    newest_response = re.split(pattern, data)[0]
                    return HumanMessage(
                        content=self.clean_message_content(newest_response),
                        additional_kwargs={"sender": from_email},
                    )
            error = "No plain text part found in the email."
            raise ValueError(error)

        def _get_message_data(self, service: Any, message: Any) -> ChatSession:
            """Fetch one message and, for replies, pair it with the message it answers."""
            msg = service.users().messages().get(userId="me", id=message["id"]).execute()
            message_content = self._extract_email_content(msg)

            in_reply_to = None
            for values in msg["payload"]["headers"]:
                if values["name"].lower() == "in-reply-to":
                    in_reply_to = values["value"]

            thread_id = msg["threadId"]

            if in_reply_to:
                thread = service.users().threads().get(userId="me", id=thread_id).execute()
                messages = thread["messages"]

                response_email = None
                for _message in messages:
                    # Reset per message: previously a message without a
                    # Message-ID header either raised an unbound-variable error
                    # or was compared using the previous message's ID.
                    message_id = None
                    for values in _message["payload"]["headers"]:
                        if values["name"].lower() == "message-id":
                            message_id = values["value"]
                    if message_id is not None and message_id == in_reply_to:
                        response_email = _message
                if response_email is None:
                    error = "Response email not found in the thread."
                    raise ValueError(error)
                starter_content = self._extract_email_content(response_email)
                return ChatSession(messages=[starter_content, message_content])
            return ChatSession(messages=[message_content])

        def lazy_load(self) -> Iterator[ChatSession]:
            """Yield one ChatSession per listed message; failures are logged unless raise_error is set."""
            service = build("gmail", "v1", credentials=self.creds)
            results = (
                service.users().messages().list(userId="me", labelIds=self.label_ids, maxResults=self.n).execute()
            )
            messages = results.get("messages", [])
            if not messages:
                logger.warning("No messages found with the specified labels.")
            for message in messages:
                try:
                    yield self._get_message_data(service, message)
                except Exception:
                    if self.raise_error:
                        raise
                    else:
                        logger.exception(f"Error processing message {message['id']}")

    json_string = self.json_string

    # Strip each label so input such as "INBOX, SENT" yields valid label IDs;
    # fall back to INBOX when nothing usable is left.
    label_ids = [label.strip() for label in (self.label_ids or "").split(",") if label.strip()] or ["INBOX"]

    try:
        max_results = int(self.max_results) if self.max_results else 100
    except (TypeError, ValueError) as e:
        msg = f"Max Results must be an integer, got {self.max_results!r}"
        raise ValueError(msg) from e

    # Load the token information from the JSON string
    try:
        token_info = json.loads(json_string)
    except JSONDecodeError as e:
        msg = "Invalid JSON string"
        raise ValueError(msg) from e

    creds = Credentials.from_authorized_user_info(token_info)

    # Initialize the custom loader with the provided credentials
    loader = CustomGMailLoader(creds=creds, n=max_results, label_ids=label_ids)

    try:
        docs = loader.load()
    except RefreshError as e:
        msg = "Authentication error: Unable to refresh authentication token. Please try to reauthenticate."
        raise ValueError(msg) from e
    except Exception as e:
        msg = f"Error loading documents: {e}"
        raise ValueError(msg) from e

    # Return the loaded documents
    self.status = docs
    return Data(data={"text": docs})
Google Drive loader
This component loads documents from Google Drive using provided credentials and a single document ID. For more information about creating a service account JSON, see Service Account JSON.
Parameters
Input | Type | Description |
---|---|---|
json_string |
SecretStrInput |
JSON string containing OAuth 2.0 access token information for service account access |
document_id |
MessageTextInput |
Single Google Drive document ID |
Output | Type | Description |
---|---|---|
docs |
Data |
Loaded document data |
Component code
google_drive.py
import json
from json.decoder import JSONDecodeError
from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials
from langchain_google_community import GoogleDriveLoader
from langflow.custom import Component
from langflow.helpers.data import docs_to_data
from langflow.inputs import MessageTextInput
from langflow.io import SecretStrInput
from langflow.schema import Data
from langflow.template import Output
class GoogleDriveComponent(Component):
    # Component that loads exactly one Google Drive document via "load_documents".
    display_name = "Google Drive Loader"
    description = "Loads documents from Google Drive using provided credentials."
    icon = "Google"

    inputs = [
        SecretStrInput(
            name="json_string",
            display_name="JSON String of the Service Account Token",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
        ),
        MessageTextInput(
            name="document_id", display_name="Document ID", info="Single Google Drive document ID", required=True
        ),
    ]

    outputs = [
        Output(display_name="Loaded Documents", name="docs", method="load_documents"),
    ]

    def load_documents(self) -> Data:
        """Load the configured Google Drive document.

        Returns:
            Data whose ``text`` entry holds the converted document; the same
            value is stored in ``self.status``.

        Raises:
            ValueError: If the document ID is blank, the token JSON is invalid,
                the token cannot be refreshed, loading fails, or the loader
                does not return exactly one document.
        """

        class CustomGoogleDriveLoader(GoogleDriveLoader):
            # Credentials object to be passed directly.
            creds: Credentials | None = None

            def _load_credentials(self):
                """Return the credentials passed in ``creds``; fail when none were given."""
                if self.creds:
                    return self.creds
                msg = "No credentials provided."
                raise ValueError(msg)

            class Config:
                arbitrary_types_allowed = True

        json_string = self.json_string

        # The old check `len([self.document_id]) != 1` could never fail; validate
        # the value itself so a blank ID is rejected before any API call.
        document_id = (self.document_id or "").strip()
        if not document_id:
            msg = "Expected a single document ID"
            raise ValueError(msg)
        document_ids = [document_id]

        # TODO: Add validation to check if the document ID is valid

        # Load the token information from the JSON string
        try:
            token_info = json.loads(json_string)
        except JSONDecodeError as e:
            msg = "Invalid JSON string"
            raise ValueError(msg) from e

        # Initialize the custom loader with the provided credentials and document IDs
        loader = CustomGoogleDriveLoader(
            creds=Credentials.from_authorized_user_info(token_info), document_ids=document_ids
        )

        # Load the documents
        try:
            docs = loader.load()
        except RefreshError as e:
            msg = "Authentication error: Unable to refresh authentication token. Please try to reauthenticate."
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Error loading documents: {e}"
            raise ValueError(msg) from e

        if len(docs) != 1:
            msg = "Expected a single document to be loaded."
            raise ValueError(msg)

        data = docs_to_data(docs)
        # Return the loaded documents
        self.status = data
        return Data(data={"text": data})
Google Drive search
This component searches Google Drive files using provided credentials and query parameters. For more information about creating a service account JSON, see Service Account JSON.
Parameters
Input | Type | Description |
---|---|---|
token_string |
SecretStrInput |
JSON string containing OAuth 2.0 access token information for service account access |
query_item |
DropdownInput |
The field to query |
valid_operator |
DropdownInput |
Operator to use in the query |
search_term |
MessageTextInput |
The value to search for in the specified query item |
query_string |
MessageTextInput |
The query string used for searching (can be edited manually) |
Output | Type | Description |
---|---|---|
doc_urls |
List[str] |
URLs of the found documents |
doc_ids |
List[str] |
IDs of the found documents |
doc_titles |
List[str] |
Titles of the found documents |
Data |
Data |
Document titles and URLs in a structured format |
Component code
google_drive_search.py
import json
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langflow.custom import Component
from langflow.inputs import DropdownInput, MessageTextInput
from langflow.io import SecretStrInput
from langflow.schema import Data
from langflow.template import Output
class GoogleDriveSearchComponent(Component):
    # Component that searches Google Drive and exposes the matches as URLs,
    # IDs, titles, or a combined Data object (see `outputs`).
    display_name = "Google Drive Search"
    description = "Searches Google Drive files using provided credentials and query parameters."
    icon = "Google"

    inputs = [
        # Parsed with json.loads in search_files to build the credentials.
        SecretStrInput(
            name="token_string",
            display_name="Token String",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
        ),
        # Left-hand field of the generated query (see generate_query_string).
        DropdownInput(
            name="query_item",
            display_name="Query Item",
            options=[
                "name",
                "fullText",
                "mimeType",
                "modifiedTime",
                "viewedByMeTime",
                "trashed",
                "starred",
                "parents",
                "owners",
                "writers",
                "readers",
                "sharedWithMe",
                "createdTime",
                "properties",
                "appProperties",
                "visibility",
                "shortcutDetails.targetId",
            ],
            info="The field to query.",
            required=True,
        ),
        # Operator placed between the query item and the search term.
        DropdownInput(
            name="valid_operator",
            display_name="Valid Operator",
            options=["contains", "=", "!=", "<=", "<", ">", ">=", "in", "has"],
            info="Operator to use in the query.",
            required=True,
        ),
        MessageTextInput(
            name="search_term",
            display_name="Search Term",
            info="The value to search for in the specified query item.",
            required=True,
        ),
        # When non-empty this is sent as-is; search_files only generates a
        # query from the three inputs above when this field is empty.
        MessageTextInput(
            name="query_string",
            display_name="Query String",
            info="The query string used for searching. You can edit this manually.",
            value="",  # This will be updated with the generated query string
        ),
    ]

    # Every output method calls search_files itself.
    outputs = [
        Output(display_name="Document URLs", name="doc_urls", method="search_doc_urls"),
        Output(display_name="Document IDs", name="doc_ids", method="search_doc_ids"),
        Output(display_name="Document Titles", name="doc_titles", method="search_doc_titles"),
        Output(display_name="Data", name="Data", method="search_data"),
    ]
def generate_query_string(self) -> str:
    """Build a Drive search query from the query item, operator and search term.

    The search term is emitted as a single-quoted literal with backslashes
    and single quotes escaped, so a term such as ``it's`` no longer breaks
    the query. For the ``in`` operator the literal comes first
    (``'value' in field``), as Drive query syntax requires.

    Returns:
        The generated query, which is also stored in ``self.query_string``.
    """
    query_item = self.query_item
    valid_operator = self.valid_operator

    # Escape the backslash first so the quote escapes added next are not doubled.
    escaped_term = str(self.search_term).replace("\\", "\\\\").replace("'", "\\'")

    if valid_operator == "in":
        query = f"'{escaped_term}' in {query_item}"
    else:
        query = f"{query_item} {valid_operator} '{escaped_term}'"

    # Update the editable query string input with the generated query
    self.query_string = query

    return query
def on_inputs_changed(self) -> None:
    """Regenerate the query string whenever an input changes; the result is not returned."""
    self.generate_query_string()
    return None
def generate_file_url(self, file_id: str, mime_type: str) -> str:
    """Return the browser URL for a Drive file, chosen by its MIME type.

    Google Docs, Sheets, Slides and Drawings open in their editors; every
    other type, PDFs included, uses the generic Drive viewer link.
    """
    if mime_type == "application/vnd.google-apps.document":
        return f"https://docs.google.com/document/d/{file_id}/edit"
    if mime_type == "application/vnd.google-apps.spreadsheet":
        return f"https://docs.google.com/spreadsheets/d/{file_id}/edit"
    if mime_type == "application/vnd.google-apps.presentation":
        return f"https://docs.google.com/presentation/d/{file_id}/edit"
    if mime_type == "application/vnd.google-apps.drawing":
        return f"https://docs.google.com/drawings/d/{file_id}/edit"
    if mime_type == "application/pdf":
        return f"https://drive.google.com/file/d/{file_id}/view?usp=drivesdk"
    return f"https://drive.google.com/file/d/{file_id}/view?usp=drivesdk"
def search_files(self) -> dict:
    """Run the Drive search and collect URLs, IDs and titles of the matches.

    Uses the manually edited query string when it is non-empty, otherwise a
    freshly generated one. At most five files are requested.

    Returns:
        Dict with the keys ``doc_urls``, ``doc_ids``, ``doc_titles_urls``
        (title/url pairs) and ``doc_titles``; each value is a list.
    """
    # Build credentials from the token JSON supplied as input.
    credentials = Credentials.from_authorized_user_info(json.loads(self.token_string))

    search_query = self.query_string or self.generate_query_string()

    drive = build("drive", "v3", credentials=credentials)
    listing = (
        drive.files()
        .list(q=search_query, pageSize=5, fields="nextPageToken, files(id, name, mimeType)")
        .execute()
    )

    found: dict = {"doc_urls": [], "doc_ids": [], "doc_titles_urls": [], "doc_titles": []}
    for entry in listing.get("files", []) or []:
        entry_id = entry["id"]
        entry_title = entry["name"]
        entry_url = self.generate_file_url(entry_id, entry["mimeType"])

        found["doc_urls"].append(entry_url)
        found["doc_ids"].append(entry_id)
        found["doc_titles"].append(entry_title)
        found["doc_titles_urls"].append({"title": entry_title, "url": entry_url})

    return found
def search_doc_ids(self) -> list[str]:
    """Run the search and return only the IDs of the matching files."""
    found = self.search_files()
    return found["doc_ids"]
def search_doc_urls(self) -> list[str]:
    """Run the search and return only the URLs of the matching files."""
    found = self.search_files()
    return found["doc_urls"]
def search_doc_titles(self) -> list[str]:
    """Run the search and return only the titles of the matching files."""
    found = self.search_files()
    return found["doc_titles"]
def search_data(self) -> Data:
    """Run the search and wrap the title/URL pairs in a Data object under ``text``."""
    title_url_pairs = self.search_files()["doc_titles_urls"]
    return Data(data={"text": title_url_pairs})
URL
This component fetches content from one or more URLs, processes the content, and returns it as a list of Data objects. It validates the provided URLs, adding `http://` when no scheme is given, and uses WebBaseLoader to extract the text, or AsyncHtmlLoader when the output format is set to Raw HTML.
Parameters
Name | Display Name | Info |
---|---|---|
urls |
URLs |
Enter one or more URLs |
Name | Display Name | Info |
---|---|---|
data |
Data |
List of Data objects containing fetched content and metadata |
Component code
url.py
import re
from langchain_community.document_loaders import AsyncHtmlLoader, WebBaseLoader
from langflow.custom import Component
from langflow.helpers.data import data_to_text
from langflow.io import DropdownInput, MessageTextInput, Output
from langflow.schema import Data
from langflow.schema.message import Message
class URLComponent(Component):
    # Component that fetches web pages and exposes them either as Data
    # objects or as a single text Message (see `outputs`).
    display_name = "URL"
    description = "Fetch content from one or more URLs."
    icon = "layout-template"
    name = "URL"

    inputs = [
        # Each entry is stripped and passed through ensure_url by fetch_content;
        # blank entries are skipped there.
        MessageTextInput(
            name="urls",
            display_name="URLs",
            info="Enter one or more URLs, by clicking the '+' button.",
            is_list=True,
            tool_mode=True,
        ),
        # "Raw HTML" selects AsyncHtmlLoader in fetch_content; any other value
        # uses WebBaseLoader.
        DropdownInput(
            name="format",
            display_name="Output Format",
            info="Output Format. Use 'Text' to extract the text from the HTML or 'Raw HTML' for the raw HTML content.",
            options=["Text", "Raw HTML"],
            value="Text",
        ),
    ]

    # The "text" output calls fetch_content itself and formats the result.
    outputs = [
        Output(display_name="Data", name="data", method="fetch_content"),
        Output(display_name="Text", name="text", method="fetch_content_text"),
    ]
def ensure_url(self, string: str) -> str:
    """Ensures the given string is a URL by adding 'http://' if it has no http(s) scheme.

    The scheme check is case-insensitive, so ``HTTPS://example.com`` is
    accepted unchanged instead of being turned into
    ``http://HTTPS://example.com`` and rejected.

    Parameters:
        string (str): The string to be checked and possibly modified.

    Returns:
        str: The string, prefixed with ``http://`` when it had no http(s) scheme.

    Raises:
        ValueError: If the string is not a valid URL.
    """
    if not string.lower().startswith(("http://", "https://")):
        string = "http://" + string

    # Basic URL validation regex
    url_regex = re.compile(
        r"^(https?:\/\/)?"  # optional protocol
        r"(www\.)?"  # optional www
        r"([a-zA-Z0-9.-]+)"  # domain
        r"(\.[a-zA-Z]{2,})?"  # top-level domain
        r"(:\d+)?"  # optional port
        r"(\/[^\s]*)?$",  # optional path
        re.IGNORECASE,
    )

    if not url_regex.match(string):
        msg = f"Invalid URL: {string}"
        raise ValueError(msg)

    return string
def fetch_content(self) -> list[Data]:
    """Fetch every configured URL and return one Data object per loaded document.

    Blank entries are skipped and the rest are normalised with ``ensure_url``.
    Each Data object carries the page content as ``text`` plus the document
    metadata as extra fields. The list is also stored in ``self.status``.
    """
    targets = []
    for raw_url in self.urls:
        candidate = raw_url.strip()
        if candidate:
            targets.append(self.ensure_url(candidate))

    # Raw HTML keeps the markup; the default loader extracts the text.
    want_raw_html = self.format == "Raw HTML"
    loader = (
        AsyncHtmlLoader(web_path=targets, encoding="utf-8")
        if want_raw_html
        else WebBaseLoader(web_paths=targets, encoding="utf-8")
    )

    data = []
    for document in loader.load():
        data.append(Data(text=document.page_content, **document.metadata))

    self.status = data
    return data
def fetch_content_text(self) -> Message:
    """Fetch the URLs and return their text, formatted with the ``{text}`` template, as one Message."""
    fetched = self.fetch_content()
    combined_text = data_to_text("{text}", fetched)
    # Overrides the status set by fetch_content with the rendered text.
    self.status = combined_text
    return Message(text=combined_text)
Webhook input
This component defines a webhook input for the flow. The flow can be triggered by an external HTTP POST request (webhook) sending a JSON payload.
If the input is not valid JSON, the component will wrap it in a "payload" field. The component’s status will reflect any errors or the processed data.
Parameters
Name | Type | Description |
---|---|---|
data |
String |
JSON payload for testing the webhook component |
Name | Type | Description |
---|---|---|
output_data |
Data |
Processed data from the webhook input |
Component code
webhook.py
import json
from langflow.custom import Component
from langflow.io import MultilineInput, Output
from langflow.schema import Data
class WebhookComponent(Component):
    # Component that turns an incoming webhook payload into a Data object.
    display_name = "Webhook"
    description = "Defines a webhook input for the flow."
    name = "Webhook"
    icon = "webhook"

    inputs = [
        MultilineInput(
            name="data",
            display_name="Payload",
            info="Receives a payload from external systems via HTTP POST.",
        )
    ]
    outputs = [
        Output(display_name="Data", name="output_data", method="build_data"),
    ]

    def build_data(self) -> Data:
        """Parse the payload into a Data object.

        Returns:
            Data holding the decoded JSON object. A payload that is not valid
            JSON, or that decodes to something other than an object (a list,
            number, string, ...), is wrapped as ``{"payload": <value>}``.
            An empty payload yields an empty Data object.

        ``self.status`` is set to an explanatory message for empty or invalid
        payloads, and to the resulting Data object otherwise.
        """
        message: str | Data = ""
        if not self.data:
            self.status = "No data provided."
            return Data(data={})
        try:
            body = json.loads(self.data)
        except json.JSONDecodeError:
            body = {"payload": self.data}
            message = f"Invalid JSON payload. Please check the format.\n\n{self.data}"
        else:
            # NOTE(review): Data(data=...) presumably expects a mapping - confirm.
            # Wrap valid non-object JSON the same way invalid payloads are wrapped.
            if not isinstance(body, dict):
                body = {"payload": body}
        data = Data(data=body)
        if not message:
            message = data
        self.status = message
        return data