Logic components in Langflow
This Langflow feature is currently in public preview. Development is ongoing, and the features and functionality are subject to change. Langflow, and the use of such, is subject to the DataStax Preview Terms.
Logic components provide functionalities for routing, conditional processing, and flow management.
Use a logic component in a flow
This flow creates a "for each" loop with the Loop component.
The component iterates over a list of `Data` objects until it’s completed, and then the Done loop aggregates the results.
The File component loads text files from your local machine, and then the Parse Data component parses them into a list of structured `Data` objects.
The Loop component passes each `Data` object to a Prompt to be summarized.
When the Loop component runs out of `Data` objects, the Done loop activates, which counts the number of pages and summarizes their tone with another Prompt.
This is represented in Langflow by connecting the Parse Data component’s Data List output to the Loop component’s `Data` loop input.
data:image/s3,"s3://crabby-images/65898/65898c4e513ed9e91c07a67da3c44b16083f6e23" alt="A loop text summarizer flow"
The output is similar to the following:
Document Summary
Total Pages Processed
Total Pages: 2
Overall Tone of Document
Tone: Informative and Instructional
The documentation outlines microservices architecture patterns and best practices.
It emphasizes service isolation and inter-service communication protocols.
The use of asynchronous messaging patterns is recommended for system scalability.
It includes code examples of REST and gRPC implementations to demonstrate integration approaches.
Conditional router (If-Else component)
This component routes messages based on text comparison. It evaluates a condition by comparing two text inputs using a specified operator and routes the message accordingly.
Parameters
Name | Type | Description |
---|---|---|
input_text |
String |
The primary text input for the operation. |
match_text |
String |
The text input to compare against. |
operator |
Dropdown |
The operator to apply for comparing the texts. |
case_sensitive |
Boolean |
If true, the comparison will be case sensitive. |
message |
Message |
The message to pass through either route. |
max_iterations |
Integer |
The maximum number of iterations for the conditional router. |
default_route |
Dropdown |
The default route to take when max iterations are reached. |
Name | Type | Description |
---|---|---|
true_result |
Message |
The output when the condition is true. |
false_result |
Message |
The output when the condition is false. |
Component code
conditional_router.py
import re
from langflow.custom import Component
from langflow.io import BoolInput, DropdownInput, IntInput, MessageInput, MessageTextInput, Output
from langflow.schema.message import Message
class ConditionalRouterComponent(Component):
    """If-Else router that forwards ``message`` to one of two outputs.

    The condition compares ``input_text`` with ``match_text`` using the
    selected ``operator``. Each run stops exactly one output branch; an
    iteration counter kept in the shared context flips the stopped branch
    once ``max_iterations`` is reached and the branch about to be stopped
    is ``default_route``.
    """

    display_name = "If-Else"
    description = "Routes an input message to a corresponding output based on text comparison."
    icon = "split"
    name = "ConditionalRouter"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Guards against counting (and stopping) twice in one run, because
        # both output methods call iterate_and_stop_once.
        self.__iteration_updated = False

    inputs = [
        MessageTextInput(
            name="input_text",
            display_name="Text Input",
            info="The primary text input for the operation.",
            required=True,
        ),
        MessageTextInput(
            name="match_text",
            display_name="Match Text",
            info="The text input to compare against.",
            required=True,
        ),
        DropdownInput(
            name="operator",
            display_name="Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with", "regex"],
            info="The operator to apply for comparing the texts.",
            value="equals",
            real_time_refresh=True,
        ),
        BoolInput(
            name="case_sensitive",
            display_name="Case Sensitive",
            info="If true, the comparison will be case sensitive.",
            value=False,
        ),
        MessageInput(
            name="message",
            display_name="Message",
            info="The message to pass through either route.",
        ),
        IntInput(
            name="max_iterations",
            display_name="Max Iterations",
            info="The maximum number of iterations for the conditional router.",
            value=10,
            advanced=True,
        ),
        DropdownInput(
            name="default_route",
            display_name="Default Route",
            options=["true_result", "false_result"],
            info="The default route to take when max iterations are reached.",
            value="false_result",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="True", name="true_result", method="true_response"),
        Output(display_name="False", name="false_result", method="false_response"),
    ]

    def _pre_run_setup(self):
        """Reset the per-run flag so the iteration counter can advance again."""
        self.__iteration_updated = False

    def evaluate_condition(self, input_text: str, match_text: str, operator: str, *, case_sensitive: bool) -> bool:
        """Return whether ``input_text`` satisfies ``operator`` against ``match_text``.

        ``regex`` ignores ``case_sensitive`` and matches with ``re.match``;
        an invalid pattern evaluates to ``False``. An unrecognised operator
        also evaluates to ``False``.
        """
        if operator == "regex":
            try:
                return bool(re.match(match_text, input_text))
            except re.error:
                # An invalid pattern is treated as "no match".
                return False

        if not case_sensitive:
            input_text = input_text.lower()
            match_text = match_text.lower()

        # Lazily evaluated so only the selected comparison runs.
        comparisons = {
            "equals": lambda: input_text == match_text,
            "not equals": lambda: input_text != match_text,
            "contains": lambda: match_text in input_text,
            "starts with": lambda: input_text.startswith(match_text),
            "ends with": lambda: input_text.endswith(match_text),
        }
        compare = comparisons.get(operator)
        if compare is None:
            return False
        return compare()

    def iterate_and_stop_once(self, route_to_stop: str):
        """Advance the iteration counter and stop one output, at most once per run.

        When the counter has reached ``max_iterations`` and the route about
        to be stopped is ``default_route``, the opposite route is stopped
        instead.
        """
        if self.__iteration_updated:
            return

        counter_key = f"{self._id}_iteration"
        self.update_ctx({counter_key: self.ctx.get(counter_key, 0) + 1})
        self.__iteration_updated = True

        limit_reached = self.ctx.get(counter_key, 0) >= self.max_iterations
        if limit_reached and route_to_stop == self.default_route:
            route_to_stop = "true_result" if route_to_stop == "false_result" else "false_result"
        self.stop(route_to_stop)

    def true_response(self) -> Message:
        """Return ``message`` when the condition holds, otherwise an empty Message."""
        matched = self.evaluate_condition(
            self.input_text, self.match_text, self.operator, case_sensitive=self.case_sensitive
        )
        if not matched:
            self.iterate_and_stop_once("true_result")
            return Message(content="")

        self.status = self.message
        self.iterate_and_stop_once("false_result")
        return self.message

    def false_response(self) -> Message:
        """Return ``message`` when the condition fails, otherwise an empty Message."""
        matched = self.evaluate_condition(
            self.input_text, self.match_text, self.operator, case_sensitive=self.case_sensitive
        )
        if matched:
            self.iterate_and_stop_once("false_result")
            return Message(content="")

        self.status = self.message
        self.iterate_and_stop_once("true_result")
        return self.message

    def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None) -> dict:
        """Drop ``case_sensitive`` from the config for ``regex``; restore it otherwise."""
        if field_name != "operator":
            return build_config

        if field_value == "regex":
            build_config.pop("case_sensitive", None)
        elif "case_sensitive" not in build_config:
            # Re-create the field from its declared input definition.
            declared = next((candidate for candidate in self.inputs if candidate.name == "case_sensitive"), None)
            if declared:
                build_config["case_sensitive"] = declared.to_dict()
        return build_config
