Loaders

This Langflow feature is currently in public preview. Development is ongoing, and the features and functionality are subject to change. Langflow, and the use of such, is subject to the DataStax Preview Terms.

Loaders are components used to load documents from various sources, such as databases, websites, and local files. They can be used to fetch data from external sources and convert it into a format that can be processed by other components.
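
The conversion step mentioned above can be pictured with a small sketch. Each loader on this page ends by turning LangChain documents into Langflow Data objects with Data.from_document. The Document and Data classes below are hypothetical stand-ins written for illustration, and the assumption that metadata is copied alongside the page text under a "text" key is not confirmed by this page.

```python
from dataclasses import dataclass, field


@dataclass
class Document:
    """Hypothetical stand-in for a LangChain document."""

    page_content: str
    metadata: dict = field(default_factory=dict)


@dataclass
class Data:
    """Hypothetical stand-in for Langflow's Data object."""

    data: dict

    @classmethod
    def from_document(cls, document: Document) -> "Data":
        # Assumption: metadata is copied and the page text is stored under "text".
        payload = dict(document.metadata)
        payload["text"] = document.page_content
        return cls(data=payload)


def load_documents(documents: list[Document]) -> list[Data]:
    # Mirrors the final step of each loader component on this page.
    return [Data.from_document(doc) for doc in documents]


docs = [Document("Release notes for v1", {"source": "wiki/releases", "title": "Releases"})]
records = load_documents(docs)
print(records[0].data)
```

Downstream components then receive a flat dictionary per document rather than a loader-specific object.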

Confluence

This component integrates with Confluence, a wiki collaboration platform, to load and process documents. It uses LangChain's ConfluenceLoader to fetch content from a specified Confluence space.

Parameters

Inputs
Name | Display Name | Info
url | Site URL | The base URL of the Confluence Space (e.g., https://<company>.atlassian.net/wiki)
username | Username | Atlassian User E-mail (e.g., email@example.com)
api_key | API Key | Atlassian API Key (Create at: https://id.atlassian.com/manage-profile/security/api-tokens)
space_key | Space Key | The key of the Confluence space to access
cloud | Use Cloud? | Whether to use Confluence Cloud (default: true)
content_format | Content Format | Specify content format (default: STORAGE)
max_pages | Max Pages | Maximum number of pages to retrieve (default: 1000)

Outputs
Name | Display Name | Info
data | Data | List of Data objects containing the loaded Confluence documents

Component code

confluence.py
from langchain_community.document_loaders import ConfluenceLoader
from langchain_community.document_loaders.confluence import ContentFormat

from langflow.custom import Component
from langflow.io import BoolInput, DropdownInput, IntInput, Output, SecretStrInput, StrInput
from langflow.schema import Data


class ConfluenceComponent(Component):
    display_name = "Confluence"
    description = "Confluence wiki collaboration platform"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/document_loaders/confluence/"
    trace_type = "tool"
    icon = "Confluence"
    name = "Confluence"

    inputs = [
        StrInput(
            name="url",
            display_name="Site URL",
            required=True,
            info="The base URL of the Confluence Space. Example: https://<company>.atlassian.net/wiki.",
        ),
        StrInput(
            name="username",
            display_name="Username",
            required=True,
            info="Atlassian User E-mail. Example: email@example.com",
        ),
        SecretStrInput(
            name="api_key",
            display_name="API Key",
            required=True,
            info="Atlassian Key. Create at: https://id.atlassian.com/manage-profile/security/api-tokens",
        ),
        StrInput(name="space_key", display_name="Space Key", required=True),
        BoolInput(name="cloud", display_name="Use Cloud?", required=True, value=True, advanced=True),
        DropdownInput(
            name="content_format",
            display_name="Content Format",
            options=[
                ContentFormat.EDITOR.value,
                ContentFormat.EXPORT_VIEW.value,
                ContentFormat.ANONYMOUS_EXPORT_VIEW.value,
                ContentFormat.STORAGE.value,
                ContentFormat.VIEW.value,
            ],
            value=ContentFormat.STORAGE.value,
            required=True,
            advanced=True,
            info="Specify content format, defaults to ContentFormat.STORAGE",
        ),
        IntInput(
            name="max_pages",
            display_name="Max Pages",
            required=False,
            value=1000,
            advanced=True,
            info="Maximum number of pages to retrieve in total, defaults 1000",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="load_documents"),
    ]

    def build_confluence(self) -> ConfluenceLoader:
        content_format = ContentFormat(self.content_format)
        return ConfluenceLoader(
            url=self.url,
            username=self.username,
            api_key=self.api_key,
            cloud=self.cloud,
            space_key=self.space_key,
            content_format=content_format,
            max_pages=self.max_pages,
        )

    def load_documents(self) -> list[Data]:
        confluence = self.build_confluence()
        documents = confluence.load()
        data = [Data.from_document(doc) for doc in documents]  # Using the from_document method of Data
        self.status = data
        return data
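
In the listing above, the Content Format dropdown stores each option as a string value, and build_confluence turns the selected string back into an enum member by calling ContentFormat(...). The sketch below illustrates that lookup-by-value pattern with a stand-in enum. The member values shown are assumptions about LangChain's ContentFormat, and resolve_content_format is a hypothetical helper that is not part of the component.

```python
from enum import Enum


class ContentFormat(Enum):
    """Stand-in for LangChain's ContentFormat; the member values are assumed."""

    EDITOR = "body.editor"
    EXPORT_VIEW = "body.export_view"
    ANONYMOUS_EXPORT_VIEW = "body.anonymous_export_view"
    STORAGE = "body.storage"
    VIEW = "body.view"


def resolve_content_format(selected: str) -> ContentFormat:
    """Convert a dropdown string back into an enum member (lookup by value)."""
    try:
        return ContentFormat(selected)
    except ValueError as exc:
        options = ", ".join(member.value for member in ContentFormat)
        raise ValueError(f"Unknown content format {selected!r}; expected one of: {options}") from exc


# The dropdown offers the values, not the member names.
dropdown_options = [member.value for member in ContentFormat]
default_choice = ContentFormat.STORAGE.value

print(resolve_content_format(default_choice).name)
```

Because the lookup is by value, passing a member name such as "STORAGE" raises ValueError, whereas the stored dropdown value resolves cleanly.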

GitLoader

This component uses LangChain's GitLoader to fetch and load documents from a specified Git repository.

Parameters

Inputs
Name | Display Name | Info
repo_path | Repository Path | The local path to the Git repository
clone_url | Clone URL | The URL to clone the Git repository from (optional)
branch | Branch | The branch to load files from (default: 'main')
file_filter | File Filter | Patterns to filter files (e.g., '*.py' to include only .py files, '!*.py' to exclude .py files); separate multiple patterns with commas
content_filter | Content Filter | A regex pattern to filter files based on their content

Outputs
Name | Display Name | Info
data | Data | List of Data objects containing the loaded Git repository documents
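
The File Filter input accepts glob-style patterns, with a leading '!' marking an exclusion. The sketch below reproduces the matching logic from the component code as a standalone function so that its behavior can be tried in isolation. make_file_filter is a hypothetical wrapper name, and PurePosixPath is used here only to keep the example platform independent.

```python
from pathlib import PurePosixPath


def make_file_filter(raw_patterns: str):
    """Build a predicate from a comma-separated pattern string."""
    patterns = [p.strip() for p in raw_patterns.split(",")]

    def file_filter(file_path: PurePosixPath) -> bool:
        # A single exclusion pattern keeps everything that does not match it.
        if len(patterns) == 1 and patterns[0].startswith("!"):
            return not file_path.match(patterns[0][1:])
        # Otherwise a file must match an inclusion and no exclusion.
        included = any(file_path.match(p) for p in patterns if not p.startswith("!"))
        excluded = any(file_path.match(p[1:]) for p in patterns if p.startswith("!"))
        return included and not excluded

    return file_filter


only_python = make_file_filter("*.py")
no_python = make_file_filter("!*.py")
python_without_tests = make_file_filter("*.py, !test_*.py")
exclusions_only = make_file_filter("!*.py, !*.md")

print(only_python(PurePosixPath("src/app.py")))                  # True
print(no_python(PurePosixPath("README.md")))                     # True
print(python_without_tests(PurePosixPath("tests/test_app.py")))  # False
print(exclusions_only(PurePosixPath("notes.txt")))               # False
```

The last case is worth noting: with this logic, a list that contains several exclusions and no inclusion pattern rejects every file, because no pattern can mark a file as included.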

Component code

git.py
import re
from pathlib import Path

from langchain_community.document_loaders.git import GitLoader

from langflow.custom import Component
from langflow.io import MessageTextInput, Output
from langflow.schema import Data


class GitLoaderComponent(Component):
    display_name = "GitLoader"
    description = "Load files from a Git repository"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/document_loaders/git/"
    trace_type = "tool"
    icon = "GitLoader"
    name = "GitLoader"

    inputs = [
        MessageTextInput(
            name="repo_path",
            display_name="Repository Path",
            required=False,
            info="The local path to the Git repository.",
        ),
        MessageTextInput(
            name="clone_url",
            display_name="Clone URL",
            required=False,
            info="The URL to clone the Git repository from.",
        ),
        MessageTextInput(
            name="branch",
            display_name="Branch",
            required=False,
            value="main",
            info="The branch to load files from. Defaults to 'main'.",
        ),
        MessageTextInput(
            name="file_filter",
            display_name="File Filter",
            required=False,
            advanced=True,
            info="A list of patterns to filter files. Example to include only .py files: '*.py'. "
            "Example to exclude .py files: '!*.py'. Multiple patterns can be separated by commas.",
        ),
        MessageTextInput(
            name="content_filter",
            display_name="Content Filter",
            required=False,
            advanced=True,
            info="A regex pattern to filter files based on their content.",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="load_documents"),
    ]

    @staticmethod
    def is_binary(file_path: str) -> bool:
        """Check if a file is binary by looking for null bytes.

        This is necessary because when searches are performed using
        the content_filter, binary files need to be ignored.
        """
        with Path(file_path).open("rb") as file:
            return b"\x00" in file.read(1024)

    def build_gitloader(self) -> GitLoader:
        file_filter_patterns = getattr(self, "file_filter", None)
        content_filter_pattern = getattr(self, "content_filter", None)

        file_filters = []
        if file_filter_patterns:
            patterns = [pattern.strip() for pattern in file_filter_patterns.split(",")]

            def file_filter(file_path: Path) -> bool:
                if len(patterns) == 1 and patterns[0].startswith("!"):
                    return not file_path.match(patterns[0][1:])
                included = any(file_path.match(pattern) for pattern in patterns if not pattern.startswith("!"))
                excluded = any(file_path.match(pattern[1:]) for pattern in patterns if pattern.startswith("!"))
                return included and not excluded

            file_filters.append(file_filter)

        if content_filter_pattern:
            content_regex = re.compile(content_filter_pattern)

            def content_filter(file_path: Path) -> bool:
                content = file_path.read_text(encoding="utf-8", errors="ignore")
                return bool(content_regex.search(content))

            file_filters.append(content_filter)

        def combined_filter(file_path: str) -> bool:
            path = Path(file_path)
            if self.is_binary(file_path):
                return False
            return all(f(path) for f in file_filters)

        return GitLoader(
            repo_path=self.repo_path,
            clone_url=self.clone_url,
            branch=self.branch,
            file_filter=combined_filter,
        )

    def load_documents(self) -> list[Data]:
        gitloader = self.build_gitloader()
        documents = list(gitloader.lazy_load())
        data = [Data.from_document(doc) for doc in documents]
        self.status = data
        return data
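
The listing above skips binary files before applying any other filter, then keeps a file only if every remaining filter accepts it. The following sketch exercises that flow against a temporary directory instead of a real repository. The helper names combine and demo, and the sample file names, are hypothetical and exist only for this illustration.

```python
import re
import tempfile
from pathlib import Path


def is_binary(file_path: Path) -> bool:
    """Treat a file as binary if its first 1024 bytes contain a null byte."""
    with file_path.open("rb") as handle:
        return b"\x00" in handle.read(1024)


def make_content_filter(pattern: str):
    """Build a predicate that keeps files whose text matches the regex."""
    regex = re.compile(pattern)

    def content_filter(file_path: Path) -> bool:
        text = file_path.read_text(encoding="utf-8", errors="ignore")
        return bool(regex.search(text))

    return content_filter


def combine(filters):
    """Reject binary files first, then require every filter to pass."""

    def combined(file_path: str) -> bool:
        path = Path(file_path)
        if is_binary(path):
            return False
        return all(f(path) for f in filters)

    return combined


def demo() -> dict[str, bool]:
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "todo.py").write_text("# TODO: refactor\n", encoding="utf-8")
        (root / "clean.py").write_text("print('ok')\n", encoding="utf-8")
        (root / "blob.bin").write_bytes(b"TODO\x00\x01\x02")
        keep = combine([make_content_filter(r"TODO")])
        return {p.name: keep(str(p)) for p in sorted(root.iterdir())}


results = demo()
print(results)
```

Only todo.py is kept: blob.bin contains the search term but is rejected by the null-byte check, and clean.py does not match the regex. With an empty filter list, all() returns True, so every non-binary file would pass.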

Unstructured

This component uses the Unstructured Serverless API to load and parse files such as PDF, DOCX, and TXT into structured data. The VALID_EXTENSIONS list in the component code shows every accepted file type.

This component does not work with the Unstructured open-source library.

Parameters

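Inputs
Name | Display Name | Info
file | File | The path to the file to be parsed (for example pdf, docx, or txt; the component code lists all accepted extensions)
api_key | Unstructured.io Serverless API Key | Unstructured API Key (create at: https://app.unstructured.io/)
api_url | Unstructured.io API URL | Unstructured API URL (optional)
chunking_strategy | Chunking Strategy | Chunking strategy to use: basic, by_title, by_page, or by_similarity (default: none)
unstructured_args | Additional Arguments | Optional dictionary of additional arguments passed to the loader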

Outputs
Name | Display Name | Info
data | Data | List of Data objects containing the parsed content from the input file

Component code

unstructured.py
from langchain_unstructured import UnstructuredLoader

from langflow.base.data import BaseFileComponent
from langflow.inputs import DropdownInput, MessageTextInput, NestedDictInput, SecretStrInput
from langflow.schema import Data


class UnstructuredComponent(BaseFileComponent):
    display_name = "Unstructured API"
    description = (
        "Uses Unstructured.io API to extract clean text from raw source documents. Supports a wide range of file types."
    )
    documentation = (
        "https://python.langchain.com/api_reference/unstructured/document_loaders/"
        "langchain_unstructured.document_loaders.UnstructuredLoader.html"
    )
    trace_type = "tool"
    icon = "Unstructured"
    name = "Unstructured"

    # https://docs.unstructured.io/api-reference/api-services/overview#supported-file-types
    VALID_EXTENSIONS = [
        "bmp",
        "csv",
        "doc",
        "docx",
        "eml",
        "epub",
        "heic",
        "html",
        "jpeg",
        "png",
        "md",
        "msg",
        "odt",
        "org",
        "p7s",
        "pdf",
        "png",
        "ppt",
        "pptx",
        "rst",
        "rtf",
        "tiff",
        "txt",
        "tsv",
        "xls",
        "xlsx",
        "xml",
    ]

    inputs = [
        *BaseFileComponent._base_inputs,
        SecretStrInput(
            name="api_key",
            display_name="Unstructured.io Serverless API Key",
            required=True,
            info="Unstructured API Key. Create at: https://app.unstructured.io/",
        ),
        MessageTextInput(
            name="api_url",
            display_name="Unstructured.io API URL",
            required=False,
            info="Unstructured API URL.",
        ),
        DropdownInput(
            name="chunking_strategy",
            display_name="Chunking Strategy",
            info="Chunking strategy to use, see https://docs.unstructured.io/api-reference/api-services/chunking",
            options=["", "basic", "by_title", "by_page", "by_similarity"],
            real_time_refresh=False,
            value="",
        ),
        NestedDictInput(
            name="unstructured_args",
            display_name="Additional Arguments",
            required=False,
            info=(
                "Optional dictionary of additional arguments to the Loader. "
                "See https://docs.unstructured.io/api-reference/api-services/api-parameters for more information."
            ),
        ),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
        file_paths = [str(file.path) for file in file_list if file.path]

        if not file_paths:
            self.log("No files to process.")
            return file_list

        # https://docs.unstructured.io/api-reference/api-services/api-parameters
        args = self.unstructured_args or {}

        if self.chunking_strategy:
            args["chunking_strategy"] = self.chunking_strategy

        args["api_key"] = self.api_key
        args["partition_via_api"] = True
        if self.api_url:
            args["url"] = self.api_url

        loader = UnstructuredLoader(
            file_paths,
            **args,
        )

        documents = loader.load()

        processed_data: list[Data | None] = [Data.from_document(doc) if doc else None for doc in documents]

        # Rename the `source` field to `self.SERVER_FILE_PATH_FIELDNAME`, to avoid conflicts with the `source` field
        for data in processed_data:
            if data and "source" in data.data:
                data.data[self.SERVER_FILE_PATH_FIELDNAME] = data.data.pop("source")

        return self.rollup_data(file_list, processed_data)
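
The process_files method above merges the optional inputs into one set of keyword arguments for the loader, then renames the source field on each result. A minimal standalone sketch of those two steps follows. build_loader_args and rename_source are hypothetical helper names, the "file_path" key is an assumed value for SERVER_FILE_PATH_FIELDNAME, and the "strategy" entry is only an illustrative extra argument. Unlike the component, this sketch copies the extra-arguments dictionary so the caller's object is left unchanged.

```python
def build_loader_args(api_key, api_url=None, chunking_strategy="", extra_args=None) -> dict:
    """Assemble loader keyword arguments in the same order as process_files."""
    args = dict(extra_args or {})  # copy so the caller's dict is not mutated
    if chunking_strategy:
        args["chunking_strategy"] = chunking_strategy
    # These keys are always set and override any user-supplied duplicates.
    args["api_key"] = api_key
    args["partition_via_api"] = True
    if api_url:
        args["url"] = api_url
    return args


def rename_source(record: dict, new_key: str = "file_path") -> dict:
    """Return a copy of the record with "source" moved to new_key (assumed name)."""
    renamed = dict(record)
    if "source" in renamed:
        renamed[new_key] = renamed.pop("source")
    return renamed


user_args = {"strategy": "hi_res"}
args = build_loader_args("secret-key", chunking_strategy="by_title", extra_args=user_args)
print(args)

record = rename_source({"source": "/tmp/report.pdf", "text": "Hello"})
print(record)
```

An empty chunking strategy or API URL leaves the corresponding key out entirely, so the loader falls back to its own defaults for those settings.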
