Logic components in Langflow

Logic components provide functionalities for routing, conditional processing, and flow management.

Conditional router

This component routes messages based on text comparison. It evaluates a condition by comparing two text inputs using a specified operator and routes the message accordingly.

Parameters

Inputs
Name Type Description

input_text

String

The primary text input for the operation.

match_text

String

The text input to compare against.

operator

Dropdown

The operator to apply for comparing the texts.

case_sensitive

Boolean

If true, the comparison will be case sensitive.

message

Message

The message to pass through either route.

max_iterations

Integer

The maximum number of iterations for the conditional router.

default_route

Dropdown

The default route to take when max iterations are reached.

Outputs
Name Type Description

true_result

Message

The output when the condition is true.

false_result

Message

The output when the condition is false.

Component code

conditional_router.py
from langflow.custom import Component
from langflow.io import BoolInput, DropdownInput, IntInput, MessageInput, MessageTextInput, Output
from langflow.schema.message import Message


class ConditionalRouterComponent(Component):
    display_name = "If-Else"
    description = "Routes an input message to a corresponding output based on text comparison."
    icon = "split"
    name = "ConditionalRouter"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__iteration_updated = False

    inputs = [
        MessageTextInput(
            name="input_text",
            display_name="Text Input",
            info="The primary text input for the operation.",
        ),
        MessageTextInput(
            name="match_text",
            display_name="Match Text",
            info="The text input to compare against.",
        ),
        DropdownInput(
            name="operator",
            display_name="Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with"],
            info="The operator to apply for comparing the texts.",
            value="equals",
        ),
        BoolInput(
            name="case_sensitive",
            display_name="Case Sensitive",
            info="If true, the comparison will be case sensitive.",
            value=False,
            advanced=True,
        ),
        MessageInput(
            name="message",
            display_name="Message",
            info="The message to pass through either route.",
            advanced=True,
        ),
        IntInput(
            name="max_iterations",
            display_name="Max Iterations",
            info="The maximum number of iterations for the conditional router.",
            value=10,
        ),
        DropdownInput(
            name="default_route",
            display_name="Default Route",
            options=["true_result", "false_result"],
            info="The default route to take when max iterations are reached.",
            value="false_result",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="True", name="true_result", method="true_response"),
        Output(display_name="False", name="false_result", method="false_response"),
    ]

    def _pre_run_setup(self):
        self.__iteration_updated = False

    def evaluate_condition(self, input_text: str, match_text: str, operator: str, *, case_sensitive: bool) -> bool:
        if not case_sensitive:
            input_text = input_text.lower()
            match_text = match_text.lower()

        if operator == "equals":
            return input_text == match_text
        if operator == "not equals":
            return input_text != match_text
        if operator == "contains":
            return match_text in input_text
        if operator == "starts with":
            return input_text.startswith(match_text)
        if operator == "ends with":
            return input_text.endswith(match_text)
        return False

    def iterate_and_stop_once(self, route_to_stop: str):
        if not self.__iteration_updated:
            self.update_ctx({f"{self._id}_iteration": self.ctx.get(f"{self._id}_iteration", 0) + 1})
            self.__iteration_updated = True
            if self.ctx.get(f"{self._id}_iteration", 0) >= self.max_iterations and route_to_stop == self.default_route:
                # We need to stop the other route
                route_to_stop = "true_result" if route_to_stop == "false_result" else "false_result"
            self.stop(route_to_stop)

    def true_response(self) -> Message:
        result = self.evaluate_condition(
            self.input_text, self.match_text, self.operator, case_sensitive=self.case_sensitive
        )
        if result:
            self.status = self.message
            self.iterate_and_stop_once("false_result")
            return self.message
        self.iterate_and_stop_once("true_result")
        return self.message

    def false_response(self) -> Message:
        result = self.evaluate_condition(
            self.input_text, self.match_text, self.operator, case_sensitive=self.case_sensitive
        )
        if not result:
            self.status = self.message
            self.iterate_and_stop_once("true_result")
            return self.message
        self.iterate_and_stop_once("false_result")
        return self.message

Data conditional router

This component routes Data objects based on a condition applied to a specified key, including boolean validation.

