Logic components in Langflow

This Langflow feature is currently in public preview. Development is ongoing, and the features and functionality are subject to change. Langflow, and the use of such, is subject to the DataStax Preview Terms.

Logic components provide functionalities for routing, conditional processing, and flow management.

Use a logic component in a flow

This flow creates a "for each" loop with the Loop component.

The component iterates over a list of Data objects until it’s completed, and then the Done loop aggregates the results.

The File component loads text files from your local machine, and then the Parse Data component parses them into a list of structured Data objects. The Loop component passes each Data object to a Prompt to be summarized.

When the Loop component runs out of Data, the Done loop activates, which counts the number of pages and summarizes their tone with another Prompt. This is represented in Langflow by connecting the Parse Data component’s Data List output to the Loop component’s Data loop input.

A loop text summarizer flow

The output is similar to the following:

Document Summary
Total Pages Processed
Total Pages: 2
Overall Tone of Document
Tone: Informative and Instructional
The documentation outlines microservices architecture patterns and best practices.
It emphasizes service isolation and inter-service communication protocols.
The use of asynchronous messaging patterns is recommended for system scalability.
It includes code examples of REST and gRPC implementations to demonstrate integration approaches.

Conditional router (If-Else component)

This component routes messages based on text comparison. It evaluates a condition by comparing two text inputs using a specified operator and routes the message accordingly.

Parameters

Inputs
Name Type Description

input_text

String

The primary text input for the operation.

match_text

String

The text input to compare against.

operator

Dropdown

The operator to apply for comparing the texts.

case_sensitive

Boolean

If true, the comparison will be case sensitive.

message

Message

The message to pass through either route.

max_iterations

Integer

The maximum number of iterations for the conditional router.

default_route

Dropdown

The default route to take when max iterations are reached.

Outputs
Name Type Description

true_result

Message

The output when the condition is true.

false_result

Message

The output when the condition is false.

Component code

conditional_router.py
import re

from langflow.custom import Component
from langflow.io import BoolInput, DropdownInput, IntInput, MessageInput, MessageTextInput, Output
from langflow.schema.message import Message


class ConditionalRouterComponent(Component):
    display_name = "If-Else"
    description = "Routes an input message to a corresponding output based on text comparison."
    icon = "split"
    name = "ConditionalRouter"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__iteration_updated = False

    inputs = [
        MessageTextInput(
            name="input_text",
            display_name="Text Input",
            info="The primary text input for the operation.",
            required=True,
        ),
        MessageTextInput(
            name="match_text",
            display_name="Match Text",
            info="The text input to compare against.",
            required=True,
        ),
        DropdownInput(
            name="operator",
            display_name="Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with", "regex"],
            info="The operator to apply for comparing the texts.",
            value="equals",
            real_time_refresh=True,
        ),
        BoolInput(
            name="case_sensitive",
            display_name="Case Sensitive",
            info="If true, the comparison will be case sensitive.",
            value=False,
        ),
        MessageInput(
            name="message",
            display_name="Message",
            info="The message to pass through either route.",
        ),
        IntInput(
            name="max_iterations",
            display_name="Max Iterations",
            info="The maximum number of iterations for the conditional router.",
            value=10,
            advanced=True,
        ),
        DropdownInput(
            name="default_route",
            display_name="Default Route",
            options=["true_result", "false_result"],
            info="The default route to take when max iterations are reached.",
            value="false_result",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="True", name="true_result", method="true_response"),
        Output(display_name="False", name="false_result", method="false_response"),
    ]

    def _pre_run_setup(self):
        self.__iteration_updated = False

    def evaluate_condition(self, input_text: str, match_text: str, operator: str, *, case_sensitive: bool) -> bool:
        if not case_sensitive and operator != "regex":
            input_text = input_text.lower()
            match_text = match_text.lower()

        if operator == "equals":
            return input_text == match_text
        if operator == "not equals":
            return input_text != match_text
        if operator == "contains":
            return match_text in input_text
        if operator == "starts with":
            return input_text.startswith(match_text)
        if operator == "ends with":
            return input_text.endswith(match_text)
        if operator == "regex":
            try:
                return bool(re.match(match_text, input_text))
            except re.error:
                return False  # Return False if the regex is invalid
        return False

    def iterate_and_stop_once(self, route_to_stop: str):
        if not self.__iteration_updated:
            self.update_ctx({f"{self._id}_iteration": self.ctx.get(f"{self._id}_iteration", 0) + 1})
            self.__iteration_updated = True
            if self.ctx.get(f"{self._id}_iteration", 0) >= self.max_iterations and route_to_stop == self.default_route:
                route_to_stop = "true_result" if route_to_stop == "false_result" else "false_result"
            self.stop(route_to_stop)

    def true_response(self) -> Message:
        result = self.evaluate_condition(
            self.input_text, self.match_text, self.operator, case_sensitive=self.case_sensitive
        )
        if result:
            self.status = self.message
            self.iterate_and_stop_once("false_result")
            return self.message
        self.iterate_and_stop_once("true_result")
        return Message(content="")

    def false_response(self) -> Message:
        result = self.evaluate_condition(
            self.input_text, self.match_text, self.operator, case_sensitive=self.case_sensitive
        )
        if not result:
            self.status = self.message
            self.iterate_and_stop_once("true_result")
            return self.message
        self.iterate_and_stop_once("false_result")
        return Message(content="")

    def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None) -> dict:
        if field_name == "operator":
            if field_value == "regex":
                build_config.pop("case_sensitive", None)

            # Ensure case_sensitive is present for all other operators
            elif "case_sensitive" not in build_config:
                case_sensitive_input = next(
                    (input_field for input_field in self.inputs if input_field.name == "case_sensitive"), None
                )
                if case_sensitive_input:
                    build_config["case_sensitive"] = case_sensitive_input.to_dict()
        return build_config