Data conditional router
This component routes Data objects based on a condition applied to a specified key, including boolean validation.
Parameters
Name | Type | Description |
---|---|---|
data_input |
Data |
The data object or list of data objects to process. |
key_name |
String |
The name of the key in the data object to check. |
operator |
Dropdown |
The operator to apply for comparing the values. |
compare_value |
String |
The value to compare against (not used for boolean validator). |
Name | Type | Description |
---|---|---|
true_output |
Data/List |
Output when the condition is met. |
false_output |
Data/List |
Output when the condition is not met. |
Component code
data_conditional_router.py
from typing import Any
from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, MessageTextInput, Output
from langflow.schema import Data, dotdict
class DataConditionalRouterComponent(Component):
    """Route Data object(s) by testing the value stored under ``key_name``.

    Both outputs are served by ``process_data``, which stops whichever
    output does not apply to the result.
    """

    display_name = "Condition"
    description = "Route Data object(s) based on a condition applied to a specified key, including boolean validation."
    icon = "split"
    name = "DataConditionalRouter"
    legacy = True

    inputs = [
        DataInput(
            name="data_input",
            display_name="Data Input",
            info="The Data object or list of Data objects to process",
            is_list=True,
        ),
        MessageTextInput(
            name="key_name",
            display_name="Key Name",
            info="The name of the key in the Data object(s) to check",
        ),
        DropdownInput(
            name="operator",
            display_name="Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with", "boolean validator"],
            info="The operator to apply for comparing the values. 'boolean validator' treats the value as a boolean.",
            value="equals",
        ),
        MessageTextInput(
            name="compare_value",
            display_name="Match Text",
            info="The value to compare against (not used for boolean validator)",
        ),
    ]

    outputs = [
        Output(display_name="True Output", name="true_output", method="process_data"),
        Output(display_name="False Output", name="false_output", method="process_data"),
    ]

    def compare_values(self, item_value: str, compare_value: str, operator: str) -> bool:
        """Apply ``operator`` to the two strings; unknown operators yield ``False``."""
        # Lazily evaluated so only the selected comparison runs.
        checks = {
            "equals": lambda: item_value == compare_value,
            "not equals": lambda: item_value != compare_value,
            "contains": lambda: compare_value in item_value,
            "starts with": lambda: item_value.startswith(compare_value),
            "ends with": lambda: item_value.endswith(compare_value),
            "boolean validator": lambda: self.parse_boolean(item_value),
        }
        check = checks.get(operator)
        if check is None:
            return False
        return check()

    def parse_boolean(self, value):
        """Interpret ``value`` as a boolean.

        Strings are true only when they equal (case-insensitively) one of
        ``true``, ``1``, ``yes``, ``y`` or ``on``; everything else falls
        back to ``bool(value)``.
        """
        if isinstance(value, str):
            return value.lower() in {"true", "1", "yes", "y", "on"}
        return bool(value)

    def validate_input(self, data_item: Data) -> bool:
        """Check that ``data_item`` is a Data object containing ``key_name``.

        On failure the reason is written to ``self.status``.
        """
        if not isinstance(data_item, Data):
            self.status = "Input is not a Data object"
            return False
        if self.key_name in data_item.data:
            return True
        self.status = f"Key '{self.key_name}' not found in Data"
        return False

    def process_data(self) -> Data | list[Data]:
        """Evaluate the condition and stop the output that does not apply.

        For a list input, invalid items are skipped and the remaining
        items are split into matching and non-matching groups; the
        matching group is returned when it is non-empty, otherwise the
        non-matching group. For a single input, the input itself is
        returned, or an error Data object when validation fails.
        """
        if isinstance(self.data_input, list):
            matching: list = []
            non_matching: list = []
            for entry in self.data_input:
                if not self.validate_input(entry):
                    continue
                bucket = matching if self.process_single_data(entry) else non_matching
                bucket.append(entry)
            self.stop("false_output" if matching else "true_output")
            return matching or non_matching

        if not self.validate_input(self.data_input):
            return Data(data={"error": self.status})

        condition_holds = self.process_single_data(self.data_input)
        self.stop("false_output" if condition_holds else "true_output")
        return self.data_input

    def process_single_data(self, data_item: Data) -> bool:
        """Test one Data object and record the outcome in ``self.status``."""
        item_value = data_item.data[self.key_name]
        operator = self.operator

        if operator == "boolean validator":
            condition_met = self.parse_boolean(item_value)
            condition_description = f"Boolean validation of '{self.key_name}'"
        else:
            compare_value = self.compare_value
            # Values are compared as strings.
            condition_met = self.compare_values(str(item_value), compare_value, operator)
            condition_description = f"{self.key_name} {operator} {compare_value}"

        if not condition_met:
            self.status = f"Condition not met: {condition_description}"
            return False
        self.status = f"Condition met: {condition_description}"
        return True

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Hide and clear ``compare_value`` for the boolean validator; show it otherwise."""
        if field_name != "operator":
            return build_config

        compare_field = build_config["compare_value"]
        if field_value == "boolean validator":
            compare_field["show"] = False
            compare_field["advanced"] = True
            compare_field["value"] = None
        else:
            compare_field["show"] = True
            compare_field["advanced"] = False
        return build_config
Flow as Tool
This component is deprecated as of Langflow version 1.1.2. Instead, use the Run flow component.
Parameters
Name | Type | Description |
---|---|---|
flow_name |
Dropdown |
The name of the flow to run. |
tool_name |
String |
The name of the tool. |
tool_description |
String |
The description of the tool. |
return_direct |
Boolean |
If true, returns the result directly from the tool. |
Name | Type | Description |
---|---|---|
api_build_tool |
Tool |
The constructed tool from the flow. |
Component code
flow_tool.py
from typing import Any
from loguru import logger
from typing_extensions import override
from langflow.base.langchain_utilities.model import LCToolComponent
from langflow.base.tools.flow_tool import FlowTool
from langflow.field_typing import Tool
from langflow.graph.graph.base import Graph
from langflow.helpers.flow import get_flow_inputs
from langflow.io import BoolInput, DropdownInput, Output, StrInput
from langflow.schema import Data
from langflow.schema.dotdict import dotdict
class FlowToolComponent(LCToolComponent):
    """Deprecated component that builds a ``FlowTool`` from a saved flow.

    The flow is looked up by name among the records returned by
    ``alist_flows``, turned into a ``Graph``, and wrapped in a tool whose
    inputs are the flow's inputs.
    """

    display_name = "Flow as Tool [Deprecated]"
    description = "Construct a Tool from a function that runs the loaded Flow."
    field_order = ["flow_name", "name", "description", "return_direct"]
    trace_type = "tool"
    name = "FlowTool"
    legacy: bool = True
    icon = "hammer"

    async def get_flow_names(self) -> list[str]:
        """Return the ``name`` of every flow record from ``alist_flows``."""
        flow_datas = await self.alist_flows()
        return [flow_data.data["name"] for flow_data in flow_datas]

    async def get_flow(self, flow_name: str) -> Data | None:
        """Retrieve a flow record by its name.

        Args:
            flow_name (str): The name of the flow to retrieve.

        Returns:
            Data | None: The first flow record whose ``name`` matches,
            or None when no record matches.
        """
        flow_datas = await self.alist_flows()
        for flow_data in flow_datas:
            if flow_data.data["name"] == flow_name:
                return flow_data
        return None

    @override
    async def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Refresh the ``flow_name`` dropdown options when that field changes."""
        if field_name == "flow_name":
            # get_flow_names is a coroutine function: without ``await`` the
            # options would be set to a coroutine object, not a list of names.
            build_config["flow_name"]["options"] = await self.get_flow_names()

        return build_config

    inputs = [
        DropdownInput(
            name="flow_name", display_name="Flow Name", info="The name of the flow to run.", refresh_button=True
        ),
        StrInput(
            name="tool_name",
            display_name="Name",
            info="The name of the tool.",
        ),
        StrInput(
            name="tool_description",
            display_name="Description",
            info="The description of the tool; defaults to the Flow's description.",
        ),
        BoolInput(
            name="return_direct",
            display_name="Return Direct",
            info="Return the result directly from the Tool.",
            advanced=True,
        ),
    ]

    outputs = [
        Output(name="api_build_tool", display_name="Tool", method="build_tool"),
    ]

    async def build_tool(self) -> Tool:
        """Build the ``FlowTool`` for the selected flow.

        Returns:
            Tool: The constructed tool; a summary of its description and
            arguments is written to ``self.status``.

        Raises:
            ValueError: If no flow name is set or the flow cannot be found.
        """
        FlowTool.model_rebuild()
        if "flow_name" not in self._attributes or not self._attributes["flow_name"]:
            msg = "Flow name is required"
            raise ValueError(msg)
        flow_name = self._attributes["flow_name"]
        flow_data = await self.get_flow(flow_name)
        if not flow_data:
            msg = "Flow not found."
            raise ValueError(msg)
        graph = Graph.from_payload(
            flow_data.data["data"],
            user_id=str(self.user_id),
        )
        # Propagating the run id is best-effort: a failure is logged, not raised.
        try:
            graph.set_run_id(self.graph.run_id)
        except Exception:  # noqa: BLE001
            logger.opt(exception=True).warning("Failed to set run_id")
        inputs = get_flow_inputs(graph)
        # Tolerate an unset description and fall back to the flow's own.
        tool_description = (self.tool_description or "").strip() or flow_data.description
        tool = FlowTool(
            name=self.tool_name,
            description=tool_description,
            graph=graph,
            return_direct=self.return_direct,
            inputs=inputs,
            flow_id=str(flow_data.id),
            user_id=str(self.user_id),
            session_id=self.graph.session_id if hasattr(self, "graph") else None,
        )
        description_repr = repr(tool.description).strip("'")
        args_str = "\n".join([f"- {arg_name}: {arg_data['description']}" for arg_name, arg_data in tool.args.items()])
        self.status = f"{description_repr}\nArguments:\n{args_str}"
        return tool
