Loaders

This Langflow feature is currently in public preview. Development is ongoing, and the features and functionality are subject to change. Langflow, and the use of such, is subject to the DataStax Preview Terms.

Loaders are components used to load documents from various sources, such as databases, websites, and local files. They can be used to fetch data from external sources and convert it into a format that can be processed by other components.

Confluence

The Confluence component integrates with Confluence, a wiki collaboration platform, to load and process documents. It uses the ConfluenceLoader from LangChain to fetch content from a specified Confluence space.

Parameters

Inputs
Name Display Name Info

url

Site URL

The base URL of the Confluence Space (e.g., https://<company>.atlassian.net/wiki)

username

Username

Atlassian User E-mail (e.g., email@example.com)

api_key

API Key

Atlassian API Key (Create at: https://id.atlassian.com/manage-profile/security/api-tokens)

space_key

Space Key

The key of the Confluence space to access

cloud

Use Cloud?

Whether to use Confluence Cloud (default: true)

content_format

Content Format

Specify content format (default: STORAGE)

max_pages

Max Pages

Maximum number of pages to retrieve (default: 1000)

Outputs
Name Display Name Info

data

Data

List of Data objects containing the loaded Confluence documents

Component code

Confluence.py
from typing import List
from langflow.custom import Component
from langflow.io import StrInput, SecretStrInput, BoolInput, DropdownInput, Output, IntInput
from langflow.schema import Data
from langchain_community.document_loaders import ConfluenceLoader
from langchain_community.document_loaders.confluence import ContentFormat


class ConfluenceComponent(Component):
    """Langflow component that loads the pages of a Confluence space.

    The inputs collect the site URL, credentials, space key and loader
    options; the single ``data`` output is produced by ``load_documents``,
    which runs LangChain's ``ConfluenceLoader`` and wraps every returned
    document in a ``Data`` object.
    """

    display_name = "Confluence"
    description = "Confluence wiki collaboration platform"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/document_loaders/confluence/"
    trace_type = "tool"
    icon = "Confluence"
    name = "Confluence"

    inputs = [
        # Connection settings.
        StrInput(
            name="url",
            display_name="Site URL",
            required=True,
            info="The base URL of the Confluence Space. Example: https://<company>.atlassian.net/wiki.",
        ),
        StrInput(
            name="username",
            display_name="Username",
            required=True,
            info="Atlassian User E-mail. Example: email@example.com",
        ),
        SecretStrInput(
            name="api_key",
            display_name="API Key",
            required=True,
            info="Atlassian Key. Create at: https://id.atlassian.com/manage-profile/security/api-tokens",
        ),
        StrInput(name="space_key", display_name="Space Key", required=True),
        # Advanced loader options.
        BoolInput(name="cloud", display_name="Use Cloud?", required=True, value=True, advanced=True),
        DropdownInput(
            name="content_format",
            display_name="Content Format",
            # The dropdown offers the string value of each supported ContentFormat member.
            options=[
                fmt.value
                for fmt in (
                    ContentFormat.EDITOR,
                    ContentFormat.EXPORT_VIEW,
                    ContentFormat.ANONYMOUS_EXPORT_VIEW,
                    ContentFormat.STORAGE,
                    ContentFormat.VIEW,
                )
            ],
            value=ContentFormat.STORAGE.value,
            required=True,
            advanced=True,
            info="Specify content format, defaults to ContentFormat.STORAGE",
        ),
        IntInput(
            name="max_pages",
            display_name="Max Pages",
            required=False,
            value=1000,
            advanced=True,
            info="Maximum number of pages to retrieve in total, defaults 1000",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="load_documents"),
    ]

    def build_confluence(self) -> ConfluenceLoader:
        """Create a ``ConfluenceLoader`` configured from the component inputs.

        Returns:
            A loader for the configured site, space and content format.
        """
        # The dropdown stores the enum's string value; turn it back into the enum member.
        selected_format = ContentFormat(self.content_format)
        loader_kwargs = {
            "url": self.url,
            "username": self.username,
            "api_key": self.api_key,
            "cloud": self.cloud,
            "space_key": self.space_key,
            "content_format": selected_format,
            "max_pages": self.max_pages,
        }
        return ConfluenceLoader(**loader_kwargs)

    def load_documents(self) -> List[Data]:
        """Load the Confluence documents and convert them to ``Data`` objects.

        Returns:
            One ``Data`` object per document returned by the loader. The same
            list is also stored in ``self.status``.
        """
        loader = self.build_confluence()
        results = []
        for document in loader.load():
            results.append(Data.from_document(document))
        self.status = results
        return results

GitLoader

This component utilizes the GitLoader from LangChain to fetch and load documents from a specified Git repository.

Parameters

Inputs
Name Display Name Info

repo_path

Repository Path

The local path to the Git repository

clone_url

Clone URL

The URL to clone the Git repository from (optional)

branch

Branch

The branch to load files from (default: 'main')

file_filter

File Filter

Patterns to filter files (e.g., '*.py' to include only .py files, '!*.py' to exclude .py files). Multiple patterns can be separated by commas.

content_filter

Content Filter

A regex pattern to filter files based on their content

Outputs
Name Display Name Info

data

Data

List of Data objects containing the loaded Git repository documents

Component code

GitLoader.py
from pathlib import Path
from typing import List
import re

from langchain_community.document_loaders.git import GitLoader
from langflow.custom import Component
from langflow.io import MessageTextInput, Output
from langflow.schema import Data


class GitLoaderComponent(Component):
    """Langflow component that loads files from a Git repository.

    The inputs select the repository (local path and optional clone URL), the
    branch, and two optional filters: glob-style file-name patterns and a
    regular expression applied to file contents. The single ``data`` output is
    produced by ``load_documents``, which runs LangChain's ``GitLoader`` and
    wraps every loaded document in a ``Data`` object.
    """

    display_name = "GitLoader"
    description = "Load files from a Git repository"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/document_loaders/git/"
    trace_type = "tool"
    icon = "GitLoader"
    name = "GitLoader"

    inputs = [
        MessageTextInput(
            name="repo_path",
            display_name="Repository Path",
            required=True,
            info="The local path to the Git repository.",
        ),
        MessageTextInput(
            name="clone_url",
            display_name="Clone URL",
            required=False,
            info="The URL to clone the Git repository from.",
        ),
        MessageTextInput(
            name="branch",
            display_name="Branch",
            required=False,
            value="main",
            info="The branch to load files from. Defaults to 'main'.",
        ),
        MessageTextInput(
            name="file_filter",
            display_name="File Filter",
            required=False,
            advanced=True,
            info="A list of patterns to filter files. Example to include only .py files: '*.py'. "
            "Example to exclude .py files: '!*.py'. Multiple patterns can be separated by commas.",
        ),
        MessageTextInput(
            name="content_filter",
            display_name="Content Filter",
            required=False,
            advanced=True,
            info="A regex pattern to filter files based on their content.",
        ),
    ]

    outputs = [
        Output(name="data", display_name="Data", method="load_documents"),
    ]

    @staticmethod
    def is_binary(file_path: str) -> bool:
        """Return True if the file looks binary.

        A file is treated as binary when a null byte occurs in its first
        1024 bytes. Binary files must be skipped because the content filter
        searches file contents as text.

        Args:
            file_path: Path of the file to inspect.

        Returns:
            True if a null byte is found in the first 1024 bytes, else False.
        """
        with open(file_path, "rb") as file:
            return b"\x00" in file.read(1024)

    def build_gitloader(self) -> GitLoader:
        """Create a ``GitLoader`` configured from the component inputs.

        The loader receives one combined filter that rejects binary files and
        then applies the optional file-name and content filters.

        File-name patterns are comma separated. A pattern prefixed with ``!``
        excludes matching files; any other pattern includes them. A file is
        kept when it matches no exclusion pattern and, if any inclusion
        patterns are given, matches at least one of them.

        Returns:
            A loader for the configured repository and branch.

        Raises:
            re.error: If the content filter is not a valid regular expression.
        """
        file_filter_patterns = getattr(self, "file_filter", None)
        content_filter_pattern = getattr(self, "content_filter", None)

        file_filters = []
        if file_filter_patterns:
            patterns = [pattern.strip() for pattern in file_filter_patterns.split(",")]
            # Drop empty entries (trailing/double commas, a bare "!"):
            # Path.match() raises ValueError on an empty pattern.
            include_patterns = [p for p in patterns if p and not p.startswith("!")]
            exclude_patterns = [p[1:].strip() for p in patterns if p.startswith("!")]
            exclude_patterns = [p for p in exclude_patterns if p]

            if include_patterns or exclude_patterns:

                def file_filter(file_path: Path) -> bool:
                    if any(file_path.match(pattern) for pattern in exclude_patterns):
                        return False
                    # With exclusion patterns only, everything not excluded is kept;
                    # requiring an inclusion match here would reject every file.
                    if not include_patterns:
                        return True
                    return any(file_path.match(pattern) for pattern in include_patterns)

                file_filters.append(file_filter)

        if content_filter_pattern:
            content_regex = re.compile(content_filter_pattern)

            def content_filter(file_path: Path) -> bool:
                # Undecodable bytes are ignored so a stray byte cannot abort the load.
                with file_path.open("r", encoding="utf-8", errors="ignore") as file:
                    return bool(content_regex.search(file.read()))

            file_filters.append(content_filter)

        def combined_filter(file_path: str) -> bool:
            if self.is_binary(file_path):
                return False
            path = Path(file_path)
            return all(f(path) for f in file_filters)

        return GitLoader(
            repo_path=self.repo_path,
            # A blank optional input arrives as an empty string; pass None so the
            # loader treats it as "no clone URL" like its own default.
            clone_url=self.clone_url or None,
            branch=self.branch,
            file_filter=combined_filter,
        )

    def load_documents(self) -> List[Data]:
        """Load the repository files and convert them to ``Data`` objects.

        Returns:
            One ``Data`` object per document yielded by the loader. The same
            list is also stored in ``self.status``.
        """
        gitloader = self.build_gitloader()
        data = [Data.from_document(doc) for doc in gitloader.lazy_load()]
        self.status = data
        return data

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com