Parameters

Inputs
Name Type Description

data_input

Data

The data object or list of data objects to process.

key_name

String

The name of the key in the data object to check.

operator

Dropdown

The operator to apply for comparing the values.

compare_value

String

The value to compare against (not used for boolean validator).

Outputs
Name Type Description

true_output

Data/List

Output when the condition is met.

false_output

Data/List

Output when the condition is not met.

Component code

data_conditional_router.py
from typing import Any

from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, MessageTextInput, Output
from langflow.schema import Data, dotdict


class DataConditionalRouterComponent(Component):
    display_name = "Condition"
    description = "Route Data object(s) based on a condition applied to a specified key, including boolean validation."
    icon = "split"
    name = "DataConditionalRouter"
    legacy = True

    inputs = [
        DataInput(
            name="data_input",
            display_name="Data Input",
            info="The Data object or list of Data objects to process",
            is_list=True,
        ),
        MessageTextInput(
            name="key_name",
            display_name="Key Name",
            info="The name of the key in the Data object(s) to check",
        ),
        DropdownInput(
            name="operator",
            display_name="Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with", "boolean validator"],
            info="The operator to apply for comparing the values. 'boolean validator' treats the value as a boolean.",
            value="equals",
        ),
        MessageTextInput(
            name="compare_value",
            display_name="Match Text",
            info="The value to compare against (not used for boolean validator)",
        ),
    ]

    outputs = [
        Output(display_name="True Output", name="true_output", method="process_data"),
        Output(display_name="False Output", name="false_output", method="process_data"),
    ]

    def compare_values(self, item_value: str, compare_value: str, operator: str) -> bool:
        if operator == "equals":
            return item_value == compare_value
        if operator == "not equals":
            return item_value != compare_value
        if operator == "contains":
            return compare_value in item_value
        if operator == "starts with":
            return item_value.startswith(compare_value)
        if operator == "ends with":
            return item_value.endswith(compare_value)
        if operator == "boolean validator":
            return self.parse_boolean(item_value)
        return False

    def parse_boolean(self, value):
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() in {"true", "1", "yes", "y", "on"}
        return bool(value)

    def validate_input(self, data_item: Data) -> bool:
        if not isinstance(data_item, Data):
            self.status = "Input is not a Data object"
            return False
        if self.key_name not in data_item.data:
            self.status = f"Key '{self.key_name}' not found in Data"
            return False
        return True

    def process_data(self) -> Data | list[Data]:
        if isinstance(self.data_input, list):
            true_output = []
            false_output = []
            for item in self.data_input:
                if self.validate_input(item):
                    result = self.process_single_data(item)
                    if result:
                        true_output.append(item)
                    else:
                        false_output.append(item)
            self.stop("false_output" if true_output else "true_output")
            return true_output or false_output
        if not self.validate_input(self.data_input):
            return Data(data={"error": self.status})
        result = self.process_single_data(self.data_input)
        self.stop("false_output" if result else "true_output")
        return self.data_input

    def process_single_data(self, data_item: Data) -> bool:
        item_value = data_item.data[self.key_name]
        operator = self.operator

        if operator == "boolean validator":
            condition_met = self.parse_boolean(item_value)
            condition_description = f"Boolean validation of '{self.key_name}'"
        else:
            compare_value = self.compare_value
            condition_met = self.compare_values(str(item_value), compare_value, operator)
            condition_description = f"{self.key_name} {operator} {compare_value}"

        if condition_met:
            self.status = f"Condition met: {condition_description}"
            return True
        self.status = f"Condition not met: {condition_description}"
        return False

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        if field_name == "operator":
            if field_value == "boolean validator":
                build_config["compare_value"]["show"] = False
                build_config["compare_value"]["advanced"] = True
                build_config["compare_value"]["value"] = None
            else:
                build_config["compare_value"]["show"] = True
                build_config["compare_value"]["advanced"] = False

        return build_config

Flow as Tool

This component constructs a tool from a function that runs a loaded flow.

Parameters

Inputs
Name Type Description

flow_name

Dropdown

The name of the flow to run.

tool_name

String

The name of the tool.