Data conditional router

This component routes Data objects based on a condition applied to a specified key, including boolean validation.

Parameters

Inputs
Name Type Description

data_input

Data

The data object or list of data objects to process.

key_name

String

The name of the key in the data object to check.

operator

Dropdown

The operator to apply for comparing the values.

compare_value

String

The value to compare against (not used for boolean validator).

Outputs
Name Type Description

true_output

Data/List

Output when the condition is met.

false_output

Data/List

Output when the condition is not met.

Component code

data_conditional_router.py
from typing import Any

from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, MessageTextInput, Output
from langflow.schema import Data, dotdict


class DataConditionalRouterComponent(Component):
    display_name = "Condition"
    description = "Route Data object(s) based on a condition applied to a specified key, including boolean validation."
    icon = "split"
    name = "DataConditionalRouter"
    legacy = True

    inputs = [
        DataInput(
            name="data_input",
            display_name="Data Input",
            info="The Data object or list of Data objects to process",
            is_list=True,
        ),
        MessageTextInput(
            name="key_name",
            display_name="Key Name",
            info="The name of the key in the Data object(s) to check",
        ),
        DropdownInput(
            name="operator",
            display_name="Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with", "boolean validator"],
            info="The operator to apply for comparing the values. 'boolean validator' treats the value as a boolean.",
            value="equals",
        ),
        MessageTextInput(
            name="compare_value",
            display_name="Match Text",
            info="The value to compare against (not used for boolean validator)",
        ),
    ]

    outputs = [
        Output(display_name="True Output", name="true_output", method="process_data"),
        Output(display_name="False Output", name="false_output", method="process_data"),
    ]

    def compare_values(self, item_value: str, compare_value: str, operator: str) -> bool:
        if operator == "equals":
            return item_value == compare_value
        if operator == "not equals":
            return item_value != compare_value
        if operator == "contains":
            return compare_value in item_value
        if operator == "starts with":
            return item_value.startswith(compare_value)
        if operator == "ends with":
            return item_value.endswith(compare_value)
        if operator == "boolean validator":
            return self.parse_boolean(item_value)
        return False

    def parse_boolean(self, value):
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() in {"true", "1", "yes", "y", "on"}
        return bool(value)

    def validate_input(self, data_item: Data) -> bool:
        if not isinstance(data_item, Data):
            self.status = "Input is not a Data object"
            return False
        if self.key_name not in data_item.data:
            self.status = f"Key '{self.key_name}' not found in Data"
            return False
        return True

    def process_data(self) -> Data | list[Data]:
        if isinstance(self.data_input, list):
            true_output = []
            false_output = []
            for item in self.data_input:
                if self.validate_input(item):
                    result = self.process_single_data(item)
                    if result:
                        true_output.append(item)
                    else:
                        false_output.append(item)
            self.stop("false_output" if true_output else "true_output")
            return true_output or false_output
        if not self.validate_input(self.data_input):
            return Data(data={"error": self.status})
        result = self.process_single_data(self.data_input)
        self.stop("false_output" if result else "true_output")
        return self.data_input

    def process_single_data(self, data_item: Data) -> bool:
        item_value = data_item.data[self.key_name]
        operator = self.operator

        if operator == "boolean validator":
            condition_met = self.parse_boolean(item_value)
            condition_description = f"Boolean validation of '{self.key_name}'"
        else:
            compare_value = self.compare_value
            condition_met = self.compare_values(str(item_value), compare_value, operator)
            condition_description = f"{self.key_name} {operator} {compare_value}"

        if condition_met:
            self.status = f"Condition met: {condition_description}"
            return True
        self.status = f"Condition not met: {condition_description}"
        return False

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        if field_name == "operator":
            if field_value == "boolean validator":
                build_config["compare_value"]["show"] = False
                build_config["compare_value"]["advanced"] = True
                build_config["compare_value"]["value"] = None
            else:
                build_config["compare_value"]["show"] = True
                build_config["compare_value"]["advanced"] = False

        return build_config