Listen
This component listens for a notification and retrieves its associated state.
Parameters
Name | Type | Description |
---|---|---|
name |
String |
The name of the notification to listen for. |
Name | Type | Description |
---|---|---|
output |
Data |
The state associated with the notification. |
Component code
listen.py
from langflow.custom import CustomComponent
from langflow.schema import Data
class ListenComponent(CustomComponent):
    """Component that reads the state stored under a notification name."""

    display_name = "Listen"
    description = "A component to listen for a notification."
    name = "Listen"
    beta: bool = True
    icon = "Radio"

    def build_config(self):
        """Describe the single ``name`` field of this component."""
        name_field = {
            "display_name": "Name",
            "info": "The name of the notification to listen for.",
        }
        return {"name": name_field}

    def build(self, name: str) -> Data:
        """Fetch the state for ``name``, mark the vertex as stateful, and return the state."""
        notification_state = self.get_state(name)
        self._set_successors_ids()
        self.status = notification_state
        return notification_state

    def _set_successors_ids(self):
        """Flag this vertex as a state vertex.

        Returns the vertex's successors (from the graph's successor map)
        followed by the graph's currently activated vertices.
        """
        self._vertex.is_state = True
        graph = self._vertex.graph
        downstream = graph.successor_map.get(self._vertex.id, [])
        return downstream + graph.activated_vertices