tool_description

String

The description of the tool.

return_direct

Boolean

If true, returns the result directly from the tool.

Outputs
Name Type Description

api_build_tool

Tool

The constructed tool from the flow.

Component code

flow_tool.py
from typing import Any

from loguru import logger
from typing_extensions import override

from langflow.base.langchain_utilities.model import LCToolComponent
from langflow.base.tools.flow_tool import FlowTool
from langflow.field_typing import Tool
from langflow.graph.graph.base import Graph
from langflow.helpers.flow import get_flow_inputs
from langflow.io import BoolInput, DropdownInput, Output, StrInput
from langflow.schema import Data
from langflow.schema.dotdict import dotdict


class FlowToolComponent(LCToolComponent):
    display_name = "Flow as Tool"
    description = "Construct a Tool from a function that runs the loaded Flow."
    field_order = ["flow_name", "name", "description", "return_direct"]
    trace_type = "tool"
    name = "FlowTool"
    beta = True
    icon = "hammer"

    def get_flow_names(self) -> list[str]:
        flow_datas = self.list_flows()
        return [flow_data.data["name"] for flow_data in flow_datas]

    def get_flow(self, flow_name: str) -> Data | None:
        """Retrieves a flow by its name.

        Args:
            flow_name (str): The name of the flow to retrieve.

        Returns:
            Optional[Text]: The flow record if found, None otherwise.
        """
        flow_datas = self.list_flows()
        for flow_data in flow_datas:
            if flow_data.data["name"] == flow_name:
                return flow_data
        return None

    @override
    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        if field_name == "flow_name":
            build_config["flow_name"]["options"] = self.get_flow_names()

        return build_config

    inputs = [
        DropdownInput(
            name="flow_name", display_name="Flow Name", info="The name of the flow to run.", refresh_button=True
        ),
        StrInput(
            name="tool_name",
            display_name="Name",
            info="The name of the tool.",
        ),
        StrInput(
            name="tool_description",
            display_name="Description",
            info="The description of the tool.",
        ),
        BoolInput(
            name="return_direct",
            display_name="Return Direct",
            info="Return the result directly from the Tool.",
            advanced=True,
        ),
    ]

    outputs = [
        Output(name="api_build_tool", display_name="Tool", method="build_tool"),
    ]

    def build_tool(self) -> Tool:
        FlowTool.model_rebuild()
        if "flow_name" not in self._attributes or not self._attributes["flow_name"]:
            msg = "Flow name is required"
            raise ValueError(msg)
        flow_name = self._attributes["flow_name"]
        flow_data = self.get_flow(flow_name)
        if not flow_data:
            msg = "Flow not found."
            raise ValueError(msg)
        graph = Graph.from_payload(
            flow_data.data["data"],
            user_id=str(self.user_id),
        )
        try:
            graph.set_run_id(self.graph.run_id)
        except Exception:  # noqa: BLE001
            logger.opt(exception=True).warning("Failed to set run_id")
        inputs = get_flow_inputs(graph)
        tool = FlowTool(
            name=self.tool_name,
            description=self.tool_description,
            graph=graph,
            return_direct=self.return_direct,
            inputs=inputs,
            flow_id=str(flow_data.id),
            user_id=str(self.user_id),
            session_id=self.graph.session_id if hasattr(self, "graph") else None,
        )
        description_repr = repr(tool.description).strip("'")
        args_str = "\n".join([f"- {arg_name}: {arg_data['description']}" for arg_name, arg_data in tool.args.items()])
        self.status = f"{description_repr}\nArguments:\n{args_str}"
        return tool

Listen

This component listens for a notification and retrieves its associated state.

Parameters

Inputs
Name Type Description

name

String

The name of the notification to listen for.

Outputs
Name Type Description

output

Data

The state associated with the notification.

Component code

listen.py
from langflow.custom import CustomComponent
from langflow.schema import Data


