Loaders

This Langflow feature is currently in public preview. Development is ongoing, and features and functionality are subject to change. Langflow, and its use, are subject to the DataStax Preview Terms.

As of Langflow 1.1, loader components are found in the Components menu under Bundles.

Loaders fetch data into Langflow from various sources, such as databases, websites, and local files.

Use a loader component in a flow

This flow creates a question-and-answer chatbot for documents that are loaded into the flow. The Unstructured.io loader component loads files from your local machine and parses them into a list of structured Data objects. The loaded data informs the OpenAI component's responses to your questions.

[Image: starter flow for document Q&A with the Unstructured loader]

Confluence

This component integrates with Confluence, a wiki collaboration platform, to load and process documents. It uses the ConfluenceLoader from LangChain to fetch content from a specified Confluence space.

Parameters

Inputs
| Name | Display Name | Info |
|------|--------------|------|
| url | Site URL | The base URL of the Confluence space, for example https://<company>.atlassian.net/wiki |
| username | Username | Atlassian user email, for example email@example.com |
| api_key | API Key | Atlassian API key. Create one at https://id.atlassian.com/manage-profile/security/api-tokens |
| space_key | Space Key | The key of the Confluence space to access |
| cloud | Use Cloud? | Whether to use Confluence Cloud (default: true) |
| content_format | Content Format | The content format to retrieve (default: STORAGE) |
| max_pages | Max Pages | Maximum number of pages to retrieve (default: 1000) |

Outputs
| Name | Display Name | Info |
|------|--------------|------|
| data | Data | List of Data objects containing the loaded Confluence documents |

Component code

confluence.py
from langchain_community.document_loaders import ConfluenceLoader
from langchain_community.document_loaders.confluence import ContentFormat

from langflow.custom import Component
from langflow.io import BoolInput, DropdownInput, IntInput, Output, SecretStrInput, StrInput
from langflow.schema import Data


class ConfluenceComponent(Component):
    display_name = "Confluence"
    description = "Confluence wiki collaboration platform"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/document_loaders/confluence/"
    trace_type = "tool"
    icon = "Confluence"
    name = "Confluence"

    inputs = [
        StrInput(
            name="url",
            display_name="Site URL",
            required=True,
            info="The base URL of the Confluence Space. Example: https://<company>.atlassian.net/wiki.",
        ),
        StrInput(
            name="username",
            display_name="Username",
            required=True,
            info="Atlassian User E-mail. Example: email@example.com",
        ),
        SecretStrInput(
            name="api_key",
            display_name="API Key",
            required=True,
            info="Atlassian Key. Create at: https://id.atlassian.com/manage-profile/security/api-tokens",
        ),
        StrInput(name="space_key", display_name="Space Key", required=True),
        BoolInput(name="cloud", display_name="Use Cloud?", required=True, value=True, advanced=True),
        DropdownInput(
            name="content_format",
            display_name="Content Format",
            options=[
                ContentFormat.EDITOR.value,
                ContentFormat.EXPORT_VIEW.value,
                ContentFormat.ANONYMOUS_EXPORT_VIEW.value,
                ContentFormat.STORAGE.value,
                ContentFormat.VIEW.value,
            ],
            value=ContentFormat.STORAGE.value,
            required=True,
            advanced=True,
            info="Specify content format, defaults to ContentFormat.STORAGE",
        ),
        IntInput(
            name="max_pages",
            display_name="Max Pages",
            required=False,
            value=1000,
            advanced=True,
            info="Maximum number of pages to retrieve in total, defaults 1000",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="load_documents"),
    ]

    def build_confluence(self) -> ConfluenceLoader:
        content_format = ContentFormat(self.content_format)
        return ConfluenceLoader(
            url=self.url,
            username=self.username,
            api_key=self.api_key,
            cloud=self.cloud,
            space_key=self.space_key,
            content_format=content_format,
            max_pages=self.max_pages,
        )

    def load_documents(self) -> list[Data]:
        confluence = self.build_confluence()
        documents = confluence.load()
        data = [Data.from_document(doc) for doc in documents]  # Using the from_document method of Data
        self.status = data
        return data
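
For reference, the loading step this component performs can be reproduced outside Langflow with LangChain's ConfluenceLoader, mirroring the arguments that build_confluence passes above. This is a minimal sketch; the site URL, credentials, and space key are placeholder assumptions.

from langchain_community.document_loaders import ConfluenceLoader
from langchain_community.document_loaders.confluence import ContentFormat

# Placeholder site, credentials, and space key -- replace with your own.
loader = ConfluenceLoader(
    url="https://example.atlassian.net/wiki",
    username="email@example.com",
    api_key="YOUR_ATLASSIAN_API_TOKEN",
    cloud=True,
    space_key="DOCS",
    content_format=ContentFormat.STORAGE,
    max_pages=1000,
)
documents = loader.load()  # Returns a list of LangChain Document objects
print(f"Loaded {len(documents)} pages")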

GitLoader

This component uses the GitLoader from LangChain to fetch and load documents from a specified Git repository.

Parameters

Inputs
| Name | Display Name | Info |
|------|--------------|------|
| repo_source | Repository Source | Whether to use a local repository path or clone from a remote URL (options: Local, Remote) |
| repo_path | Local Repository Path | The local path to the existing Git repository (used when Local is selected) |
| clone_url | Clone URL | The URL of the Git repository to clone (used when Remote is selected) |
| branch | Branch | The branch to load files from (default: main) |
| file_filter | File Filter | Comma-separated glob patterns to filter files, for example *.py to include only .py files, or !*.py to exclude them |
| content_filter | Content Filter | A regex pattern to filter files based on their content |

Outputs
| Name | Display Name | Info |
|------|--------------|------|
| data | Data | List of Data objects containing the loaded Git repository documents |
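
The File Filter and Content Filter inputs are combined into a single predicate (see build_combined_filter in the component code below). As a minimal illustration of the glob semantics, with hypothetical paths and assuming the GitLoaderComponent class from the component code is importable:

# File Filter semantics, as implemented by check_file_patterns below.
# Patterns are comma-separated globs; a leading '!' excludes matches.
GitLoaderComponent.check_file_patterns("src/app.py", "*.py")          # True: matches the include pattern
GitLoaderComponent.check_file_patterns("src/app.py", "!*.py")         # False: matches the exclude pattern
GitLoaderComponent.check_file_patterns("docs/intro.md", "*.py,*.md")  # True: any include pattern may match
GitLoaderComponent.check_file_patterns("src/app.py", "")              # True: an empty filter includes everything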

Component code

git.py
import re
import tempfile
from contextlib import asynccontextmanager
from fnmatch import fnmatch
from pathlib import Path

import anyio
from langchain_community.document_loaders.git import GitLoader

from langflow.custom import Component
from langflow.io import DropdownInput, MessageTextInput, Output
from langflow.schema import Data


class GitLoaderComponent(Component):
    display_name = "Git"
    description = (
        "Load and filter documents from a local or remote Git repository. "
        "Use a local repo path or clone from a remote URL."
    )
    trace_type = "tool"
    icon = "GitLoader"

    inputs = [
        DropdownInput(
            name="repo_source",
            display_name="Repository Source",
            options=["Local", "Remote"],
            required=True,
            info="Select whether to use a local repo path or clone from a remote URL.",
            real_time_refresh=True,
        ),
        MessageTextInput(
            name="repo_path",
            display_name="Local Repository Path",
            required=False,
            info="The local path to the existing Git repository (used if 'Local' is selected).",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="clone_url",
            display_name="Clone URL",
            required=False,
            info="The URL of the Git repository to clone (used if 'Clone' is selected).",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="branch",
            display_name="Branch",
            required=False,
            value="main",
            info="The branch to load files from. Defaults to 'main'.",
        ),
        MessageTextInput(
            name="file_filter",
            display_name="File Filter",
            required=False,
            advanced=True,
            info=(
                "Patterns to filter files. For example:\n"
                "Include only .py files: '*.py'\n"
                "Exclude .py files: '!*.py'\n"
                "Multiple patterns can be separated by commas."
            ),
        ),
        MessageTextInput(
            name="content_filter",
            display_name="Content Filter",
            required=False,
            advanced=True,
            info="A regex pattern to filter files based on their content.",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="load_documents"),
    ]

    @staticmethod
    def is_binary(file_path: str | Path) -> bool:
        """Check if a file is binary by looking for null bytes."""
        try:
            with Path(file_path).open("rb") as file:
                content = file.read(1024)
                return b"\x00" in content
        except Exception:  # noqa: BLE001
            return True

    @staticmethod
    def check_file_patterns(file_path: str | Path, patterns: str) -> bool:
        """Check if a file matches the given patterns.

        Args:
            file_path: Path to the file to check
            patterns: Comma-separated list of glob patterns

        Returns:
            bool: True if file should be included, False if excluded
        """
        # Handle empty or whitespace-only patterns
        if not patterns or patterns.isspace():
            return True

        path_str = str(file_path)
        file_name = Path(path_str).name
        pattern_list: list[str] = [pattern.strip() for pattern in patterns.split(",") if pattern.strip()]

        # If no valid patterns after stripping, treat as include all
        if not pattern_list:
            return True

        # Process exclusion patterns first
        for pattern in pattern_list:
            if pattern.startswith("!"):
                # For exclusions, match against both full path and filename
                exclude_pattern = pattern[1:]
                if fnmatch(path_str, exclude_pattern) or fnmatch(file_name, exclude_pattern):
                    return False

        # Then check inclusion patterns
        include_patterns = [p for p in pattern_list if not p.startswith("!")]
        # If no include patterns, treat as include all
        if not include_patterns:
            return True

        # For inclusions, match against both full path and filename
        return any(fnmatch(path_str, pattern) or fnmatch(file_name, pattern) for pattern in include_patterns)

    @staticmethod
    def check_content_pattern(file_path: str | Path, pattern: str) -> bool:
        """Check if file content matches the given regex pattern.

        Args:
            file_path: Path to the file to check
            pattern: Regex pattern to match against content

        Returns:
            bool: True if content matches, False otherwise
        """
        try:
            # Check if file is binary
            with Path(file_path).open("rb") as file:
                content = file.read(1024)
                if b"\x00" in content:
                    return False

            # Try to compile the regex pattern first
            try:
                # Use the MULTILINE flag to better handle text content
                content_regex = re.compile(pattern, re.MULTILINE)
                # Test the pattern with a simple string to catch syntax errors
                test_str = "test\nstring"
                if not content_regex.search(test_str):
                    # Pattern is valid but doesn't match test string
                    pass
            except (re.error, TypeError, ValueError):
                return False

            # If not binary and regex is valid, check content
            with Path(file_path).open(encoding="utf-8") as file:
                file_content = file.read()
            return bool(content_regex.search(file_content))
        except (OSError, UnicodeDecodeError):
            return False

    def build_combined_filter(self, file_filter_patterns: str | None = None, content_filter_pattern: str | None = None):
        """Build a combined filter function from file and content patterns.

        Args:
            file_filter_patterns: Comma-separated glob patterns
            content_filter_pattern: Regex pattern for content

        Returns:
            callable: Filter function that takes a file path and returns bool
        """

        def combined_filter(file_path: str) -> bool:
            try:
                path = Path(file_path)

                # Check if file exists and is readable
                if not path.exists():
                    return False

                # Check if file is binary
                if self.is_binary(path):
                    return False

                # Apply file pattern filters
                if file_filter_patterns and not self.check_file_patterns(path, file_filter_patterns):
                    return False

                # Apply content filter
                return not (content_filter_pattern and not self.check_content_pattern(path, content_filter_pattern))
            except Exception:  # noqa: BLE001
                return False

        return combined_filter

    @asynccontextmanager
    async def temp_clone_dir(self):
        """Context manager for handling temporary clone directory."""
        temp_dir = None
        try:
            temp_dir = tempfile.mkdtemp(prefix="langflow_clone_")
            yield temp_dir
        finally:
            if temp_dir:
                await anyio.Path(temp_dir).rmdir()

    def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None) -> dict:
        # Hide fields by default
        build_config["repo_path"]["show"] = False
        build_config["clone_url"]["show"] = False

        if field_name == "repo_source":
            if field_value == "Local":
                build_config["repo_path"]["show"] = True
                build_config["repo_path"]["required"] = True
                build_config["clone_url"]["required"] = False
            elif field_value == "Remote":
                build_config["clone_url"]["show"] = True
                build_config["clone_url"]["required"] = True
                build_config["repo_path"]["required"] = False

        return build_config

    async def build_gitloader(self) -> GitLoader:
        file_filter_patterns = getattr(self, "file_filter", None)
        content_filter_pattern = getattr(self, "content_filter", None)

        combined_filter = self.build_combined_filter(file_filter_patterns, content_filter_pattern)

        repo_source = getattr(self, "repo_source", None)
        if repo_source == "Local":
            repo_path = self.repo_path
            clone_url = None
        else:
            # Clone source
            clone_url = self.clone_url
            async with self.temp_clone_dir() as temp_dir:
                repo_path = temp_dir
            # The (empty) temporary directory is removed when the context exits;
            # GitLoader recreates the path when it clones the repository on load.

        # Only pass branch if it's explicitly set
        branch = getattr(self, "branch", None)
        if not branch:
            branch = None

        return GitLoader(
            repo_path=repo_path,
            clone_url=clone_url if repo_source == "Remote" else None,
            branch=branch,
            file_filter=combined_filter,
        )

    async def load_documents(self) -> list[Data]:
        gitloader = await self.build_gitloader()
        data = [Data.from_document(doc) async for doc in gitloader.alazy_load()]
        self.status = data
        return data
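
Outside Langflow, the equivalent load can be sketched directly with LangChain's GitLoader. The repository path and the filter below are placeholder assumptions; the filter plays the role of the combined predicate built above.

from langchain_community.document_loaders.git import GitLoader

# Placeholder repository path -- replace with a real local clone.
loader = GitLoader(
    repo_path="./my-repo",
    branch="main",
    # Roughly equivalent to a File Filter of '*.py': keep only Python files.
    file_filter=lambda file_path: file_path.endswith(".py"),
)
documents = loader.load()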

Unstructured

This component uses the Unstructured Serverless API to load and parse files, such as PDF, DOCX, and TXT files, into structured data. See VALID_EXTENSIONS in the component code for the full list of supported file types.

This component does not work with the Unstructured open-source library.

Parameters

Inputs
| Name | Display Name | Info |
|------|--------------|------|
| file | File | The path to the file to be parsed (see VALID_EXTENSIONS in the component code for supported types) |
| api_key | Unstructured.io Serverless API Key | Unstructured API key. Create one at https://app.unstructured.io/ |
| api_url | Unstructured.io API URL | Optional Unstructured API URL |
| chunking_strategy | Chunking Strategy | Chunking strategy to use (options: basic, by_title, by_page, by_similarity) |
| unstructured_args | Additional Arguments | Optional dictionary of additional arguments to pass to the loader |

Outputs
| Name | Display Name | Info |
|------|--------------|------|
| data | Data | List of Data objects containing the parsed content from the input file |

Component code

unstructured.py
from langchain_unstructured import UnstructuredLoader

from langflow.base.data import BaseFileComponent
from langflow.inputs import DropdownInput, MessageTextInput, NestedDictInput, SecretStrInput
from langflow.schema import Data


class UnstructuredComponent(BaseFileComponent):
    display_name = "Unstructured API"
    description = (
        "Uses Unstructured.io API to extract clean text from raw source documents. Supports a wide range of file types."
    )
    documentation = (
        "https://python.langchain.com/api_reference/unstructured/document_loaders/"
        "langchain_unstructured.document_loaders.UnstructuredLoader.html"
    )
    trace_type = "tool"
    icon = "Unstructured"
    name = "Unstructured"

    # https://docs.unstructured.io/api-reference/api-services/overview#supported-file-types
    VALID_EXTENSIONS = [
        "bmp",
        "csv",
        "doc",
        "docx",
        "eml",
        "epub",
        "heic",
        "html",
        "jpeg",
        "png",
        "md",
        "msg",
        "odt",
        "org",
        "p7s",
        "pdf",
        "png",
        "ppt",
        "pptx",
        "rst",
        "rtf",
        "tiff",
        "txt",
        "tsv",
        "xls",
        "xlsx",
        "xml",
    ]

    inputs = [
        *BaseFileComponent._base_inputs,
        SecretStrInput(
            name="api_key",
            display_name="Unstructured.io Serverless API Key",
            required=True,
            info="Unstructured API Key. Create at: https://app.unstructured.io/",
        ),
        MessageTextInput(
            name="api_url",
            display_name="Unstructured.io API URL",
            required=False,
            info="Unstructured API URL.",
        ),
        DropdownInput(
            name="chunking_strategy",
            display_name="Chunking Strategy",
            info="Chunking strategy to use, see https://docs.unstructured.io/api-reference/api-services/chunking",
            options=["", "basic", "by_title", "by_page", "by_similarity"],
            real_time_refresh=False,
            value="",
        ),
        NestedDictInput(
            name="unstructured_args",
            display_name="Additional Arguments",
            required=False,
            info=(
                "Optional dictionary of additional arguments to the Loader. "
                "See https://docs.unstructured.io/api-reference/api-services/api-parameters for more information."
            ),
        ),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
        file_paths = [str(file.path) for file in file_list if file.path]

        if not file_paths:
            self.log("No files to process.")
            return file_list

        # https://docs.unstructured.io/api-reference/api-services/api-parameters
        args = self.unstructured_args or {}

        if self.chunking_strategy:
            args["chunking_strategy"] = self.chunking_strategy

        args["api_key"] = self.api_key
        args["partition_via_api"] = True
        if self.api_url:
            args["url"] = self.api_url

        loader = UnstructuredLoader(
            file_paths,
            **args,
        )

        documents = loader.load()

        processed_data: list[Data | None] = [Data.from_document(doc) if doc else None for doc in documents]

        # Rename the `source` field to `self.SERVER_FILE_PATH_FIELDNAME`, to avoid conflicts with the `source` field
        for data in processed_data:
            if data and "source" in data.data:
                data.data[self.SERVER_FILE_PATH_FIELDNAME] = data.data.pop("source")

        return self.rollup_data(file_list, processed_data)
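
For comparison, the underlying call this component makes can be sketched with langchain_unstructured directly, mirroring the arguments assembled in process_files above. The file path, API key, and chunking strategy are placeholder assumptions.

from langchain_unstructured import UnstructuredLoader

# Placeholder file path and API key -- replace with your own.
loader = UnstructuredLoader(
    ["./example.pdf"],
    api_key="YOUR_UNSTRUCTURED_API_KEY",
    partition_via_api=True,  # This component only supports the serverless API
    chunking_strategy="by_title",
)
documents = loader.load()
for doc in documents[:3]:
    print(doc.metadata.get("source"), doc.page_content[:80])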