Loop
This component iterates over a list of `Data` objects, outputting one item at a time and aggregating results from loop inputs.
Parameters
Name | Type | Description |
---|---|---|
data |
Data/List |
The initial list of Data objects to iterate over. |
Name | Type | Description |
---|---|---|
item |
Data |
Outputs one item at a time from the data list. |
done |
Data |
Triggered when the iteration is complete; returns the aggregated results. |
Component code
loop.py
from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data
class LoopComponent(Component):
    """Emit the items of a Data list one at a time and collect loop-back results.

    Loop state (the data list, the current index, the aggregated results,
    and an initialised flag) is kept in the shared context under keys
    prefixed with this component's id.
    """

    display_name = "Loop"
    description = (
        "Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs."
    )
    icon = "infinity"

    inputs = [
        DataInput(
            name="data",
            display_name="Data",
            info="The initial list of Data objects to iterate over.",
        ),
    ]

    outputs = [
        Output(display_name="Item", name="item", method="item_output", allows_loop=True),
        Output(display_name="Done", name="done", method="done_output"),
    ]

    def initialize_data(self) -> None:
        """Seed the loop state in the context the first time it is called."""
        already_initialized = self.ctx.get(f"{self._id}_initialized", False)
        if already_initialized:
            return

        # Normalise the input to a list of Data objects before storing it.
        items = self._validate_data(self.data)
        initial_state = {
            f"{self._id}_data": items,
            f"{self._id}_index": 0,
            f"{self._id}_aggregated": [],
            f"{self._id}_initialized": True,
        }
        self.update_ctx(initial_state)

    def _validate_data(self, data):
        """Return ``data`` as a list of Data objects, or raise ``TypeError``."""
        if isinstance(data, Data):
            return [data]

        is_data_list = isinstance(data, list) and all(isinstance(item, Data) for item in data)
        if not is_data_list:
            msg = "The 'data' input must be a list of Data objects or a single Data object."
            raise TypeError(msg)
        return data

    def evaluate_stop_loop(self) -> bool:
        """Return True once the stored index is greater than the data length."""
        position = self.ctx.get(f"{self._id}_index", 0)
        total = len(self.ctx.get(f"{self._id}_data", []))
        return position > total

    def item_output(self) -> Data:
        """Return the item at the current index and advance the index.

        When the loop is finished the ``item`` output is stopped and an
        empty Data object is returned. When the index equals the list
        length, an empty Data object is returned, while the aggregation
        step and the index increment still run.
        """
        self.initialize_data()

        if self.evaluate_stop_loop():
            self.stop("item")
            return Data(text="")

        current_item = Data(text="")
        items, position = self.loop_variables()
        if position < len(items):
            try:
                current_item = items[position]
            except IndexError:
                current_item = Data(text="")

        # Collect any loop-back value before moving on to the next index.
        self.aggregated_output()
        self.update_ctx({f"{self._id}_index": position + 1})
        return current_item

    def done_output(self) -> Data:
        """Return the aggregated results once the loop is finished.

        While the loop is still running, the ``done`` output is stopped and
        an empty Data object is returned.
        """
        self.initialize_data()

        if not self.evaluate_stop_loop():
            self.stop("done")
            return Data(text="")

        self.stop("item")
        self.start("done")
        return self.ctx.get(f"{self._id}_aggregated", [])

    def loop_variables(self):
        """Return ``(data_list, current_index)`` as stored in the context."""
        items = self.ctx.get(f"{self._id}_data", [])
        position = self.ctx.get(f"{self._id}_index", 0)
        return (items, position)

    def aggregated_output(self) -> Data:
        """Append the current loop-back value to the aggregated list and return the list.

        The value is appended only when ``self.item`` is neither ``None``
        nor a string and the aggregated list is not yet longer than the
        data list.
        """
        self.initialize_data()

        items = self.ctx.get(f"{self._id}_data", [])
        collected = self.ctx.get(f"{self._id}_aggregated", [])

        has_loop_value = self.item is not None and not isinstance(self.item, str)
        if has_loop_value and len(collected) <= len(items):
            collected.append(self.item)
            self.update_ctx({f"{self._id}_aggregated": collected})
        return collected