class ListenComponent(CustomComponent):
    display_name = "Listen"
    description = "A component to listen for a notification."
    name = "Listen"
    beta: bool = True
    icon = "Radio"

    def build_config(self):
        return {
            "name": {
                "display_name": "Name",
                "info": "The name of the notification to listen for.",
            },
        }

    def build(self, name: str) -> Data:
        state = self.get_state(name)
        self._set_successors_ids()
        self.status = state
        return state

    def _set_successors_ids(self):
        self._vertex.is_state = True
        successors = self._vertex.graph.successor_map.get(self._vertex.id, [])
        return successors + self._vertex.graph.activated_vertices

Notify

This component generates a notification for the Listen component to use.

Parameters

Inputs
Name Type Description

name

String

The name of the notification.

data

Data

The data to store in the notification.

append

Boolean

If true, the record will be appended to the existing notification.

Outputs
Name Type Description

output

Data

The data stored in the notification.

Component code

notify.py
from langflow.custom import CustomComponent
from langflow.schema import Data


class NotifyComponent(CustomComponent):
    display_name = "Notify"
    description = "A component to generate a notification to Get Notified component."
    icon = "Notify"
    name = "Notify"
    beta: bool = True

    def build_config(self):
        return {
            "name": {"display_name": "Name", "info": "The name of the notification."},
            "data": {"display_name": "Data", "info": "The data to store."},
            "append": {
                "display_name": "Append",
                "info": "If True, the record will be appended to the notification.",
            },
        }

    def build(self, name: str, *, data: Data | None = None, append: bool = False) -> Data:
        if data and not isinstance(data, Data):
            if isinstance(data, str):
                data = Data(text=data)
            elif isinstance(data, dict):
                data = Data(data=data)
            else:
                data = Data(text=str(data))
        elif not data:
            data = Data(text="")
        if data:
            if append:
                self.append_state(name, data)
            else:
                self.update_state(name, data)
        else:
            self.status = "No record provided."
        self.status = data
        self._set_successors_ids()
        return data

    def _set_successors_ids(self):
        self._vertex.is_state = True
        successors = self._vertex.graph.successor_map.get(self._vertex.id, [])
        return successors + self._vertex.graph.activated_vertices

Run flow

This component runs a specified flow within a larger workflow with custom inputs and tweaks.

Parameters

Inputs
Name Type Description

input_value

String

The input value for the flow to process.

flow_name

Dropdown

The name of the flow to run.

tweaks

Nested Dict

Tweaks to apply to the flow.

Outputs
Name Type Description

run_outputs

List[Data]

The results generated from running the flow.

Component code

run_flow.py
from typing import TYPE_CHECKING, Any

from typing_extensions import override

from langflow.base.flow_processing.utils import build_data_from_run_outputs
from langflow.custom import Component
from langflow.io import DropdownInput, MessageTextInput, NestedDictInput, Output
from langflow.schema import Data, dotdict

if TYPE_CHECKING:
    from langflow.graph.schema import RunOutputs


class RunFlowComponent(Component):
    display_name = "Run Flow"
    description = "A component to run a flow."
    name = "RunFlow"
    legacy: bool = True
    icon = "workflow"

    def get_flow_names(self) -> list[str]:
        flow_data = self.list_flows()
        return [flow_data.data["name"] for flow_data in flow_data]

    @override
    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        if field_name == "flow_name":
            build_config["flow_name"]["options"] = self.get_flow_names()

        return build_config

    inputs = [
        MessageTextInput(
            name="input_value",
            display_name="Input Value",
            info="The input value to be processed by the flow.",
        ),
        DropdownInput(
            name="flow_name",
            display_name="Flow Name",
            info="The name of the flow to run.",
            options=[],
            refresh_button=True,
        ),
        NestedDictInput(
            name="tweaks",
            display_name="Tweaks",
            info="Tweaks to apply to the flow.",
        ),
    ]

    outputs = [
        Output(display_name="Run Outputs", name="run_outputs", method="generate_results"),
    ]

    async def generate_results(self) -> list[Data]:
        if "flow_name" not in self._attributes or not self._attributes["flow_name"]:
            msg = "Flow name is required"
            raise ValueError(msg)
        flow_name = self._attributes["flow_name"]

        results: list[RunOutputs | None] = await self.run_flow(
            inputs={"input_value": self.input_value}, flow_name=flow_name, tweaks=self.tweaks
        )
        if isinstance(results, list):
            data = []
            for result in results:
                if result:
                    data.extend(build_data_from_run_outputs(result))
        else:
            data = build_data_from_run_outputs()(results)

        self.status = data
        return data