Flow as Tool

This component is deprecated as of Langflow version 1.1.2. Instead, use the Run flow component.

Parameters

Inputs
Name Type Description

flow_name

Dropdown

The name of the flow to run.

tool_name

String

The name of the tool.

tool_description

String

The description of the tool.

return_direct

Boolean

If true, returns the result directly from the tool.

Outputs
Name Type Description

api_build_tool

Tool

The constructed tool from the flow.

Component code

flow_tool.py
from typing import Any

from loguru import logger
from typing_extensions import override

from langflow.base.langchain_utilities.model import LCToolComponent
from langflow.base.tools.flow_tool import FlowTool
from langflow.field_typing import Tool
from langflow.graph.graph.base import Graph
from langflow.helpers.flow import get_flow_inputs
from langflow.io import BoolInput, DropdownInput, Output, StrInput
from langflow.schema import Data
from langflow.schema.dotdict import dotdict


class FlowToolComponent(LCToolComponent):
    display_name = "Flow as Tool [Deprecated]"
    description = "Construct a Tool from a function that runs the loaded Flow."
    field_order = ["flow_name", "name", "description", "return_direct"]
    trace_type = "tool"
    name = "FlowTool"
    legacy: bool = True
    icon = "hammer"

    async def get_flow_names(self) -> list[str]:
        flow_datas = await self.alist_flows()
        return [flow_data.data["name"] for flow_data in flow_datas]

    async def get_flow(self, flow_name: str) -> Data | None:
        """Retrieves a flow by its name.

        Args:
            flow_name (str): The name of the flow to retrieve.

        Returns:
            Optional[Text]: The flow record if found, None otherwise.
        """
        flow_datas = await self.alist_flows()
        for flow_data in flow_datas:
            if flow_data.data["name"] == flow_name:
                return flow_data
        return None

    @override
    async def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        if field_name == "flow_name":
            build_config["flow_name"]["options"] = self.get_flow_names()

        return build_config

    inputs = [
        DropdownInput(
            name="flow_name", display_name="Flow Name", info="The name of the flow to run.", refresh_button=True
        ),
        StrInput(
            name="tool_name",
            display_name="Name",
            info="The name of the tool.",
        ),
        StrInput(
            name="tool_description",
            display_name="Description",
            info="The description of the tool; defaults to the Flow's description.",
        ),
        BoolInput(
            name="return_direct",
            display_name="Return Direct",
            info="Return the result directly from the Tool.",
            advanced=True,
        ),
    ]

    outputs = [
        Output(name="api_build_tool", display_name="Tool", method="build_tool"),
    ]

    async def build_tool(self) -> Tool:
        FlowTool.model_rebuild()
        if "flow_name" not in self._attributes or not self._attributes["flow_name"]:
            msg = "Flow name is required"
            raise ValueError(msg)
        flow_name = self._attributes["flow_name"]
        flow_data = await self.get_flow(flow_name)
        if not flow_data:
            msg = "Flow not found."
            raise ValueError(msg)
        graph = Graph.from_payload(
            flow_data.data["data"],
            user_id=str(self.user_id),
        )
        try:
            graph.set_run_id(self.graph.run_id)
        except Exception:  # noqa: BLE001
            logger.opt(exception=True).warning("Failed to set run_id")
        inputs = get_flow_inputs(graph)
        tool_description = self.tool_description.strip() or flow_data.description
        tool = FlowTool(
            name=self.tool_name,
            description=tool_description,
            graph=graph,
            return_direct=self.return_direct,
            inputs=inputs,
            flow_id=str(flow_data.id),
            user_id=str(self.user_id),
            session_id=self.graph.session_id if hasattr(self, "graph") else None,
        )
        description_repr = repr(tool.description).strip("'")
        args_str = "\n".join([f"- {arg_name}: {arg_data['description']}" for arg_name, arg_data in tool.args.items()])
        self.status = f"{description_repr}\nArguments:\n{args_str}"
        return tool