Notify
This component generates a notification for the Listen component to use.
Parameters
Name | Type | Description |
---|---|---|
name |
String |
The name of the notification. |
data |
Data |
The data to store in the notification. |
append |
Boolean |
If true, the record will be appended to the existing notification. |
Name | Type | Description |
---|---|---|
output |
Data |
The data stored in the notification. |
Component code
notify.py
from langflow.custom import CustomComponent
from langflow.schema import Data
class NotifyComponent(CustomComponent):
    """Component that stores data under a notification name."""

    display_name = "Notify"
    description = "A component to generate a notification to Get Notified component."
    icon = "Notify"
    name = "Notify"
    beta: bool = True

    def build_config(self):
        """Describe the ``name``, ``data`` and ``append`` fields."""
        config = {}
        config["name"] = {"display_name": "Name", "info": "The name of the notification."}
        config["data"] = {"display_name": "Data", "info": "The data to store."}
        config["append"] = {
            "display_name": "Append",
            "info": "If True, the record will be appended to the notification.",
        }
        return config

    def build(self, name: str, *, data: Data | None = None, append: bool = False) -> Data:
        """Store ``data`` under ``name`` and return it as a Data object.

        Falsy input becomes an empty-text Data object. Truthy non-Data
        input is wrapped: strings as text, dicts as data, anything else as
        its ``str()`` text. The state is appended when ``append`` is true
        and replaced otherwise.
        """
        if not data:
            data = Data(text="")
        elif not isinstance(data, Data):
            if isinstance(data, str):
                data = Data(text=data)
            elif isinstance(data, dict):
                data = Data(data=data)
            else:
                data = Data(text=str(data))

        if not data:
            self.status = "No record provided."
        elif append:
            self.append_state(name, data)
        else:
            self.update_state(name, data)

        # The final status is always the data itself.
        self.status = data
        self._set_successors_ids()
        return data

    def _set_successors_ids(self):
        """Flag this vertex as a state vertex.

        Returns the vertex's successors (from the graph's successor map)
        followed by the graph's currently activated vertices.
        """
        self._vertex.is_state = True
        graph = self._vertex.graph
        downstream = graph.successor_map.get(self._vertex.id, [])
        return downstream + graph.activated_vertices