Sub Flow

This component integrates entire flows as components. It dynamically generates inputs based on the selected flow and executes the flow with provided parameters.

Parameters

Inputs
Name Type Description

flow_name

Dropdown

The name of the flow to run.

Outputs
Name Type Description

flow_outputs

List[Data]

The outputs generated from the flow.

Component code

sub_flow.py
from typing import Any

from loguru import logger

from langflow.base.flow_processing.utils import build_data_from_result_data
from langflow.custom import Component
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import Vertex
from langflow.helpers.flow import get_flow_inputs
from langflow.io import DropdownInput, Output
from langflow.schema import Data, dotdict


class SubFlowComponent(Component):
    display_name = "Sub Flow"
    description = "Generates a Component from a Flow, with all of its inputs, and "
    name = "SubFlow"
    beta: bool = True
    icon = "Workflow"

    def get_flow_names(self) -> list[str]:
        flow_data = self.list_flows()
        return [flow_data.data["name"] for flow_data in flow_data]

    def get_flow(self, flow_name: str) -> Data | None:
        flow_datas = self.list_flows()
        for flow_data in flow_datas:
            if flow_data.data["name"] == flow_name:
                return flow_data
        return None

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        if field_name == "flow_name":
            build_config["flow_name"]["options"] = self.get_flow_names()

        for key in list(build_config.keys()):
            if key not in [x.name for x in self.inputs] + ["code", "_type", "get_final_results_only"]:
                del build_config[key]
        if field_value is not None and field_name == "flow_name":
            try:
                flow_data = self.get_flow(field_value)
            except Exception:  # noqa: BLE001
                logger.exception(f"Error getting flow {field_value}")
            else:
                if not flow_data:
                    msg = f"Flow {field_value} not found."
                    logger.error(msg)
                else:
                    try:
                        graph = Graph.from_payload(flow_data.data["data"])
                        # Get all inputs from the graph
                        inputs = get_flow_inputs(graph)
                        # Add inputs to the build config
                        build_config = self.add_inputs_to_build_config(inputs, build_config)
                    except Exception:  # noqa: BLE001
                        logger.exception(f"Error building graph for flow {field_value}")

        return build_config

    def add_inputs_to_build_config(self, inputs_vertex: list[Vertex], build_config: dotdict):
        new_fields: list[dotdict] = []

        for vertex in inputs_vertex:
            new_vertex_inputs = []
            field_template = vertex.data["node"]["template"]
            for inp in field_template:
                if inp not in {"code", "_type"}:
                    field_template[inp]["display_name"] = (
                        vertex.display_name + " - " + field_template[inp]["display_name"]
                    )
                    field_template[inp]["name"] = vertex.id + "|" + inp
                    new_vertex_inputs.append(field_template[inp])
            new_fields += new_vertex_inputs
        for field in new_fields:
            build_config[field["name"]] = field
        return build_config

    inputs = [
        DropdownInput(
            name="flow_name",
            display_name="Flow Name",
            info="The name of the flow to run.",
            options=[],
            refresh_button=True,
            real_time_refresh=True,
        ),
    ]

    outputs = [Output(name="flow_outputs", display_name="Flow Outputs", method="generate_results")]

    async def generate_results(self) -> list[Data]:
        tweaks: dict = {}
        for field in self._attributes:
            if field != "flow_name" and "|" in field:
                [node, name] = field.split("|")
                if node not in tweaks:
                    tweaks[node] = {}
                tweaks[node][name] = self._attributes[field]
        flow_name = self._attributes.get("flow_name")
        run_outputs = await self.run_flow(
            tweaks=tweaks,
            flow_name=flow_name,
            output_type="all",
        )
        data: list[Data] = []
        if not run_outputs:
            return data
        run_output = run_outputs[0]

        if run_output is not None:
            for output in run_output.outputs:
                if output:
                    data.extend(build_data_from_result_data(output))
        return data

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com