Listen

This component listens for a notification and retrieves its associated state.

Parameters

Inputs
Name Type Description

name

String

The name of the notification to listen for.

Outputs
Name Type Description

output

Data

The state associated with the notification.

Component code

listen.py
from langflow.custom import CustomComponent
from langflow.schema import Data


class ListenComponent(CustomComponent):
    display_name = "Listen"
    description = "A component to listen for a notification."
    name = "Listen"
    beta: bool = True
    icon = "Radio"

    def build_config(self):
        return {
            "name": {
                "display_name": "Name",
                "info": "The name of the notification to listen for.",
            },
        }

    def build(self, name: str) -> Data:
        state = self.get_state(name)
        self._set_successors_ids()
        self.status = state
        return state

    def _set_successors_ids(self):
        self._vertex.is_state = True
        successors = self._vertex.graph.successor_map.get(self._vertex.id, [])
        return successors + self._vertex.graph.activated_vertices

Loop

This component iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs.

Parameters

Inputs
Name Type Description

data

Data/List

The initial list of Data objects to iterate over.

Outputs
Name Type Description

item

Data

Outputs one item at a time from the data list.

done

Data

Triggered when iteration complete, returns aggregated results.

Component code

loop.py
from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data


class LoopComponent(Component):
    display_name = "Loop"
    description = (
        "Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs."
    )
    icon = "infinity"

    inputs = [
        DataInput(
            name="data",
            display_name="Data",
            info="The initial list of Data objects to iterate over.",
        ),
    ]

    outputs = [
        Output(display_name="Item", name="item", method="item_output", allows_loop=True),
        Output(display_name="Done", name="done", method="done_output"),
    ]

    def initialize_data(self) -> None:
        """Initialize the data list, context index, and aggregated list."""
        if self.ctx.get(f"{self._id}_initialized", False):
            return

        # Ensure data is a list of Data objects
        data_list = self._validate_data(self.data)

        # Store the initial data and context variables
        self.update_ctx(
            {
                f"{self._id}_data": data_list,
                f"{self._id}_index": 0,
                f"{self._id}_aggregated": [],
                f"{self._id}_initialized": True,
            }
        )

    def _validate_data(self, data):
        """Validate and return a list of Data objects."""
        if isinstance(data, Data):
            return [data]
        if isinstance(data, list) and all(isinstance(item, Data) for item in data):
            return data
        msg = "The 'data' input must be a list of Data objects or a single Data object."
        raise TypeError(msg)

    def evaluate_stop_loop(self) -> bool:
        """Evaluate whether to stop item or done output."""
        current_index = self.ctx.get(f"{self._id}_index", 0)
        data_length = len(self.ctx.get(f"{self._id}_data", []))
        return current_index > data_length

    def item_output(self) -> Data:
        """Output the next item in the list or stop if done."""
        self.initialize_data()
        current_item = Data(text="")

        if self.evaluate_stop_loop():
            self.stop("item")
            return Data(text="")

        # Get data list and current index
        data_list, current_index = self.loop_variables()
        if current_index < len(data_list):
            # Output current item and increment index
            try:
                current_item = data_list[current_index]
            except IndexError:
                current_item = Data(text="")
        self.aggregated_output()
        self.update_ctx({f"{self._id}_index": current_index + 1})
        return current_item

    def done_output(self) -> Data:
        """Trigger the done output when iteration is complete."""
        self.initialize_data()

        if self.evaluate_stop_loop():
            self.stop("item")
            self.start("done")

            return self.ctx.get(f"{self._id}_aggregated", [])
        self.stop("done")
        return Data(text="")

    def loop_variables(self):
        """Retrieve loop variables from context."""
        return (
            self.ctx.get(f"{self._id}_data", []),
            self.ctx.get(f"{self._id}_index", 0),
        )

    def aggregated_output(self) -> Data:
        """Return the aggregated list once all items are processed."""
        self.initialize_data()

        # Get data list and aggregated list
        data_list = self.ctx.get(f"{self._id}_data", [])
        aggregated = self.ctx.get(f"{self._id}_aggregated", [])

        # Check if loop input is provided and append to aggregated list
        if self.item is not None and not isinstance(self.item, str) and len(aggregated) <= len(data_list):
            aggregated.append(self.item)
            self.update_ctx({f"{self._id}_aggregated": aggregated})
        return aggregated