Pass message
This component forwards the input message, unchanged.
Parameters
Name | Display Name | Info |
---|---|---|
input_message |
Input Message |
The message to be passed forward. |
ignored_message |
Ignored Message |
A second message to be ignored. Used as a workaround for continuity. |
Name | Display Name | Info |
---|---|---|
output_message |
Output Message |
The forwarded input message, unchanged. |
Component code
pass_message.py
from langflow.custom import Component
from langflow.io import MessageInput
from langflow.schema.message import Message
from langflow.template import Output
class PassMessageComponent(Component):
    """Component that returns its input message unchanged."""

    display_name = "Pass"
    description = "Forwards the input message, unchanged."
    name = "Pass"
    icon = "arrow-right"

    inputs = [
        MessageInput(
            name="input_message",
            display_name="Input Message",
            info="The message to be passed forward.",
            required=True,
        ),
        MessageInput(
            name="ignored_message",
            display_name="Ignored Message",
            info="A second message to be ignored. Used as a workaround for continuity.",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Output Message", name="output_message", method="pass_message"),
    ]

    def pass_message(self) -> Message:
        """Record the input message as the status and return it as-is."""
        forwarded = self.input_message
        self.status = forwarded
        return forwarded
Run flow
This component runs a specified flow within a larger workflow with custom inputs and tweaks.
The Run Flow component can also be used as a tool when connected to an Agent. The `name` and `description` metadata that the Agent uses to register the tool are created automatically.
When you select a flow, the component fetches the flow’s graph structure and uses it to generate the inputs and outputs for the Run Flow component.
To use the Run Flow component as a tool, do the following:
-
Add the Run Flow component to the Simple agent flow.
-
In the Flow Name menu, select the sub-flow you want to run. The appearance of the Run Flow component changes to reflect the inputs and outputs of the selected flow.
-
On the Run Flow component, enable Tool Mode.
-
Connect the Run Flow component to the Toolset input of the Agent. Your flow should now look like this:
-
Run the flow. The Agent uses the Run Flow component as a tool to run the selected sub-flow.
Parameters
Name | Type | Description |
---|---|---|
input_value |
String |
The input value for the flow to process. |
flow_name |
Dropdown |
The name of the flow to run. |
tweaks |
Nested Dict |
Tweaks to apply to the flow. |
Name | Type | Description |
---|---|---|
run_outputs |
List[Data] |
The results generated from running the flow. |
Component code
run_flow.py
from typing import Any
from loguru import logger
from langflow.base.tools.run_flow import RunFlowBaseComponent
from langflow.helpers.flow import run_flow
from langflow.schema import dotdict
class RunFlowComponent(RunFlowBaseComponent):
    """Component that runs a selected flow, passing along per-node tweaks.

    Tweak fields are keyed as ``<node>~<name>``.
    """

    display_name = "Run Flow"
    description = (
        "Creates a tool component from a Flow that takes all its inputs and runs it. "
        " \n **Select a Flow to use the tool mode**"
    )
    beta = True
    name = "RunFlow"
    icon = "Workflow"

    inputs = RunFlowBaseComponent._base_inputs
    outputs = RunFlowBaseComponent._base_outputs

    async def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Refresh the flow list and rebuild the config from the selected flow's graph.

        Only acts when ``flow_name_selected`` changes.

        Raises:
            ValueError: If any of ``default_keys`` is missing from ``build_config``.
            RuntimeError: If the graph for the selected flow cannot be built.
        """
        if field_name != "flow_name_selected":
            return build_config

        build_config["flow_name_selected"]["options"] = await self.get_flow_names()

        missing_keys = [key for key in self.default_keys if key not in build_config]
        if missing_keys:
            msg = f"Missing required keys in build_config: {missing_keys}"
            raise ValueError(msg)

        if field_value is None:
            return build_config

        try:
            graph = await self.get_graph(field_value)
            build_config = self.update_build_config_from_graph(build_config, graph)
        except Exception as e:
            msg = f"Error building graph for flow {field_value}"
            logger.exception(msg)
            raise RuntimeError(msg) from e
        return build_config

    async def run_flow_with_tweaks(self):
        """Run the selected flow with tweaks gathered from the component attributes.

        Tweaks come from ``flow_tweak_data`` when it is non-empty; otherwise
        from every attribute that is not a default key and contains ``~``.
        """
        flow_name_selected = self._attributes.get("flow_name_selected")
        parsed_flow_tweak_data = self._attributes.get("flow_tweak_data", {})
        if not isinstance(parsed_flow_tweak_data, dict):
            parsed_flow_tweak_data = parsed_flow_tweak_data.dict()

        # Pick the mapping that supplies the tweaks and the keys that qualify.
        if parsed_flow_tweak_data != {}:
            source = parsed_flow_tweak_data
            tweak_fields = [field for field in source if "~" in field]
        else:
            source = self._attributes
            tweak_fields = [field for field in source if field not in self.default_keys and "~" in field]

        tweaks: dict = {}
        for field in tweak_fields:
            node, name = field.split("~")
            tweaks.setdefault(node, {})[name] = source[field]

        return await run_flow(
            inputs=None,
            output_type="all",
            flow_id=None,
            flow_name=flow_name_selected,
            tweaks=tweaks,
            user_id=str(self.user_id),
            session_id=self.graph.session_id or self.session_id,
        )
Sub Flow
This component is deprecated as of Langflow version 1.1.2. Instead, use the Run flow component.
This component integrates entire flows as components. It dynamically generates inputs based on the selected flow and executes the flow with provided parameters.
Parameters
Name | Type | Description |
---|---|---|
flow_name |
Dropdown |
The name of the flow to run. |
Name | Type | Description |
---|---|---|
flow_outputs |
List[Data] |
The outputs generated from the flow. |
Component code
sub_flow.py
from typing import Any
from loguru import logger
from langflow.base.flow_processing.utils import build_data_from_result_data
from langflow.custom import Component
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import Vertex
from langflow.helpers.flow import get_flow_inputs
from langflow.io import DropdownInput, Output
from langflow.schema import Data, dotdict
class SubFlowComponent(Component):
    """Deprecated component that exposes a saved flow's inputs and runs the flow.

    Fields generated from the flow are named ``<vertex id>|<input name>``;
    the same naming is used to turn attributes back into tweaks.
    """

    display_name = "Sub Flow [Deprecated]"
    # NOTE(review): this description looks truncated ("..., and "); confirm the intended text.
    description = "Generates a Component from a Flow, with all of its inputs, and "
    name = "SubFlow"
    legacy: bool = True
    icon = "Workflow"

    async def get_flow_names(self) -> list[str]:
        """Return the ``name`` of every flow record from ``alist_flows``."""
        flow_records = await self.alist_flows()
        return [record.data["name"] for record in flow_records]

    async def get_flow(self, flow_name: str) -> Data | None:
        """Return the first flow record named ``flow_name``, or None."""
        flow_records = await self.alist_flows()
        return next((record for record in flow_records if record.data["name"] == flow_name), None)

    async def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Rebuild the config so it exposes the inputs of the selected flow.

        Keys that are neither declared inputs nor one of ``code``,
        ``_type``, ``get_final_results_only`` are removed first. Lookup
        and graph-building failures are logged and the config is returned
        without the flow's inputs.
        """
        if field_name == "flow_name":
            build_config["flow_name"]["options"] = await self.get_flow_names()

        retained_keys = [x.name for x in self.inputs] + ["code", "_type", "get_final_results_only"]
        for key in list(build_config.keys()):
            if key not in retained_keys:
                del build_config[key]

        if field_value is None or field_name != "flow_name":
            return build_config

        try:
            flow_data = await self.get_flow(field_value)
        except Exception:  # noqa: BLE001
            logger.exception(f"Error getting flow {field_value}")
            return build_config

        if not flow_data:
            msg = f"Flow {field_value} not found."
            logger.error(msg)
            return build_config

        try:
            graph = Graph.from_payload(flow_data.data["data"])
            # Collect the flow's inputs and expose them as fields.
            inputs = get_flow_inputs(graph)
            build_config = self.add_inputs_to_build_config(inputs, build_config)
        except Exception:  # noqa: BLE001
            logger.exception(f"Error building graph for flow {field_value}")

        return build_config

    def add_inputs_to_build_config(self, inputs_vertex: list[Vertex], build_config: dotdict):
        """Copy every template field of the given vertices into ``build_config``.

        Each field (other than ``code`` and ``_type``) is relabelled
        in place: its display name is prefixed with the vertex's display
        name and its name becomes ``<vertex id>|<field key>``.
        """
        new_fields: list[dotdict] = []
        for vertex in inputs_vertex:
            template = vertex.data["node"]["template"]
            for field_key in template:
                if field_key in {"code", "_type"}:
                    continue
                field = template[field_key]
                field["display_name"] = vertex.display_name + " - " + field["display_name"]
                field["name"] = vertex.id + "|" + field_key
                new_fields.append(field)

        # Applied only after every vertex was processed successfully.
        for field in new_fields:
            build_config[field["name"]] = field
        return build_config

    inputs = [
        DropdownInput(
            name="flow_name",
            display_name="Flow Name",
            info="The name of the flow to run.",
            options=[],
            refresh_button=True,
            real_time_refresh=True,
        ),
    ]

    outputs = [Output(name="flow_outputs", display_name="Flow Outputs", method="generate_results")]

    async def generate_results(self) -> list[Data]:
        """Run the selected flow with attribute-derived tweaks and return its data.

        Only the first run output is used; an empty list is returned when
        there are no run outputs or the first one is None.
        """
        tweaks: dict = {}
        for field in self._attributes:
            if field != "flow_name" and "|" in field:
                node, name = field.split("|")
                tweaks.setdefault(node, {})[name] = self._attributes[field]

        flow_name = self._attributes.get("flow_name")
        run_outputs = await self.run_flow(
            tweaks=tweaks,
            flow_name=flow_name,
            output_type="all",
        )

        results: list[Data] = []
        if not run_outputs:
            return results

        first_run = run_outputs[0]
        if first_run is None:
            return results

        for output in first_run.outputs:
            if output:
                results.extend(build_data_from_result_data(output))
        return results