Loaders
This Langflow feature is currently in public preview. Development is ongoing, and the features and functionality are subject to change. Langflow, and the use of such, is subject to the DataStax Preview Terms.
As of Langflow 1.1, loader components are found in the Components menu under Bundles.
Loaders fetch data into Langflow from various sources, such as databases, websites, and local files.
Use a loader component in a flow
This flow creates a question-and-answer chatbot for documents that are loaded into the flow.
The Unstructured.io loader component loads files from your local machine and parses them into a list of structured Data objects. This loaded data informs the OpenAI component's responses to your questions.
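Outside the visual editor, the same pipeline can be sketched in a few lines of LangChain. This is a minimal illustration, not the component's implementation; the file path, model name, and environment variables are placeholder assumptions:

```python
import os

from langchain_openai import ChatOpenAI
from langchain_unstructured import UnstructuredLoader

# Hypothetical file path; assumes UNSTRUCTURED_API_KEY and OPENAI_API_KEY are set.
docs = UnstructuredLoader(
    ["report.pdf"],
    api_key=os.environ["UNSTRUCTURED_API_KEY"],
    partition_via_api=True,
).load()
context = "\n\n".join(doc.page_content for doc in docs)

llm = ChatOpenAI(model="gpt-4o-mini")  # placeholder model name
reply = llm.invoke(f"Using only this context:\n{context}\n\nQuestion: What is this document about?")
print(reply.content)
```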
Confluence
This component integrates with Confluence, a wiki collaboration platform, to load and process documents. It uses the ConfluenceLoader from LangChain to fetch content from a specified Confluence space.
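Under the hood, the component builds a `ConfluenceLoader` and calls `load()`, roughly as in the following sketch. The site URL, email, token, and space key are placeholders:

```python
from langchain_community.document_loaders import ConfluenceLoader
from langchain_community.document_loaders.confluence import ContentFormat

# Placeholder credentials: substitute your site URL, Atlassian email, API token, and space key.
loader = ConfluenceLoader(
    url="https://example.atlassian.net/wiki",
    username="email@example.com",
    api_key="YOUR_ATLASSIAN_API_TOKEN",
    cloud=True,
    space_key="DOCS",
    content_format=ContentFormat.STORAGE,
    max_pages=1000,
)
documents = loader.load()  # a list of LangChain Document objects, one per page
```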
Parameters
Inputs

Name | Display Name | Info |
---|---|---|
url | Site URL | The base URL of the Confluence space (e.g., https://<company>.atlassian.net/wiki) |
username | Username | Atlassian user e-mail (e.g., email@example.com) |
api_key | API Key | Atlassian API key (create one at https://id.atlassian.com/manage-profile/security/api-tokens) |
space_key | Space Key | The key of the Confluence space to access |
cloud | Use Cloud? | Whether to use Confluence Cloud (default: true) |
content_format | Content Format | The content format to retrieve (default: STORAGE) |
max_pages | Max Pages | Maximum number of pages to retrieve (default: 1000) |

Outputs

Name | Display Name | Info |
---|---|---|
data | Data | List of Data objects containing the loaded Confluence documents |
Component code
confluence.py
```python
from langchain_community.document_loaders import ConfluenceLoader
from langchain_community.document_loaders.confluence import ContentFormat

from langflow.custom import Component
from langflow.io import BoolInput, DropdownInput, IntInput, Output, SecretStrInput, StrInput
from langflow.schema import Data


class ConfluenceComponent(Component):
    display_name = "Confluence"
    description = "Confluence wiki collaboration platform"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/document_loaders/confluence/"
    trace_type = "tool"
    icon = "Confluence"
    name = "Confluence"

    inputs = [
        StrInput(
            name="url",
            display_name="Site URL",
            required=True,
            info="The base URL of the Confluence Space. Example: https://<company>.atlassian.net/wiki.",
        ),
        StrInput(
            name="username",
            display_name="Username",
            required=True,
            info="Atlassian User E-mail. Example: email@example.com",
        ),
        SecretStrInput(
            name="api_key",
            display_name="API Key",
            required=True,
            info="Atlassian Key. Create at: https://id.atlassian.com/manage-profile/security/api-tokens",
        ),
        StrInput(name="space_key", display_name="Space Key", required=True),
        BoolInput(name="cloud", display_name="Use Cloud?", required=True, value=True, advanced=True),
        DropdownInput(
            name="content_format",
            display_name="Content Format",
            options=[
                ContentFormat.EDITOR.value,
                ContentFormat.EXPORT_VIEW.value,
                ContentFormat.ANONYMOUS_EXPORT_VIEW.value,
                ContentFormat.STORAGE.value,
                ContentFormat.VIEW.value,
            ],
            value=ContentFormat.STORAGE.value,
            required=True,
            advanced=True,
            info="Specify content format, defaults to ContentFormat.STORAGE",
        ),
        IntInput(
            name="max_pages",
            display_name="Max Pages",
            required=False,
            value=1000,
            advanced=True,
            info="Maximum number of pages to retrieve in total, defaults 1000",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="load_documents"),
    ]

    def build_confluence(self) -> ConfluenceLoader:
        content_format = ContentFormat(self.content_format)
        return ConfluenceLoader(
            url=self.url,
            username=self.username,
            api_key=self.api_key,
            cloud=self.cloud,
            space_key=self.space_key,
            content_format=content_format,
            max_pages=self.max_pages,
        )

    def load_documents(self) -> list[Data]:
        confluence = self.build_confluence()
        documents = confluence.load()
        data = [Data.from_document(doc) for doc in documents]  # Using the from_document method of Data
        self.status = data
        return data
```
GitLoader
This component uses the GitLoader from LangChain to fetch and load documents from a specified Git repository.
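For reference, calling the underlying loader directly looks roughly like this; the repository URL, local path, and filter are placeholder choices:

```python
from langchain_community.document_loaders.git import GitLoader

loader = GitLoader(
    repo_path="./example_repo",  # existing local repo, or the directory to clone into
    clone_url="https://github.com/langchain-ai/langflow",  # placeholder; omit for a local repo
    branch="main",
    file_filter=lambda path: path.endswith(".md"),  # keep only Markdown files
)
documents = loader.load()
```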
Parameters
Inputs

Name | Display Name | Info |
---|---|---|
repo_path | Repository Path | The local path to the Git repository |
clone_url | Clone URL | The URL to clone the Git repository from (optional) |
branch | Branch | The branch to load files from (default: 'main') |
file_filter | File Filter | Comma-separated patterns to filter files (e.g., '*.py' to include only .py files, '!*.py' to exclude .py files) |
content_filter | Content Filter | A regex pattern to filter files based on their content |

Outputs

Name | Display Name | Info |
---|---|---|
data | Data | List of Data objects containing the loaded Git repository documents |
Component code
git.py
```python
import re
import tempfile
from contextlib import asynccontextmanager
from fnmatch import fnmatch
from pathlib import Path

import anyio
from langchain_community.document_loaders.git import GitLoader

from langflow.custom import Component
from langflow.io import DropdownInput, MessageTextInput, Output
from langflow.schema import Data


class GitLoaderComponent(Component):
    display_name = "Git"
    description = (
        "Load and filter documents from a local or remote Git repository. "
        "Use a local repo path or clone from a remote URL."
    )
    trace_type = "tool"
    icon = "GitLoader"

    inputs = [
        DropdownInput(
            name="repo_source",
            display_name="Repository Source",
            options=["Local", "Remote"],
            required=True,
            info="Select whether to use a local repo path or clone from a remote URL.",
            real_time_refresh=True,
        ),
        MessageTextInput(
            name="repo_path",
            display_name="Local Repository Path",
            required=False,
            info="The local path to the existing Git repository (used if 'Local' is selected).",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="clone_url",
            display_name="Clone URL",
            required=False,
            info="The URL of the Git repository to clone (used if 'Remote' is selected).",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="branch",
            display_name="Branch",
            required=False,
            value="main",
            info="The branch to load files from. Defaults to 'main'.",
        ),
        MessageTextInput(
            name="file_filter",
            display_name="File Filter",
            required=False,
            advanced=True,
            info=(
                "Patterns to filter files. For example:\n"
                "Include only .py files: '*.py'\n"
                "Exclude .py files: '!*.py'\n"
                "Multiple patterns can be separated by commas."
            ),
        ),
        MessageTextInput(
            name="content_filter",
            display_name="Content Filter",
            required=False,
            advanced=True,
            info="A regex pattern to filter files based on their content.",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="load_documents"),
    ]

    @staticmethod
    def is_binary(file_path: str | Path) -> bool:
        """Check if a file is binary by looking for null bytes."""
        try:
            with Path(file_path).open("rb") as file:
                content = file.read(1024)
                return b"\x00" in content
        except Exception:  # noqa: BLE001
            return True

    @staticmethod
    def check_file_patterns(file_path: str | Path, patterns: str) -> bool:
        """Check if a file matches the given patterns.

        Args:
            file_path: Path to the file to check
            patterns: Comma-separated list of glob patterns

        Returns:
            bool: True if file should be included, False if excluded
        """
        # Handle empty or whitespace-only patterns
        if not patterns or patterns.isspace():
            return True

        path_str = str(file_path)
        file_name = Path(path_str).name
        pattern_list: list[str] = [pattern.strip() for pattern in patterns.split(",") if pattern.strip()]

        # If no valid patterns after stripping, treat as include all
        if not pattern_list:
            return True

        # Process exclusion patterns first
        for pattern in pattern_list:
            if pattern.startswith("!"):
                # For exclusions, match against both full path and filename
                exclude_pattern = pattern[1:]
                if fnmatch(path_str, exclude_pattern) or fnmatch(file_name, exclude_pattern):
                    return False

        # Then check inclusion patterns
        include_patterns = [p for p in pattern_list if not p.startswith("!")]
        # If no include patterns, treat as include all
        if not include_patterns:
            return True

        # For inclusions, match against both full path and filename
        return any(fnmatch(path_str, pattern) or fnmatch(file_name, pattern) for pattern in include_patterns)

    @staticmethod
    def check_content_pattern(file_path: str | Path, pattern: str) -> bool:
        """Check if file content matches the given regex pattern.

        Args:
            file_path: Path to the file to check
            pattern: Regex pattern to match against content

        Returns:
            bool: True if content matches, False otherwise
        """
        try:
            # Check if file is binary
            with Path(file_path).open("rb") as file:
                content = file.read(1024)
                if b"\x00" in content:
                    return False

            # Try to compile the regex pattern first
            try:
                # Use the MULTILINE flag to better handle text content
                content_regex = re.compile(pattern, re.MULTILINE)
                # Test the pattern with a simple string to catch syntax errors
                test_str = "test\nstring"
                if not content_regex.search(test_str):
                    # Pattern is valid but doesn't match test string
                    pass
            except (re.error, TypeError, ValueError):
                return False

            # If not binary and regex is valid, check content
            with Path(file_path).open(encoding="utf-8") as file:
                file_content = file.read()
            return bool(content_regex.search(file_content))
        except (OSError, UnicodeDecodeError):
            return False

    def build_combined_filter(self, file_filter_patterns: str | None = None, content_filter_pattern: str | None = None):
        """Build a combined filter function from file and content patterns.

        Args:
            file_filter_patterns: Comma-separated glob patterns
            content_filter_pattern: Regex pattern for content

        Returns:
            callable: Filter function that takes a file path and returns bool
        """

        def combined_filter(file_path: str) -> bool:
            try:
                path = Path(file_path)

                # Check if file exists and is readable
                if not path.exists():
                    return False

                # Check if file is binary
                if self.is_binary(path):
                    return False

                # Apply file pattern filters
                if file_filter_patterns and not self.check_file_patterns(path, file_filter_patterns):
                    return False

                # Apply content filter
                return not (content_filter_pattern and not self.check_content_pattern(path, content_filter_pattern))
            except Exception:  # noqa: BLE001
                return False

        return combined_filter

    @asynccontextmanager
    async def temp_clone_dir(self):
        """Context manager for handling temporary clone directory."""
        temp_dir = None
        try:
            temp_dir = tempfile.mkdtemp(prefix="langflow_clone_")
            yield temp_dir
        finally:
            if temp_dir:
                await anyio.Path(temp_dir).rmdir()

    def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None) -> dict:
        # Hide fields by default
        build_config["repo_path"]["show"] = False
        build_config["clone_url"]["show"] = False

        if field_name == "repo_source":
            if field_value == "Local":
                build_config["repo_path"]["show"] = True
                build_config["repo_path"]["required"] = True
                build_config["clone_url"]["required"] = False
            elif field_value == "Remote":
                build_config["clone_url"]["show"] = True
                build_config["clone_url"]["required"] = True
                build_config["repo_path"]["required"] = False

        return build_config

    async def build_gitloader(self) -> GitLoader:
        file_filter_patterns = getattr(self, "file_filter", None)
        content_filter_pattern = getattr(self, "content_filter", None)

        combined_filter = self.build_combined_filter(file_filter_patterns, content_filter_pattern)

        repo_source = getattr(self, "repo_source", None)
        if repo_source == "Local":
            repo_path = self.repo_path
            clone_url = None
        else:
            # Clone source
            clone_url = self.clone_url
            async with self.temp_clone_dir() as temp_dir:
                repo_path = temp_dir

        # Only pass branch if it's explicitly set
        branch = getattr(self, "branch", None)
        if not branch:
            branch = None

        return GitLoader(
            repo_path=repo_path,
            clone_url=clone_url if repo_source == "Remote" else None,
            branch=branch,
            file_filter=combined_filter,
        )

    async def load_documents(self) -> list[Data]:
        gitloader = await self.build_gitloader()
        data = [Data.from_document(doc) async for doc in gitloader.alazy_load()]
        self.status = data
        return data
```
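The File Filter semantics come from `check_file_patterns` above: exclusion patterns (prefixed with `!`) are applied before inclusions, and each pattern is matched against both the full path and the bare filename. A quick illustration, assuming the class above is in scope:

```python
check = GitLoaderComponent.check_file_patterns

print(check("src/app.py", "*.py"))            # True: matches the include pattern
print(check("src/app.py", "!*.py"))           # False: exclusions are checked first
print(check("README.md", "*.py, *.md"))       # True: matches one of several includes
print(check("test_app.py", "*.py, !test_*"))  # False: exclusion wins over inclusion
```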
Unstructured
This component uses the Unstructured Serverless API to load and parse PDF, DOCX, and TXT files into structured data.
This component does not work with the Unstructured open-source library.
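In plain LangChain terms, the component's behavior is roughly the sketch below. The file path is a placeholder, the key is assumed to be in the environment, and `partition_via_api=True` is what routes parsing through the serverless API:

```python
import os

from langchain_unstructured import UnstructuredLoader

loader = UnstructuredLoader(
    ["contract.pdf"],  # placeholder path
    api_key=os.environ["UNSTRUCTURED_API_KEY"],  # assumes the key is set in the environment
    partition_via_api=True,  # use the serverless API, not a local unstructured install
    chunking_strategy="by_title",  # optional; mirrors the component's Chunking Strategy input
)
documents = loader.load()
```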
Parameters
Inputs

Name | Display Name | Info |
---|---|---|
file | File | The path to the file to be parsed (supported types: pdf, docx, txt) |
api_key | API Key | Unstructured API key |

Outputs

Name | Display Name | Info |
---|---|---|
data | Data | List of Data objects containing the parsed content from the input file |
Component code
unstructured.py
```python
from langchain_unstructured import UnstructuredLoader

from langflow.base.data import BaseFileComponent
from langflow.inputs import DropdownInput, MessageTextInput, NestedDictInput, SecretStrInput
from langflow.schema import Data


class UnstructuredComponent(BaseFileComponent):
    display_name = "Unstructured API"
    description = (
        "Uses Unstructured.io API to extract clean text from raw source documents. Supports a wide range of file types."
    )
    documentation = (
        "https://python.langchain.com/api_reference/unstructured/document_loaders/"
        "langchain_unstructured.document_loaders.UnstructuredLoader.html"
    )
    trace_type = "tool"
    icon = "Unstructured"
    name = "Unstructured"

    # https://docs.unstructured.io/api-reference/api-services/overview#supported-file-types
    VALID_EXTENSIONS = [
        "bmp",
        "csv",
        "doc",
        "docx",
        "eml",
        "epub",
        "heic",
        "html",
        "jpeg",
        "png",
        "md",
        "msg",
        "odt",
        "org",
        "p7s",
        "pdf",
        "ppt",
        "pptx",
        "rst",
        "rtf",
        "tiff",
        "txt",
        "tsv",
        "xls",
        "xlsx",
        "xml",
    ]

    inputs = [
        *BaseFileComponent._base_inputs,
        SecretStrInput(
            name="api_key",
            display_name="Unstructured.io Serverless API Key",
            required=True,
            info="Unstructured API Key. Create at: https://app.unstructured.io/",
        ),
        MessageTextInput(
            name="api_url",
            display_name="Unstructured.io API URL",
            required=False,
            info="Unstructured API URL.",
        ),
        DropdownInput(
            name="chunking_strategy",
            display_name="Chunking Strategy",
            info="Chunking strategy to use, see https://docs.unstructured.io/api-reference/api-services/chunking",
            options=["", "basic", "by_title", "by_page", "by_similarity"],
            real_time_refresh=False,
            value="",
        ),
        NestedDictInput(
            name="unstructured_args",
            display_name="Additional Arguments",
            required=False,
            info=(
                "Optional dictionary of additional arguments to the Loader. "
                "See https://docs.unstructured.io/api-reference/api-services/api-parameters for more information."
            ),
        ),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
        file_paths = [str(file.path) for file in file_list if file.path]

        if not file_paths:
            self.log("No files to process.")
            return file_list

        # https://docs.unstructured.io/api-reference/api-services/api-parameters
        args = self.unstructured_args or {}

        if self.chunking_strategy:
            args["chunking_strategy"] = self.chunking_strategy

        args["api_key"] = self.api_key
        args["partition_via_api"] = True
        if self.api_url:
            args["url"] = self.api_url

        loader = UnstructuredLoader(
            file_paths,
            **args,
        )

        documents = loader.load()

        processed_data: list[Data | None] = [Data.from_document(doc) if doc else None for doc in documents]

        # Rename the `source` field to `self.SERVER_FILE_PATH_FIELDNAME`, to avoid conflicts with the `source` field
        for data in processed_data:
            if data and "source" in data.data:
                data.data[self.SERVER_FILE_PATH_FIELDNAME] = data.data.pop("source")

        return self.rollup_data(file_list, processed_data)
```