Notify

This component generates a notification for the Listen component to use.

Parameters

Inputs
Name Type Description

name

String

The name of the notification.

data

Data

The data to store in the notification.

append

Boolean

If true, the record will be appended to the existing notification.

Outputs
Name Type Description

output

Data

The data stored in the notification.

Component code

notify.py
from langflow.custom import CustomComponent
from langflow.schema import Data


class NotifyComponent(CustomComponent):
    display_name = "Notify"
    description = "A component to generate a notification to Get Notified component."
    icon = "Notify"
    name = "Notify"
    beta: bool = True

    def build_config(self):
        return {
            "name": {"display_name": "Name", "info": "The name of the notification."},
            "data": {"display_name": "Data", "info": "The data to store."},
            "append": {
                "display_name": "Append",
                "info": "If True, the record will be appended to the notification.",
            },
        }

    def build(self, name: str, *, data: Data | None = None, append: bool = False) -> Data:
        if data and not isinstance(data, Data):
            if isinstance(data, str):
                data = Data(text=data)
            elif isinstance(data, dict):
                data = Data(data=data)
            else:
                data = Data(text=str(data))
        elif not data:
            data = Data(text="")
        if data:
            if append:
                self.append_state(name, data)
            else:
                self.update_state(name, data)
        else:
            self.status = "No record provided."
        self.status = data
        self._set_successors_ids()
        return data

    def _set_successors_ids(self):
        self._vertex.is_state = True
        successors = self._vertex.graph.successor_map.get(self._vertex.id, [])
        return successors + self._vertex.graph.activated_vertices

Pass message

This component forwards the input message, unchanged.

Parameters

Inputs
Name Display Name Info

input_message

Input Message

The message to be passed forward.

ignored_message

Ignored Message

A second message to be ignored. Used as a workaround for continuity.

Outputs
Name Display Name Info

output_message

Output Message

The forwarded input message, unchanged.

Component code

pass_message.py
from langflow.custom import Component
from langflow.io import MessageInput
from langflow.schema.message import Message
from langflow.template import Output


class PassMessageComponent(Component):
    display_name = "Pass"
    description = "Forwards the input message, unchanged."
    name = "Pass"
    icon = "arrow-right"

    inputs = [
        MessageInput(
            name="input_message",
            display_name="Input Message",
            info="The message to be passed forward.",
            required=True,
        ),
        MessageInput(
            name="ignored_message",
            display_name="Ignored Message",
            info="A second message to be ignored. Used as a workaround for continuity.",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Output Message", name="output_message", method="pass_message"),
    ]

    def pass_message(self) -> Message:
        self.status = self.input_message
        return self.input_message

Run flow

This component runs a specified flow within a larger workflow with custom inputs and tweaks.

The Run Flow component can also be used as a tool when connected to an Agent. The name and description metadata that the Agent uses to register the tool are created automatically.

When you select a flow, the component fetches the flow’s graph structure and uses it to generate the inputs and outputs for the Run Flow component.

To use the Run Flow component as a tool, do the following:

  1. Add the Run Flow component to the Simple agent flow.

  2. In the Flow Name menu, select the sub-flow you want to run. The appearance of the Run Flow component changes to reflect the inputs and outputs of the selected flow.

  3. On the Run Flow component, enable Tool Mode.

  4. Connect the Run Flow component to the Toolset input of the Agent. Your flow should now look like this:

    A run flow component running a flow
  5. Run the flow. The Agent uses the Run Flow component as a tool to run the selected sub-flow.

Parameters

Inputs
Name Type Description

input_value

String

The input value for the flow to process.

flow_name

Dropdown

The name of the flow to run.

tweaks

Nested Dict

Tweaks to apply to the flow.

Outputs
Name Type Description

run_outputs

List[Data]

The results generated from running the flow.

Component code

run_flow.py
from typing import Any

from loguru import logger

from langflow.base.tools.run_flow import RunFlowBaseComponent
from langflow.helpers.flow import run_flow
from langflow.schema import dotdict


class RunFlowComponent(RunFlowBaseComponent):
    display_name = "Run Flow"
    description = (
        "Creates a tool component from a Flow that takes all its inputs and runs it. "
        " \n **Select a Flow to use the tool mode**"
    )
    beta = True
    name = "RunFlow"
    icon = "Workflow"

    inputs = RunFlowBaseComponent._base_inputs
    outputs = RunFlowBaseComponent._base_outputs

    async def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        if field_name == "flow_name_selected":
            build_config["flow_name_selected"]["options"] = await self.get_flow_names()
            missing_keys = [key for key in self.default_keys if key not in build_config]
            if missing_keys:
                msg = f"Missing required keys in build_config: {missing_keys}"
                raise ValueError(msg)
            if field_value is not None:
                try:
                    graph = await self.get_graph(field_value)
                    build_config = self.update_build_config_from_graph(build_config, graph)
                except Exception as e:
                    msg = f"Error building graph for flow {field_value}"
                    logger.exception(msg)
                    raise RuntimeError(msg) from e
        return build_config

    async def run_flow_with_tweaks(self):
        tweaks: dict = {}

        flow_name_selected = self._attributes.get("flow_name_selected")
        parsed_flow_tweak_data = self._attributes.get("flow_tweak_data", {})
        if not isinstance(parsed_flow_tweak_data, dict):
            parsed_flow_tweak_data = parsed_flow_tweak_data.dict()

        if parsed_flow_tweak_data != {}:
            for field in parsed_flow_tweak_data:
                if "~" in field:
                    [node, name] = field.split("~")
                    if node not in tweaks:
                        tweaks[node] = {}
                    tweaks[node][name] = parsed_flow_tweak_data[field]
        else:
            for field in self._attributes:
                if field not in self.default_keys and "~" in field:
                    [node, name] = field.split("~")
                    if node not in tweaks:
                        tweaks[node] = {}
                    tweaks[node][name] = self._attributes[field]

        return await run_flow(
            inputs=None,
            output_type="all",
            flow_id=None,
            flow_name=flow_name_selected,
            tweaks=tweaks,
            user_id=str(self.user_id),
            session_id=self.graph.session_id or self.session_id,
        )

Sub Flow

This component is deprecated as of Langflow version 1.1.2. Instead, use the Run flow component.

This component integrates entire flows as components. It dynamically generates inputs based on the selected flow and executes the flow with provided parameters.

Parameters

Inputs
Name Type Description

flow_name

Dropdown

The name of the flow to run.

Outputs
Name Type Description

flow_outputs

List[Data]

The outputs generated from the flow.

Component code

sub_flow.py
from typing import Any

from loguru import logger

from langflow.base.flow_processing.utils import build_data_from_result_data
from langflow.custom import Component
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import Vertex
from langflow.helpers.flow import get_flow_inputs
from langflow.io import DropdownInput, Output
from langflow.schema import Data, dotdict


class SubFlowComponent(Component):
    display_name = "Sub Flow [Deprecated]"
    description = "Generates a Component from a Flow, with all of its inputs, and "
    name = "SubFlow"
    legacy: bool = True
    icon = "Workflow"

    async def get_flow_names(self) -> list[str]:
        flow_data = await self.alist_flows()
        return [flow_data.data["name"] for flow_data in flow_data]

    async def get_flow(self, flow_name: str) -> Data | None:
        flow_datas = await self.alist_flows()
        for flow_data in flow_datas:
            if flow_data.data["name"] == flow_name:
                return flow_data
        return None

    async def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        if field_name == "flow_name":
            build_config["flow_name"]["options"] = await self.get_flow_names()

        for key in list(build_config.keys()):
            if key not in [x.name for x in self.inputs] + ["code", "_type", "get_final_results_only"]:
                del build_config[key]
        if field_value is not None and field_name == "flow_name":
            try:
                flow_data = await self.get_flow(field_value)
            except Exception:  # noqa: BLE001
                logger.exception(f"Error getting flow {field_value}")
            else:
                if not flow_data:
                    msg = f"Flow {field_value} not found."
                    logger.error(msg)
                else:
                    try:
                        graph = Graph.from_payload(flow_data.data["data"])
                        # Get all inputs from the graph
                        inputs = get_flow_inputs(graph)
                        # Add inputs to the build config
                        build_config = self.add_inputs_to_build_config(inputs, build_config)
                    except Exception:  # noqa: BLE001
                        logger.exception(f"Error building graph for flow {field_value}")

        return build_config

    def add_inputs_to_build_config(self, inputs_vertex: list[Vertex], build_config: dotdict):
        new_fields: list[dotdict] = []

        for vertex in inputs_vertex:
            new_vertex_inputs = []
            field_template = vertex.data["node"]["template"]
            for inp in field_template:
                if inp not in {"code", "_type"}:
                    field_template[inp]["display_name"] = (
                        vertex.display_name + " - " + field_template[inp]["display_name"]
                    )
                    field_template[inp]["name"] = vertex.id + "|" + inp
                    new_vertex_inputs.append(field_template[inp])
            new_fields += new_vertex_inputs
        for field in new_fields:
            build_config[field["name"]] = field
        return build_config

    inputs = [
        DropdownInput(
            name="flow_name",
            display_name="Flow Name",
            info="The name of the flow to run.",
            options=[],
            refresh_button=True,
            real_time_refresh=True,
        ),
    ]

    outputs = [Output(name="flow_outputs", display_name="Flow Outputs", method="generate_results")]

    async def generate_results(self) -> list[Data]:
        tweaks: dict = {}
        for field in self._attributes:
            if field != "flow_name" and "|" in field:
                [node, name] = field.split("|")
                if node not in tweaks:
                    tweaks[node] = {}
                tweaks[node][name] = self._attributes[field]
        flow_name = self._attributes.get("flow_name")
        run_outputs = await self.run_flow(
            tweaks=tweaks,
            flow_name=flow_name,
            output_type="all",
        )
        data: list[Data] = []
        if not run_outputs:
            return data
        run_output = run_outputs[0]

        if run_output is not None:
            for output in run_output.outputs:
                if output:
                    data.extend(build_data_from_result_data(output))
        return data

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2025 DataStax | Privacy policy | Terms of use | Manage Privacy Choices

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com