Processing components in Langflow
This Langflow feature is currently in public preview. Development is ongoing, and its features and functionality are subject to change. Langflow and its use are subject to the DataStax Preview Terms.
Processing components process and transform data within a flow.
Use a processing component in a flow
The Split Text processing component in this flow splits the incoming Data
into chunks to be embedded into the vector store component.
The component offers control over chunk size, overlap, and separator, which affect context and granularity in vector store retrieval results.

Alter Metadata
This component is in Legacy. Legacy components can be used in flows, but may not work due to Langflow core updates. |
This component modifies metadata of input objects. It can add new metadata, update existing metadata, and remove specified metadata fields. The component works with both Message
and Data
objects, and can also create a new Data
object from user-provided text.
Parameters
Name | Display Name | Info |
---|---|---|
input_value |
Input |
Objects to which Metadata should be added. |
text_in |
User Text |
Text input; the value will be in the 'text' attribute of the Data object. Empty text entries are ignored. |
metadata |
Metadata |
Metadata to add to each object. |
remove_fields |
Fields to Remove |
Metadata fields to remove. |
Name | Display Name | Info |
---|---|---|
data |
Data |
List of Input objects, each with added Metadata. |
Component code
alter_metadata.py
from langflow.custom import Component
from langflow.inputs import MessageTextInput
from langflow.io import HandleInput, NestedDictInput, Output, StrInput
from langflow.schema import Data, DataFrame
class AlterMetadataComponent(Component):
    """Add metadata to (and/or strip metadata fields from) incoming Data/Message objects."""

    display_name = "Alter Metadata"
    description = "Adds/Removes Metadata Dictionary on inputs"
    icon = "merge"
    name = "AlterMetadata"
    legacy = True

    inputs = [
        HandleInput(
            name="input_value",
            display_name="Input",
            info="Object(s) to which Metadata should be added",
            required=False,
            input_types=["Message", "Data"],
            is_list=True,
        ),
        StrInput(
            name="text_in",
            display_name="User Text",
            info="Text input; value will be in 'text' attribute of Data object. Empty text entries are ignored.",
            required=False,
        ),
        NestedDictInput(
            name="metadata",
            display_name="Metadata",
            info="Metadata to add to each object",
            input_types=["Data"],
            required=True,
        ),
        MessageTextInput(
            name="remove_fields",
            display_name="Fields to Remove",
            info="Metadata Fields to Remove",
            required=False,
            is_list=True,
        ),
    ]

    outputs = [
        Output(
            name="data",
            display_name="Data",
            info="List of Input objects each with added Metadata",
            method="process_output",
        ),
        Output(
            display_name="DataFrame",
            name="dataframe",
            info="Data objects as a DataFrame, with metadata as columns",
            method="as_dataframe",
        ),
    ]

    def _as_clean_dict(self, obj):
        """Normalize a Data object or plain dict into a dict, dropping blank keys."""
        if isinstance(obj, dict):
            raw = obj
        elif isinstance(obj, Data):
            raw = obj.data
        else:
            msg = f"Expected a Data object or a dictionary but got {type(obj)}."
            raise TypeError(msg)
        return {key: val for key, val in (raw or {}).items() if key and key.strip()}

    def process_output(self) -> list[Data]:
        """Apply the configured metadata additions/removals to every collected object."""
        new_metadata = self._as_clean_dict(self.metadata)

        # Seed the working list with a Data built from user text (when provided),
        # then append whatever objects were wired into input_value.
        targets: list[Data] = []
        if self.text_in:
            targets.append(Data(text=self.text_in))
        targets.extend(self.input_value or [])

        # Merge the new metadata in, preserving unrelated existing fields.
        for obj in targets:
            obj.data.update(new_metadata)

        if self.remove_fields:
            doomed = {name.strip() for name in self.remove_fields if name.strip()}
            for obj in targets:
                obj.data = {key: val for key, val in obj.data.items() if key not in doomed}

        # Expose the result for tracking/debugging in the UI.
        self.status = targets
        return targets

    def as_dataframe(self) -> DataFrame:
        """Return the processed Data objects as a DataFrame (metadata fields become columns)."""
        return DataFrame(self.process_output())
Combine data
Prior to Langflow version 1.1.3, this component was named Merge Data. |
This component combines multiple data objects into a unified list of data objects.
Parameters
Name | Display Name | Info |
---|---|---|
data_inputs |
Data Inputs |
A list of data input objects to be merged. |
Name | Display Name | Info |
---|---|---|
merged_data |
Merged data |
The resulting list of merged data objects with consistent keys. |
Component code
merge_data.py
from enum import Enum
from typing import cast
from loguru import logger
from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, Output
from langflow.schema import DataFrame
class DataOperation(str, Enum):
    """Combine strategies offered by MergeDataComponent's Operation dropdown."""

    CONCATENATE = "Concatenate"
    APPEND = "Append"
    MERGE = "Merge"
    JOIN = "Join"
class MergeDataComponent(Component):
    """Combine two or more Data inputs into a single DataFrame via a selectable strategy."""

    display_name = "Combine Data"
    description = "Combines data using different operations"
    icon = "merge"
    MIN_INPUTS_REQUIRED = 2

    inputs = [
        DataInput(name="data_inputs", display_name="Data Inputs", info="Data to combine", is_list=True, required=True),
        DropdownInput(
            name="operation",
            display_name="Operation Type",
            options=[op.value for op in DataOperation],
            value=DataOperation.CONCATENATE.value,
        ),
    ]
    outputs = [Output(display_name="DataFrame", name="combined_data", method="combine_data")]

    def combine_data(self) -> DataFrame:
        """Combine the wired-in Data inputs according to the selected operation."""
        supplied = self.data_inputs
        if not supplied or len(supplied) < self.MIN_INPUTS_REQUIRED:
            # Fewer than two inputs: nothing to combine, return an empty frame.
            result = DataFrame()
            self.status = result
            return result

        operation = DataOperation(self.operation)
        try:
            result = self._process_operation(operation)
            self.status = result
        except Exception as e:
            logger.error(f"Error during operation {operation}: {e!s}")
            raise
        return result

    def _process_operation(self, operation: DataOperation) -> DataFrame:
        """Dispatch to the per-operation builder; unknown operations yield an empty frame."""
        builders = {
            DataOperation.CONCATENATE: self._concatenate,
            DataOperation.APPEND: self._append,
            DataOperation.MERGE: self._merge,
            DataOperation.JOIN: self._join,
        }
        builder = builders.get(operation)
        return builder() if builder else DataFrame()

    def _concatenate(self) -> DataFrame:
        """Single-row result; string values sharing a key are joined with newlines."""
        merged: dict[str, str | object] = {}
        for item in self.data_inputs:
            for key, value in item.data.items():
                previous = merged.get(key)
                if key in merged and isinstance(previous, str) and isinstance(value, str):
                    merged[key] = f"{previous}\n{value}"
                else:
                    # First occurrence, or non-string on either side: last write wins.
                    merged[key] = value
        return DataFrame([merged])

    def _append(self) -> DataFrame:
        """One row per input Data object."""
        return DataFrame([item.data for item in self.data_inputs])

    def _merge(self) -> DataFrame:
        """Single-row result; repeated keys with string values accumulate into lists."""
        merged: dict[str, str | list[str] | object] = {}
        for item in self.data_inputs:
            for key, value in item.data.items():
                if key in merged and isinstance(value, str):
                    existing = merged[key]
                    if isinstance(existing, list):
                        cast("list[str]", existing).append(value)
                    else:
                        merged[key] = [existing, value]
                else:
                    merged[key] = value
        return DataFrame([merged])

    def _join(self) -> DataFrame:
        """Single-row result; keys from the second input onward get a _doc<N> suffix."""
        merged = {}
        for position, item in enumerate(self.data_inputs, 1):
            suffix = f"_doc{position}" if position > 1 else ""
            for key, value in item.data.items():
                merged[f"{key}{suffix}"] = value
        return DataFrame([merged])
Combine Text
This component concatenates two text sources into a single text chunk using a specified delimiter.
Parameters
Name | Display Name | Info |
---|---|---|
first_text |
First Text |
The first text input to concatenate. |
second_text |
Second Text |
The second text input to concatenate. |
delimiter |
Delimiter |
A string used to separate the two text inputs. Defaults to a space. |
Name | Display Name | Info |
---|---|---|
message |
Message |
A Message object containing the combined text. |
Component code
combine_text.py
from langflow.custom import Component
from langflow.io import MessageTextInput, Output
from langflow.schema.message import Message
class CombineTextComponent(Component):
    """Join two text inputs into one message using a configurable delimiter."""

    display_name = "Combine Text"
    description = "Concatenate two text sources into a single text chunk using a specified delimiter."
    icon = "merge"
    name = "CombineText"

    inputs = [
        MessageTextInput(
            name="text1",
            display_name="First Text",
            info="The first text input to concatenate.",
        ),
        MessageTextInput(
            name="text2",
            display_name="Second Text",
            info="The second text input to concatenate.",
        ),
        MessageTextInput(
            name="delimiter",
            display_name="Delimiter",
            info="A string used to separate the two text inputs. Defaults to a whitespace.",
            value=" ",
        ),
    ]

    outputs = [
        Output(display_name="Combined Text", name="combined_text", method="combine_texts"),
    ]

    def combine_texts(self) -> Message:
        """Join the two inputs with the configured delimiter and wrap them in a Message."""
        merged_text = self.delimiter.join((self.text1, self.text2))
        self.status = merged_text
        return Message(text=merged_text)
Create data
This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates. |
This component dynamically creates a Data
object with a specified number of fields.
Parameters
Name | Display Name | Info |
---|---|---|
number_of_fields |
Number of Fields |
The number of fields to be added to the record. |
text_key |
Text Key |
Key that identifies the field to be used as the text content. |
text_key_validator |
Text Key Validator |
If enabled, checks if the given 'Text Key' is present in the given 'Data'. |
Name | Display Name | Info |
---|---|---|
data |
Data |
A Data object containing the configured fields. |
Component code
create_data.py
from typing import Any
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import BoolInput, DictInput, IntInput, MessageTextInput
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict
class CreateDataComponent(Component):
    """Build a Data object from a user-configurable number of key/value fields.

    The ``number_of_fields`` input drives :meth:`update_build_config`, which
    dynamically adds or removes ``field_<i>_key`` DictInputs in the UI.
    """

    display_name: str = "Create Data"
    description: str = "Dynamically create a Data with a specified number of fields."
    name: str = "CreateData"
    MAX_FIELDS = 15  # Define a constant for maximum number of fields
    legacy = True
    icon = "ListFilter"

    inputs = [
        IntInput(
            name="number_of_fields",
            display_name="Number of Fields",
            info="Number of fields to be added to the record.",
            real_time_refresh=True,
            value=1,
            range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"),
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="Key that identifies the field to be used as the text content.",
            advanced=True,
        ),
        BoolInput(
            name="text_key_validator",
            display_name="Text Key Validator",
            advanced=True,
            info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="build_data"),
    ]

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict:
        """Rebuild the dynamic ``field_<i>_key`` inputs when ``number_of_fields`` changes.

        Existing field entries are backed up and re-inserted in order so that
        previously entered values survive a change in the field count.

        Raises:
            ValueError: if the requested count exceeds ``MAX_FIELDS``.
        """
        if field_name == "number_of_fields":
            # Keys that belong to the static template and must never be treated
            # as dynamic "field_<i>_key" entries.
            default_keys = ["code", "_type", "number_of_fields", "text_key", "text_key_validator"]
            try:
                field_value_int = int(field_value)
            except ValueError:
                # Non-numeric input: leave the config untouched.
                return build_config
            existing_fields = {}
            if field_value_int > self.MAX_FIELDS:
                # Clamp the stored value before raising so the UI shows the limit.
                build_config["number_of_fields"]["value"] = self.MAX_FIELDS
                msg = (
                    f"Number of fields cannot exceed {self.MAX_FIELDS}. "
                    "Please adjust the number of fields to be within the allowed limit."
                )
                raise ValueError(msg)
            if len(build_config) > len(default_keys):
                # back up the existing template fields
                for key in build_config.copy():
                    if key not in default_keys:
                        existing_fields[key] = build_config.pop(key)
            # Re-insert (or create) fields 1..N in order; insertion order is what
            # the UI renders, so pop-then-re-add keeps the fields sorted.
            for i in range(1, field_value_int + 1):
                key = f"field_{i}_key"
                if key in existing_fields:
                    field = existing_fields[key]
                    build_config[key] = field
                else:
                    field = DictInput(
                        display_name=f"Field {i}",
                        name=key,
                        info=f"Key for field {i}.",
                        input_types=["Message", "Data"],
                    )
                    build_config[field.name] = field.to_dict()
            build_config["number_of_fields"]["value"] = field_value_int
        return build_config

    async def build_data(self) -> Data:
        """Assemble the configured fields into a Data object, optionally validating the text key."""
        data = self.get_data()
        return_data = Data(data=data, text_key=self.text_key)
        self.status = return_data
        if self.text_key_validator:
            self.validate_text_key()
        return return_data

    def get_data(self) -> dict:
        """Collect the dynamic field values from the component attributes into one dict."""
        data = {}
        for value_dict in self._attributes.values():
            if isinstance(value_dict, dict):
                # Check if the value of the value_dict is a Data; if so, flatten
                # it to its text representation.
                value_dict_ = {
                    key: value.get_text() if isinstance(value, Data) else value for key, value in value_dict.items()
                }
                data.update(value_dict_)
        return data

    def validate_text_key(self) -> None:
        """Raise ValueError if the configured Text Key is non-empty and absent from the Data keys."""
        data_keys = self.get_data().keys()
        if self.text_key not in data_keys and self.text_key != "":
            formatted_data_keys = ", ".join(data_keys)
            msg = f"Text Key: '{self.text_key}' not found in the Data keys: '{formatted_data_keys}'"
            raise ValueError(msg)
DataFrame operations
This component performs various operations on Pandas DataFrames. Here’s an overview of the available operations and their inputs:
Operation | Description | Required Inputs |
---|---|---|
Add Column |
Adds a new column with a constant value |
new_column_name, new_column_value |
Drop Column |
Removes a specified column |
column_name |
Filter |
Filters rows based on column value |
column_name, filter_value |
Head |
Returns first n rows |
num_rows |
Rename Column |
Renames an existing column |
column_name, new_column_name |
Replace Value |
Replaces values in a column |
column_name, replace_value, replacement_value |
Select Columns |
Selects specific columns |
columns_to_select |
Sort |
Sorts DataFrame by column |
column_name, ascending |
Tail |
Returns last n rows |
num_rows |
Parameters
Name | Display Name | Info |
---|---|---|
df |
DataFrame |
The input DataFrame to operate on. |
operation |
Operation |
Select the DataFrame operation to perform. Options: Add Column, Drop Column, Filter, Head, Rename Column, Replace Value, Select Columns, Sort, Tail |
column_name |
Column Name |
The column name to use for the operation. |
filter_value |
Filter Value |
The value to filter rows by. |
ascending |
Sort Ascending |
Whether to sort in ascending order. |
new_column_name |
New Column Name |
The new column name when renaming or adding a column. |
new_column_value |
New Column Value |
The value to populate the new column with. |
columns_to_select |
Columns to Select |
List of column names to select. |
num_rows |
Number of Rows |
Number of rows to return (for head/tail). Default: 5 |
replace_value |
Value to Replace |
The value to replace in the column. |
replacement_value |
Replacement Value |
The value to replace with. |
Name | Display Name | Info |
---|---|---|
output |
DataFrame |
The resulting DataFrame after the operation. |
Component code
dataframe_operations.py
from langflow.custom import Component
from langflow.io import BoolInput, DataFrameInput, DropdownInput, IntInput, MessageTextInput, Output, StrInput
from langflow.schema import DataFrame
class DataFrameOperationsComponent(Component):
    """Perform a single selected operation (filter, sort, head/tail, column edits, ...)
    on an input DataFrame.

    Which parameter inputs are shown in the UI is driven dynamically by the
    selected operation via :meth:`update_build_config`.
    """

    display_name = "DataFrame Operations"
    description = "Perform various operations on a DataFrame."
    icon = "table"

    # Available operations
    OPERATION_CHOICES = [
        "Add Column",
        "Drop Column",
        "Filter",
        "Head",
        "Rename Column",
        "Replace Value",
        "Select Columns",
        "Sort",
        "Tail",
    ]

    inputs = [
        DataFrameInput(
            name="df",
            display_name="DataFrame",
            info="The input DataFrame to operate on.",
        ),
        DropdownInput(
            name="operation",
            display_name="Operation",
            options=OPERATION_CHOICES,
            info="Select the DataFrame operation to perform.",
            real_time_refresh=True,
        ),
        StrInput(
            name="column_name",
            display_name="Column Name",
            info="The column name to use for the operation.",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="filter_value",
            display_name="Filter Value",
            info="The value to filter rows by.",
            dynamic=True,
            show=False,
        ),
        BoolInput(
            name="ascending",
            display_name="Sort Ascending",
            info="Whether to sort in ascending order.",
            dynamic=True,
            show=False,
            value=True,
        ),
        StrInput(
            name="new_column_name",
            display_name="New Column Name",
            info="The new column name when renaming or adding a column.",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="new_column_value",
            display_name="New Column Value",
            info="The value to populate the new column with.",
            dynamic=True,
            show=False,
        ),
        StrInput(
            name="columns_to_select",
            display_name="Columns to Select",
            dynamic=True,
            is_list=True,
            show=False,
        ),
        IntInput(
            name="num_rows",
            display_name="Number of Rows",
            info="Number of rows to return (for head/tail).",
            dynamic=True,
            show=False,
            value=5,
        ),
        MessageTextInput(
            name="replace_value",
            display_name="Value to Replace",
            info="The value to replace in the column.",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="replacement_value",
            display_name="Replacement Value",
            info="The value to replace with.",
            dynamic=True,
            show=False,
        ),
    ]

    outputs = [
        Output(
            display_name="DataFrame",
            name="output",
            method="perform_operation",
            info="The resulting DataFrame after the operation.",
        )
    ]

    def update_build_config(self, build_config, field_value, field_name=None):
        """Show only the parameter inputs relevant to the currently selected operation."""
        # Hide all dynamic fields by default
        dynamic_fields = [
            "column_name",
            "filter_value",
            "ascending",
            "new_column_name",
            "new_column_value",
            "columns_to_select",
            "num_rows",
            "replace_value",
            "replacement_value",
        ]
        for field in dynamic_fields:
            build_config[field]["show"] = False

        # Show relevant fields based on the selected operation
        if field_name == "operation":
            if field_value == "Filter":
                build_config["column_name"]["show"] = True
                build_config["filter_value"]["show"] = True
            elif field_value == "Sort":
                build_config["column_name"]["show"] = True
                build_config["ascending"]["show"] = True
            elif field_value == "Drop Column":
                build_config["column_name"]["show"] = True
            elif field_value == "Rename Column":
                build_config["column_name"]["show"] = True
                build_config["new_column_name"]["show"] = True
            elif field_value == "Add Column":
                build_config["new_column_name"]["show"] = True
                build_config["new_column_value"]["show"] = True
            elif field_value == "Select Columns":
                build_config["columns_to_select"]["show"] = True
            elif field_value in {"Head", "Tail"}:
                build_config["num_rows"]["show"] = True
            elif field_value == "Replace Value":
                build_config["column_name"]["show"] = True
                build_config["replace_value"]["show"] = True
                build_config["replacement_value"]["show"] = True

        return build_config

    def perform_operation(self) -> DataFrame:
        """Dispatch to the handler for the selected operation on a copy of the input.

        Raises:
            ValueError: if the selected operation is not one of OPERATION_CHOICES.
        """
        # Work on a copy so the upstream DataFrame is never mutated.
        dataframe_copy = self.df.copy()
        operation = self.operation
        if operation == "Filter":
            return self.filter_rows_by_value(dataframe_copy)
        if operation == "Sort":
            return self.sort_by_column(dataframe_copy)
        if operation == "Drop Column":
            return self.drop_column(dataframe_copy)
        if operation == "Rename Column":
            return self.rename_column(dataframe_copy)
        if operation == "Add Column":
            return self.add_column(dataframe_copy)
        if operation == "Select Columns":
            return self.select_columns(dataframe_copy)
        if operation == "Head":
            return self.head(dataframe_copy)
        if operation == "Tail":
            return self.tail(dataframe_copy)
        if operation == "Replace Value":
            return self.replace_values(dataframe_copy)
        msg = f"Unsupported operation: {operation}"
        raise ValueError(msg)

    # Existing methods
    def filter_rows_by_value(self, df: DataFrame) -> DataFrame:
        """Keep only rows whose column equals filter_value (string equality against cell values)."""
        return DataFrame(df[df[self.column_name] == self.filter_value])

    def sort_by_column(self, df: DataFrame) -> DataFrame:
        """Sort rows by the configured column, ascending or descending."""
        return DataFrame(df.sort_values(by=self.column_name, ascending=self.ascending))

    def drop_column(self, df: DataFrame) -> DataFrame:
        """Remove the configured column."""
        return DataFrame(df.drop(columns=[self.column_name]))

    def rename_column(self, df: DataFrame) -> DataFrame:
        """Rename the configured column to new_column_name."""
        return DataFrame(df.rename(columns={self.column_name: self.new_column_name}))

    def add_column(self, df: DataFrame) -> DataFrame:
        """Add a new column filled with the constant new_column_value."""
        df[self.new_column_name] = [self.new_column_value] * len(df)
        return DataFrame(df)

    def select_columns(self, df: DataFrame) -> DataFrame:
        """Project the DataFrame onto the selected columns (names are whitespace-trimmed)."""
        columns = [col.strip() for col in self.columns_to_select]
        return DataFrame(df[columns])

    # New methods
    def head(self, df: DataFrame) -> DataFrame:
        """Return the first num_rows rows."""
        return DataFrame(df.head(self.num_rows))

    def tail(self, df: DataFrame) -> DataFrame:
        """Return the last num_rows rows."""
        return DataFrame(df.tail(self.num_rows))

    def replace_values(self, df: DataFrame) -> DataFrame:
        """Replace exact matches of replace_value with replacement_value in one column."""
        df[self.column_name] = df[self.column_name].replace(self.replace_value, self.replacement_value)
        return DataFrame(df)
Data to message
This component is in Legacy, which means it is no longer in active development as of Langflow version 1.3. Instead, use the Parser component. |
This component converts data objects into plain text using a specified template.
Parameters
Name | Display Name | Info |
---|---|---|
data |
Data |
The data to convert to text. |
template |
Template |
The template to use for formatting the data. It can contain the keys {text}, {data} or any other key in the data. |
sep |
Separator |
The separator to use between multiple data items. |
Name | Display Name | Info |
---|---|---|
text |
Text |
The resulting formatted text string as a message object. |
Component code
parse_data.py
from langflow.custom import Component
from langflow.helpers.data import data_to_text, data_to_text_list
from langflow.io import DataInput, MultilineInput, Output, StrInput
from langflow.schema import Data
from langflow.schema.message import Message
class ParseDataComponent(Component):
    """Render Data objects into text through a template, as one Message or a list of Data."""

    display_name = "Data to Message"
    description = "Convert Data objects into Messages using any {field_name} from input data."
    icon = "message-square"
    name = "ParseData"
    legacy = True
    metadata = {
        "legacy_name": "Parse Data",
    }

    inputs = [
        DataInput(
            name="data",
            display_name="Data",
            info="The data to convert to text.",
            is_list=True,
            required=True,
        ),
        MultilineInput(
            name="template",
            display_name="Template",
            info="The template to use for formatting the data. "
            "It can contain the keys {text}, {data} or any other key in the Data.",
            value="{text}",
            required=True,
        ),
        StrInput(name="sep", display_name="Separator", advanced=True, value="\n"),
    ]

    outputs = [
        Output(
            display_name="Message",
            name="text",
            info="Data as a single Message, with each input Data separated by Separator",
            method="parse_data",
        ),
        Output(
            display_name="Data List",
            name="data_list",
            info="Data as a list of new Data, each having `text` formatted by Template",
            method="parse_data_as_list",
        ),
    ]

    def _clean_args(self) -> tuple[list[Data], str, str]:
        """Return (data-as-list, template, separator), normalizing a single Data to a list."""
        items = self.data if isinstance(self.data, list) else [self.data]
        return items, self.template, self.sep

    def parse_data(self) -> Message:
        """Render every Data item through the template and join them into one Message."""
        items, template, sep = self._clean_args()
        rendered = data_to_text(template, items, sep)
        self.status = rendered
        return Message(text=rendered)

    def parse_data_as_list(self) -> list[Data]:
        """Render each Data item through the template, returning one Data per input item."""
        items, template, _ = self._clean_args()
        rendered_texts, new_items = data_to_text_list(template, items)
        for data_item, rendered_text in zip(new_items, rendered_texts, strict=True):
            data_item.set_text(rendered_text)
        self.status = new_items
        return new_items
Filter data
This component is in Beta as of Langflow version 1.1.3, and is not yet fully supported. |
This component filters a data object based on a list of specified keys. This component allows for selective extraction of data from a data object, retaining only the key-value pairs that match the provided filter criteria.
Parameters
Name | Display Name | Info |
---|---|---|
data |
Data |
The Data object to filter. |
filter_criteria |
Filter Criteria |
List of keys to filter by. |
Name | Display Name | Info |
---|---|---|
filtered_data |
Filtered data |
The resulting filtered data object. |
Component code
filter_data.py
from langflow.custom import Component
from langflow.io import DataInput, MessageTextInput, Output
from langflow.schema import Data
class FilterDataComponent(Component):
    """Produce a new Data object containing only the keys listed in the filter criteria."""

    display_name = "Filter Data"
    description = "Filters a Data object based on a list of keys."
    icon = "filter"
    beta = True
    name = "FilterData"

    inputs = [
        DataInput(
            name="data",
            display_name="Data",
            info="Data object to filter.",
        ),
        MessageTextInput(
            name="filter_criteria",
            display_name="Filter Criteria",
            info="List of keys to filter by.",
            is_list=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def filter_data(self) -> Data:
        """Return a Data object keeping only the key/value pairs whose keys are in filter_criteria.

        Returns an empty Data object when the input is not a Data instance or
        when no filter keys are provided.
        """
        # filter_criteria is not required; guard against None so the membership
        # test below cannot raise TypeError. A set gives O(1) lookups.
        criteria = set(self.filter_criteria or [])
        data = self.data.data if isinstance(self.data, Data) else {}

        # Filter the data
        filtered = {key: value for key, value in data.items() if key in criteria}

        # Create a new Data object with the filtered data
        filtered_data = Data(data=filtered)
        self.status = filtered_data
        return filtered_data
Filter Values
This component is in Beta as of Langflow version 1.1.3, and is not yet fully supported. |
This component filters a list of data items based on a specified key, filter value, and comparison operator.
Parameters
Name | Display Name | Info |
---|---|---|
input_data |
Input data |
The list of data items to filter. |
filter_key |
Filter Key |
The key to filter on (for example, 'route'). |
filter_value |
Filter Value |
The value to filter by (for example, 'CMIP'). |
operator |
Comparison Operator |
The operator to apply for comparing the values. |
Name | Display Name | Info |
---|---|---|
filtered_data |
Filtered data |
The resulting list of filtered data items. |
Component code
filter_data_values.py
from typing import Any
from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, MessageTextInput, Output
from langflow.schema import Data
class DataFilterComponent(Component):
    """Filter a list of Data items by comparing one key's value against a target value."""

    display_name = "Filter Values"
    description = (
        "Filter a list of data items based on a specified key, filter value,"
        # Typo fix: "comparision" -> "comparison" in the user-facing description.
        " and comparison operator. Check advanced options to select match comparison."
    )
    icon = "filter"
    beta = True
    name = "FilterDataValues"

    inputs = [
        DataInput(name="input_data", display_name="Input Data", info="The list of data items to filter.", is_list=True),
        MessageTextInput(
            name="filter_key",
            display_name="Filter Key",
            info="The key to filter on (e.g., 'route').",
            value="route",
            input_types=["Data"],
        ),
        MessageTextInput(
            name="filter_value",
            display_name="Filter Value",
            info="The value to filter by (e.g., 'CMIP').",
            value="CMIP",
            input_types=["Data"],
        ),
        DropdownInput(
            name="operator",
            display_name="Comparison Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with"],
            info="The operator to apply for comparing the values.",
            value="equals",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def compare_values(self, item_value: Any, filter_value: str, operator: str) -> bool:
        """Return True when str(item_value) satisfies `operator` against filter_value.

        Unknown operators return False.
        """
        text = str(item_value)
        comparisons = {
            "equals": lambda: text == filter_value,
            "not equals": lambda: text != filter_value,
            "contains": lambda: filter_value in text,
            "starts with": lambda: text.startswith(filter_value),
            "ends with": lambda: text.endswith(filter_value),
        }
        check = comparisons.get(operator)
        return check() if check else False

    def filter_data(self) -> list[Data]:
        """Return the subset of input_data whose `filter_key` value matches `filter_value`."""
        # Extract inputs
        input_data: list[Data] = self.input_data
        filter_key: str = self.filter_key.text
        filter_value: str = self.filter_value.text
        operator: str = self.operator

        # Validate inputs
        if not input_data:
            self.status = "Input data is empty."
            return []
        if not filter_key or not filter_value:
            self.status = "Filter key or value is missing."
            return input_data

        # Filter the data
        filtered_data = []
        for item in input_data:
            if isinstance(item.data, dict) and filter_key in item.data:
                if self.compare_values(item.data[filter_key], filter_value, operator):
                    filtered_data.append(item)
            else:
                # NOTE(review): this warning is overwritten by the final status
                # assignment below; preserved as-is to keep behavior unchanged.
                self.status = f"Warning: Some items don't have the key '{filter_key}' or are not dictionaries."

        self.status = filtered_data
        return filtered_data
JSON Cleaner
This component is in Legacy. Legacy components can be used in flows, but may not work due to Langflow core updates. |
This component cleans JSON strings to ensure they are fully compliant with the JSON specification.
Parameters
Name | Display Name | Info |
---|---|---|
json_str |
JSON String |
The JSON string to be cleaned. This can be a raw, potentially malformed JSON string produced by language models or other sources that may not fully comply with JSON specifications. |
remove_control_chars |
Remove Control Characters |
If set to True, this option removes control characters (ASCII characters 0-31 and 127) from the JSON string. This can help eliminate invisible characters that might cause parsing issues or make the JSON invalid. |
normalize_unicode |
Normalize Unicode |
When enabled, this option normalizes Unicode characters in the JSON string to their canonical composition form (NFC). This ensures consistent representation of Unicode characters across different systems and prevents potential issues with character encoding. |
validate_json |
Validate JSON |
If set to True, this option attempts to parse the JSON string to ensure it is well-formed before applying the final repair operation. It raises a ValueError if the JSON is invalid, allowing for early detection of major structural issues in the JSON. |
Name | Display Name | Info |
---|---|---|
output |
Cleaned JSON String |
The resulting cleaned, repaired, and validated JSON string that fully complies with the JSON specification. |
Component code
json_cleaner.py
import json
import unicodedata
from langflow.custom import Component
from langflow.inputs import BoolInput, MessageTextInput
from langflow.schema.message import Message
from langflow.template import Output
class JSONCleaner(Component):
icon = "braces"
display_name = "JSON Cleaner"
description = (
"Cleans the messy and sometimes incorrect JSON strings produced by LLMs "
"so that they are fully compliant with the JSON spec."
)
legacy = True
inputs = [
MessageTextInput(
name="json_str", display_name="JSON String", info="The JSON string to be cleaned.", required=True
),
BoolInput(
name="remove_control_chars",
display_name="Remove Control Characters",
info="Remove control characters from the JSON string.",
required=False,
),
BoolInput(
name="normalize_unicode",
display_name="Normalize Unicode",
info="Normalize Unicode characters in the JSON string.",
required=False,
),
BoolInput(
name="validate_json",
display_name="Validate JSON",
info="Validate the JSON string to ensure it is well-formed.",
required=False,
),
]
outputs = [
Output(display_name="Cleaned JSON String", name="output", method="clean_json"),
]
def clean_json(self) -> Message:
try:
from json_repair import repair_json
except ImportError as e:
msg = "Could not import the json_repair package. Please install it with `pip install json_repair`."
raise ImportError(msg) from e
"""Clean the input JSON string based on provided options and return the cleaned JSON string."""
json_str = self.json_str
remove_control_chars = self.remove_control_chars
normalize_unicode = self.normalize_unicode
validate_json = self.validate_json
start = json_str.find("{")
end = json_str.rfind("}")
if start == -1 or end == -1:
msg = "Invalid JSON string: Missing '{' or '}'"
raise ValueError(msg)
try:
json_str = json_str[start : end + 1]
if remove_control_chars:
json_str = self._remove_control_characters(json_str)
if normalize_unicode:
json_str = self._normalize_unicode(json_str)
if validate_json:
json_str = self._validate_json(json_str)
cleaned_json_str = repair_json(json_str)
result = str(cleaned_json_str)
self.status = result
return Message(text=result)
except Exception as e:
msg = f"Error cleaning JSON string: {e}"
raise ValueError(msg) from e
def _remove_control_characters(self, s: str) -> str:
"""Remove control characters from the string."""
return s.translate(self.translation_table)
def _normalize_unicode(self, s: str) -> str:
"""Normalize Unicode characters in the string."""
return unicodedata.normalize("NFC", s)
def _validate_json(self, s: str) -> str:
"""Validate the JSON string."""
try:
json.loads(s)
except json.JSONDecodeError as e:
msg = f"Invalid JSON string: {e}"
raise ValueError(msg) from e
return s
def __init__(self, *args, **kwargs):
    """Initialize the component and precompute the control-character strip table."""
    super().__init__(*args, **kwargs)
    # Translation table that deletes C0 control characters (0-31) and DEL (127).
    control_chars = "".join(map(chr, range(32))) + chr(127)
    self.translation_table = str.maketrans("", "", control_chars)
Lambda filter
This component uses an LLM to generate a Lambda function for filtering or transforming structured data.
To use the Lambda filter component, you must connect it to a Language Model component, which the component uses to generate a function based on the natural language instructions in the Instructions field.
This example gets JSON data from the https://jsonplaceholder.typicode.com/users
API endpoint.
The Instructions field in the Lambda filter component specifies the task `extract emails`.
The connected LLM creates a filter based on the instructions, and successfully extracts a list of email addresses from the JSON data.

Parameters
Name | Display Name | Info |
---|---|---|
data |
Data |
The structured data to filter or transform using a Lambda function. |
llm |
Language Model |
The connection port for a Model component. |
filter_instruction |
Instructions |
Natural language instructions for how to filter or transform the data using a Lambda function, such as `extract emails`. |
sample_size |
Sample Size |
For large datasets, the number of characters to sample from the dataset head and tail. |
max_size |
Max Size |
The number of characters for the data to be considered "large", which triggers sampling by the `sample_size` value. |
Name | Display Name | Info |
---|---|---|
filtered_data |
Filtered Data |
The filtered or transformed data object. |
dataframe |
DataFrame |
The filtered data returned as a `DataFrame`. |
Component code
lambda_filter.py
from __future__ import annotations
import json
import re
from typing import TYPE_CHECKING, Any
from langflow.custom import Component
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, Output
from langflow.schema import Data
from langflow.schema.dataframe import DataFrame
from langflow.utils.data_structure import get_data_structure
if TYPE_CHECKING:
from collections.abc import Callable
class LambdaFilterComponent(Component):
"""Asks an LLM to write a Python lambda from natural-language instructions, then applies it to the input data."""
display_name = "Lambda Filter"
description = "Uses an LLM to generate a lambda function for filtering or transforming structured data."
icon = "filter"
name = "LambdaFilter"
beta = True
inputs = [
DataInput(
name="data",
display_name="Data",
info="The structured data to filter or transform using a lambda function.",
is_list=True,
required=True,
),
HandleInput(
name="llm",
display_name="Language Model",
info="Connect the 'Language Model' output from your LLM component here.",
input_types=["LanguageModel"],
required=True,
),
MultilineInput(
name="filter_instruction",
display_name="Instructions",
info=(
"Natural language instructions for how to filter or transform the data using a lambda function. "
"Example: Filter the data to only include items where the 'status' is 'active'."
),
value="Filter the data to...",
required=True,
),
IntInput(
name="sample_size",
display_name="Sample Size",
info="For large datasets, number of items to sample from head/tail.",
value=1000,
advanced=True,
),
IntInput(
name="max_size",
display_name="Max Size",
info="Number of characters for the data to be considered large.",
value=30000,
advanced=True,
),
]
outputs = [
Output(
display_name="Filtered Data",
name="filtered_data",
method="filter_data",
),
Output(
display_name="DataFrame",
name="dataframe",
method="as_dataframe",
),
]
def get_data_structure(self, data):
"""Extract the structure of a dictionary, replacing values with their types."""
# Delegates per-value work to the module-level `get_data_structure` imported
# from langflow.utils.data_structure (not a recursive call to this method).
return {k: get_data_structure(v) for k, v in data.items()}
def _validate_lambda(self, lambda_text: str) -> bool:
"""Validate the provided lambda function text."""
# Return False if the lambda function does not start with 'lambda' or does not contain a colon
return lambda_text.strip().startswith("lambda") and ":" in lambda_text
async def filter_data(self) -> list[Data]:
"""Generate a lambda from the LLM and apply it to the first input Data's payload."""
self.log(str(self.data))
# NOTE(review): although `data` is declared is_list=True, only the first item's
# payload is processed here — confirm that is intentional.
data = self.data[0].data if isinstance(self.data, list) else self.data.data
dump = json.dumps(data)
self.log(str(data))
llm = self.llm
instruction = self.filter_instruction
sample_size = self.sample_size
# Get data structure and samples
data_structure = self.get_data_structure(data)
dump_structure = json.dumps(data_structure)
self.log(dump_structure)
# For large datasets, sample from head and tail
if len(dump) > self.max_size:
data_sample = (
f"Data is too long to display... \n\n First lines (head): {dump[:sample_size]} \n\n"
f" Last lines (tail): {dump[-sample_size:]})"
)
else:
data_sample = dump
self.log(data_sample)
prompt = f"""Given this data structure and examples, create a Python lambda function that
implements the following instruction:
Data Structure:
{dump_structure}
Example Items:
{data_sample}
Instruction: {instruction}
Return ONLY the lambda function and nothing else. No need for ```python or whatever.
Just a string starting with lambda.
"""
response = await llm.ainvoke(prompt)
response_text = response.content if hasattr(response, "content") else str(response)
self.log(response_text)
# Extract lambda using regex
lambda_match = re.search(r"lambda\s+\w+\s*:.*?(?=\n|$)", response_text)
if not lambda_match:
msg = f"Could not find lambda in response: {response_text}"
raise ValueError(msg)
lambda_text = lambda_match.group().strip()
self.log(lambda_text)
# Reject responses that do not look like a lambda (must start with 'lambda' and contain ':').
if not self._validate_lambda(lambda_text):
msg = f"Invalid lambda format: {lambda_text}"
raise ValueError(msg)
# SECURITY: this evaluates LLM-generated code with full interpreter privileges;
# the regex/format checks above do not sandbox it. Acceptable only because the
# component is opt-in and marked beta — do not reuse this pattern elsewhere.
fn: Callable[[Any], Any] = eval(lambda_text)  # noqa: S307
# Apply the lambda function to the data
processed_data = fn(data)
# If it's a dict, wrap it in a Data object
if isinstance(processed_data, dict):
return [Data(**processed_data)]
# If it's a list, convert each item to a Data object
if isinstance(processed_data, list):
return [Data(**item) if isinstance(item, dict) else Data(text=str(item)) for item in processed_data]
# If it's anything else, convert to string and wrap in a Data object
return [Data(text=str(processed_data))]
async def as_dataframe(self) -> DataFrame:
"""Return filtered data as a DataFrame."""
filtered_data = await self.filter_data()
return DataFrame(filtered_data)
LLM Router
This component routes requests to the most appropriate LLM based on the OpenRouter model specifications.
Parameters
Name | Display Name | Info |
---|---|---|
models |
Language Models |
List of LLMs to route between. |
input_value |
Input |
The input message to be routed. |
judge_llm |
Judge LLM |
LLM that will evaluate and select the most appropriate model. |
optimization |
Optimization |
Optimization preference (quality/speed/cost/balanced). |
Name | Display Name | Info |
---|---|---|
output |
Output |
The response from the selected model. |
selected_model |
Selected Model |
Name of the chosen model. |
Component code
llm_router.py
import json
import requests
from langflow.base.models.chat_result import get_chat_result
from langflow.base.models.model_utils import get_model_name
from langflow.custom import Component
from langflow.io import DropdownInput, HandleInput, Output
from langflow.schema.message import Message
class LLMRouterComponent(Component):
"""Uses a judge LLM to pick the best model from a list, then forwards the input to it."""
display_name = "LLM Router"
description = "Routes the input to the most appropriate LLM based on OpenRouter model specifications"
icon = "git-branch"
inputs = [
HandleInput(
name="models",
display_name="Language Models",
input_types=["LanguageModel"],
required=True,
is_list=True,
info="List of LLMs to route between",
),
HandleInput(
name="input_value",
display_name="Input",
input_types=["Message"],
info="The input message to be routed",
),
HandleInput(
name="judge_llm",
display_name="Judge LLM",
input_types=["LanguageModel"],
info="LLM that will evaluate and select the most appropriate model",
),
DropdownInput(
name="optimization",
display_name="Optimization",
options=["quality", "speed", "cost", "balanced"],
value="balanced",
info="Optimization preference for model selection",
),
]
outputs = [
Output(display_name="Output", name="output", method="route_to_model"),
Output(
display_name="Selected Model",
name="selected_model",
method="get_selected_model",
required_inputs=["output"],
),
]
# Cached by route_to_model so the secondary output can report the chosen model.
_selected_model_name: str | None = None
def get_selected_model(self) -> str:
"""Return the name of the model chosen by the last routing call (empty if none)."""
return self._selected_model_name or ""
def _get_model_specs(self, model_name: str) -> str:
"""Fetch specific model information from OpenRouter API."""
http_success = 200
base_info = f"Model: {model_name}\n"
# Remove any special characters and spaces, keep only alphanumeric
# NOTE(review): this also strips '/', '-' and '.' from OpenRouter model ids
# (e.g. "openai/gpt-4" becomes "openaigpt4"), so the endpoint URL below may
# not match a real model and the lookup would fall back to "No specifications
# available" — verify against the OpenRouter API's id format.
clean_name = "".join(c.lower() for c in model_name if c.isalnum())
url = f"https://openrouter.ai/api/v1/models/{clean_name}/endpoints"
try:
response = requests.get(url, timeout=10)
except requests.exceptions.RequestException as e:
return base_info + f"Error fetching specs: {e!s}"
if response.status_code != http_success:
return base_info + "No specifications available"
try:
data = response.json().get("data", {})
except (json.JSONDecodeError, requests.exceptions.JSONDecodeError):
return base_info + "Error parsing response data"
# Extract relevant information
context_length = data.get("context_length", "Unknown")
max_completion_tokens = data.get("max_completion_tokens", "Unknown")
architecture = data.get("architecture", {})
tokenizer = architecture.get("tokenizer", "Unknown")
instruct_type = architecture.get("instruct_type", "Unknown")
pricing = data.get("pricing", {})
prompt_price = pricing.get("prompt", "Unknown")
completion_price = pricing.get("completion", "Unknown")
description = data.get("description", "No description available")
created = data.get("created", "Unknown")
return f"""
Model: {model_name}
Description: {description}
Context Length: {context_length} tokens
Max Completion Tokens: {max_completion_tokens}
Tokenizer: {tokenizer}
Instruct Type: {instruct_type}
Pricing: ${prompt_price}/1k tokens (prompt), ${completion_price}/1k tokens (completion)
Created: {created}
"""
MISSING_INPUTS_MSG = "Missing required inputs: models, input_value, or judge_llm"
async def route_to_model(self) -> Message:
"""Ask the judge LLM to pick a model index, then get the answer from that model.

Falls back to the first model when the judge's reply is not a valid index or
when invocation fails with RuntimeError/ValueError.
"""
if not self.models or not self.input_value or not self.judge_llm:
raise ValueError(self.MISSING_INPUTS_MSG)
system_prompt = {
"role": "system",
"content": (
"You are a model selection expert. Analyze the input and select the most "
"appropriate model based on:\n"
"1. Task complexity and requirements\n"
"2. Context length needed\n"
"3. Model capabilities\n"
"4. Cost considerations\n"
"5. Speed requirements\n\n"
"Consider the detailed model specifications provided and the user's "
"optimization preference. Return only the index number (0-based) of the best model."
),
}
# Create list of available models with their detailed specs
models_info = []
for i, model in enumerate(self.models):
model_name = get_model_name(model)
model_specs = self._get_model_specs(model_name)
models_info.append(f"=== Model {i} ===\n{model_specs}")
models_str = "\n\n".join(models_info)
user_message = {
"role": "user",
"content": f"""Available Models with Specifications:\n{models_str}\n
Optimization Preference: {self.optimization}\n
Input Query: "{self.input_value.text}"\n
Based on the model specifications and optimization preference,
select the most appropriate model (return only the index number):""",
}
try:
# Get judge's decision
response = await self.judge_llm.ainvoke([system_prompt, user_message])
try:
# The judge is instructed to answer with a bare 0-based index.
selected_index = int(response.content.strip())
if 0 <= selected_index < len(self.models):
chosen_model = self.models[selected_index]
self._selected_model_name = get_model_name(chosen_model)
else:
# Out-of-range index: fall back to the first model.
chosen_model = self.models[0]
self._selected_model_name = get_model_name(chosen_model)
except ValueError:
# Non-numeric reply: fall back to the first model.
chosen_model = self.models[0]
self._selected_model_name = get_model_name(chosen_model)
# Get response from chosen model
return get_chat_result(
runnable=chosen_model,
input_value=self.input_value,
)
except (RuntimeError, ValueError) as e:
self.status = f"Error: {e!s}"
# Fallback to first model
chosen_model = self.models[0]
self._selected_model_name = get_model_name(chosen_model)
return get_chat_result(
runnable=chosen_model,
input_value=self.input_value,
)
Message to data
This component converts a message object to a data object.
Parameters
Name | Display Name | Info |
---|---|---|
message |
message |
The message object to convert to a data object. |
Name | Display Name | Info |
---|---|---|
data |
data |
The resulting data object converted from the input message. |
Component code
message_to_data.py
from loguru import logger
from langflow.custom import Component
from langflow.io import MessageInput, Output
from langflow.schema import Data
from langflow.schema.message import Message
class MessageToDataComponent(Component):
    """Convert a Message object into a Data object carrying the same payload."""

    display_name = "Message to Data"
    description = "Convert a Message object to a Data object"
    icon = "message-square-share"
    beta = True
    name = "MessagetoData"

    inputs = [
        MessageInput(
            name="message",
            display_name="Message",
            info="The Message object to convert to a Data object",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="convert_message_to_data"),
    ]

    def convert_message_to_data(self) -> Data:
        """Wrap the Message's underlying dict in a Data object, or report a type error."""
        if not isinstance(self.message, Message):
            # Non-Message input: log it, surface the error in the UI status,
            # and return the error itself as a Data payload.
            msg = "Error converting Message to Data: Input must be a Message object"
            logger.opt(exception=True).debug(msg)
            self.status = msg
            return Data(data={"error": msg})
        # Carry the message's data dict over unchanged.
        return Data(data=self.message.data)
Parse DataFrame
This component is in Legacy, which means it is no longer in active development as of Langflow version 1.3. Instead, use the Parser component. |
This component converts a DataFrame into plain text following a specified template. Each column in the DataFrame is treated as a possible template key, for example `{col_name}`.
Parameters
Name | Display Name | Info |
---|---|---|
df |
DataFrame |
The DataFrame to convert to text rows. |
template |
Template |
The template for formatting each row. Use placeholders matching column names in the DataFrame, for example, |
sep |
Separator |
String that joins all row texts when building the single Text output. |
Name | Display Name | Info |
---|---|---|
text |
Text |
All rows combined into a single text, each row formatted by the template and separated by the separator value defined in `sep`. |
Component code
parse_dataframe.py
from langflow.custom import Component
from langflow.io import DataFrameInput, MultilineInput, Output, StrInput
from langflow.schema.message import Message
class ParseDataFrameComponent(Component):
    """Render each DataFrame row through a template and join the results into one text."""

    display_name = "Parse DataFrame"
    description = (
        "Convert a DataFrame into plain text following a specified template. "
        "Each column in the DataFrame is treated as a possible template key, e.g. {col_name}."
    )
    icon = "braces"
    name = "ParseDataFrame"
    legacy = True

    inputs = [
        DataFrameInput(name="df", display_name="DataFrame", info="The DataFrame to convert to text rows."),
        MultilineInput(
            name="template",
            display_name="Template",
            info=(
                "The template for formatting each row. "
                "Use placeholders matching column names in the DataFrame, for example '{col1}', '{col2}'."
            ),
            value="{text}",
        ),
        StrInput(
            name="sep",
            display_name="Separator",
            advanced=True,
            value="\n",
            info="String that joins all row texts when building the single Text output.",
        ),
    ]

    outputs = [
        Output(
            display_name="Text",
            name="text",
            info="All rows combined into a single text, each row formatted by the template and separated by `sep`.",
            method="parse_data",
        ),
    ]

    def _clean_args(self):
        """Return (dataframe, template, sep), substituting defaults for falsy values."""
        return self.df, self.template or "{text}", self.sep or "\n"

    def parse_data(self) -> Message:
        """Format every row with the template, join with `sep`, and return a Message.

        Each row's dict supplies the template's named placeholders,
        e.g. template="{text}" with row {"text": "Hello"}.
        """
        dataframe, template, sep = self._clean_args()
        rendered_rows = [template.format(**row.to_dict()) for _, row in dataframe.iterrows()]
        result_string = sep.join(rendered_rows)
        self.status = result_string  # store in self.status for UI logs
        return Message(text=result_string)
Parse JSON
This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates. |
This component converts and extracts JSON fields using JQ queries.
Parameters
Name | Display Name | Info |
---|---|---|
input_value |
Input |
The data object to filter. It can be a message or data object. |
query |
JQ Query |
JQ Query to filter the data. The input is always a JSON list. |
Name | Display Name | Info |
---|---|---|
filtered_data |
Filtered data |
Filtered data as a list of data objects. |
Component code
parse_json_data.py
import json
from json import JSONDecodeError
import jq
from json_repair import repair_json
from loguru import logger
from langflow.custom import Component
from langflow.inputs import HandleInput, MessageTextInput
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.message import Message
class ParseJSONDataComponent(Component):
    """Parse Message/Data input as JSON, filter it with a JQ query, and emit Data objects."""

    display_name = "Parse JSON"
    description = "Convert and extract JSON fields."
    icon = "braces"
    name = "ParseJSONData"
    legacy: bool = True

    inputs = [
        HandleInput(
            name="input_value",
            display_name="Input",
            info="Data object to filter.",
            required=True,
            input_types=["Message", "Data"],
        ),
        MessageTextInput(
            name="query",
            display_name="JQ Query",
            info="JQ Query to filter the data. The input is always a JSON list.",
            required=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def _parse_data(self, input_value) -> str:
        """Serialize a Message/Data/other input into a JSON-ready string."""
        if isinstance(input_value, Message) and isinstance(input_value.text, str):
            return input_value.text
        if isinstance(input_value, Data):
            return json.dumps(input_value.data)
        return str(input_value)

    def filter_data(self) -> list[Data]:
        """Repair and parse the input JSON, apply the JQ query, and wrap each result.

        Returns:
            list[Data]: one Data per JQ result; dict results become structured Data,
            everything else is stringified into the `text` field.

        Raises:
            ValueError: when the input cannot be parsed as JSON even after repair.
        """
        to_filter = self.input_value
        if not to_filter:
            return []
        # Normalize the input to string form (single document or list of documents).
        if isinstance(to_filter, list):
            to_filter = [self._parse_data(f) for f in to_filter]
        else:
            to_filter = self._parse_data(to_filter)
        # If input is not a list, don't wrap it in a list
        if not isinstance(to_filter, list):
            to_filter = repair_json(to_filter)
            try:
                to_filter_as_dict = json.loads(to_filter)
            except JSONDecodeError:
                # One retry after a second repair pass before giving up.
                try:
                    to_filter_as_dict = json.loads(repair_json(to_filter))
                except JSONDecodeError as e:
                    msg = f"Invalid JSON: {e}"
                    raise ValueError(msg) from e
        else:
            to_filter = [repair_json(f) for f in to_filter]
            to_filter_as_dict = []
            for f in to_filter:
                try:
                    to_filter_as_dict.append(json.loads(f))
                except JSONDecodeError:
                    try:
                        to_filter_as_dict.append(json.loads(repair_json(f)))
                    except JSONDecodeError as e:
                        msg = f"Invalid JSON: {e}"
                        raise ValueError(msg) from e
            to_filter = to_filter_as_dict
        full_filter_str = json.dumps(to_filter_as_dict)
        # Fix: loguru formats messages with `{}` placeholders. The original calls
        # passed the value as an extra positional argument with no placeholder
        # ('logger.info("to_filter: ", to_filter)'), so the value was never logged.
        logger.info("to_filter: {}", to_filter)
        results = jq.compile(self.query).input_text(full_filter_str).all()
        logger.info("results: {}", results)
        return [Data(data=value) if isinstance(value, dict) else Data(text=str(value)) for value in results]
Parser
This component formats `DataFrame` or `Data` objects into text using templates, with an option to convert inputs directly to strings using `stringify`.
To use this component, create variables for values in the `template` the same way you would in a Prompt component. For `DataFrames`, use column names, for example `Name: {Name}`. For `Data` objects, use `{text}`.
Parameters
Name | Display Name | Info |
---|---|---|
stringify |
Stringify |
Enable to convert input to a string instead of using a template. |
template |
Template |
Template for formatting using variables in curly brackets. For |
input_data |
Data or DataFrame |
The input to parse - accepts either a DataFrame or Data object. |
sep |
Separator |
String used to separate rows/items. Default: newline. |
clean_data |
Clean Data |
When stringify is enabled, cleans data by removing empty rows and lines. |
Name | Display Name | Info |
---|---|---|
parsed_text |
Parsed Text |
The resulting formatted text as a message object. |
Component code
parser.py
import json
from typing import Any
from langflow.custom import Component
from langflow.io import (
BoolInput,
HandleInput,
MessageTextInput,
MultilineInput,
Output,
TabInput,
)
from langflow.schema import Data, DataFrame
from langflow.schema.message import Message
class ParserComponent(Component):
"""Format a DataFrame or Data object into text via a template, or stringify it directly."""
display_name = "Parser"
description = (
"Format a DataFrame or Data object into text using a template. "
"Enable 'Stringify' to convert input into a readable string instead."
)
icon = "braces"
inputs = [
TabInput(
name="mode",
display_name="Mode",
options=["Parser", "Stringify"],
value="Parser",
info="Convert into raw string instead of using a template.",
real_time_refresh=True,
),
MultilineInput(
name="pattern",
display_name="Template",
info=(
"Use variables within curly brackets to extract column values for DataFrames "
"or key values for Data."
"For example: `Name: {Name}, Age: {Age}, Country: {Country}`"
),
value="Text: {text}",  # Example default
dynamic=True,
show=True,
required=True,
),
HandleInput(
name="input_data",
display_name="Data or DataFrame",
input_types=["DataFrame", "Data"],
info="Accepts either a DataFrame or a Data object.",
required=True,
),
MessageTextInput(
name="sep",
display_name="Separator",
advanced=True,
value="\n",
info="String used to separate rows/items.",
),
]
outputs = [
Output(
display_name="Parsed Text",
name="parsed_text",
info="Formatted text output.",
method="parse_combined_text",
),
]
def update_build_config(self, build_config, field_value, field_name=None):
"""Dynamically hide/show `template` and enforce requirement based on `stringify`."""
if field_name == "mode":
# Template is only relevant (and required) in Parser mode.
build_config["pattern"]["show"] = self.mode == "Parser"
build_config["pattern"]["required"] = self.mode == "Parser"
# NOTE(review): field_value here is the new mode string ("Parser" or
# "Stringify"), which is always truthy, so the else-branch below appears
# unreachable on mode changes — confirm whether this was meant to test
# `field_value == "Stringify"`.
if field_value:
clean_data = BoolInput(
name="clean_data",
display_name="Clean Data",
info=(
"Enable to clean the data by removing empty rows and lines "
"in each cell of the DataFrame/ Data object."
),
value=True,
advanced=True,
required=False,
)
build_config["clean_data"] = clean_data.to_dict()
else:
build_config.pop("clean_data", None)
return build_config
def _clean_args(self):
"""Prepare arguments based on input type."""
# Returns a (dataframe, data) pair where exactly one element is non-None.
input_data = self.input_data
match input_data:
case list() if all(isinstance(item, Data) for item in input_data):
msg = "List of Data objects is not supported."
raise ValueError(msg)
case DataFrame():
return input_data, None
case Data():
return None, input_data
case dict() if "data" in input_data:
try:
if "columns" in input_data:  # Likely a DataFrame
return DataFrame.from_dict(input_data), None
# Likely a Data object
return None, Data(**input_data)
except (TypeError, ValueError, KeyError) as e:
msg = f"Invalid structured input provided: {e!s}"
raise ValueError(msg) from e
case _:
msg = f"Unsupported input type: {type(input_data)}. Expected DataFrame or Data."
raise ValueError(msg)
def parse_combined_text(self) -> Message:
"""Parse all rows/items into a single text or convert input to string if `stringify` is enabled."""
# Early return for stringify option
if self.mode == "Stringify":
return self.convert_to_string()
df, data = self._clean_args()
lines = []
if df is not None:
# Each row's dict supplies the template's named placeholders.
for _, row in df.iterrows():
formatted_text = self.pattern.format(**row.to_dict())
lines.append(formatted_text)
elif data is not None:
formatted_text = self.pattern.format(**data.data)
lines.append(formatted_text)
combined_text = self.sep.join(lines)
self.status = combined_text
return Message(text=combined_text)
def _safe_convert(self, data: Any) -> str:
"""Safely convert input data to string."""
try:
if isinstance(data, str):
return data
if isinstance(data, Message):
return data.get_text()
if isinstance(data, Data):
return json.dumps(data.data)
if isinstance(data, DataFrame):
# `clean_data` only exists when update_build_config added it; hasattr guards that.
if hasattr(self, "clean_data") and self.clean_data:
# Remove empty rows
data = data.dropna(how="all")
# Remove empty lines in each cell
data = data.replace(r"^\s*$", "", regex=True)
# Replace multiple newlines with a single newline
data = data.replace(r"\n+", "\n", regex=True)
return data.to_markdown(index=False)
return str(data)
except (ValueError, TypeError, AttributeError) as e:
msg = f"Error converting data: {e!s}"
raise ValueError(msg) from e
def convert_to_string(self) -> Message:
"""Convert input data to string with proper error handling."""
result = ""
if isinstance(self.input_data, list):
result = "\n".join([self._safe_convert(item) for item in self.input_data])
else:
result = self._safe_convert(self.input_data)
self.log(f"Converted to string with length: {len(result)}")
message = Message(text=result)
self.status = message
return message
Regex Extractor
This component extracts patterns from text using regular expressions.
Parameters
Name | Display Name | Info |
---|---|---|
input_text |
Input Text |
The text to analyze. Type: MessageTextInput, Required: true |
pattern |
Regex Pattern |
The regular expression pattern to match. Type: MessageTextInput, Required: true |
Name | Display Name | Info |
---|---|---|
data |
Data |
List of extracted matches as Data objects. Method: extract_matches |
text |
Message |
Formatted text of all matches. Method: get_matches_text |
Component code
regex.py
import re
from langflow.custom import Component
from langflow.io import MessageTextInput, Output
from langflow.schema import Data
from langflow.schema.message import Message
class RegexExtractorComponent(Component):
"""Extract regex matches from input text as Data objects or a formatted Message."""
display_name = "Regex Extractor"
description = "Extract patterns from text using regular expressions."
icon = "regex"
inputs = [
MessageTextInput(
name="input_text",
display_name="Input Text",
info="The text to analyze",
required=True,
),
MessageTextInput(
name="pattern",
display_name="Regex Pattern",
info="The regular expression pattern to match",
value=r"",
required=True,
tool_mode=True,
),
]
outputs = [
Output(display_name="Data", name="data", method="extract_matches"),
Output(display_name="Message", name="text", method="get_matches_text"),
]
def extract_matches(self) -> list[Data]:
"""Return one Data per non-empty regex match; errors become a single error Data."""
if not self.pattern or not self.input_text:
self.status = []
return []
try:
# Compile regex pattern
pattern = re.compile(self.pattern)
# Find all matches in the input text
# NOTE(review): findall returns tuples (not strings) when the pattern has
# multiple capture groups; those tuples flow into Data as-is and would break
# the string join in get_matches_text — confirm whether grouped patterns
# should be supported.
matches = pattern.findall(self.input_text)
# Filter out empty matches
filtered_matches = [match for match in matches if match]  # Remove empty matches
# Return empty list for no matches, or list of matches if found
result: list = [] if not filtered_matches else [Data(data={"match": match}) for match in filtered_matches]
except re.error as e:
error_message = f"Invalid regex pattern: {e!s}"
result = [Data(data={"error": error_message})]
except ValueError as e:
error_message = f"Error extracting matches: {e!s}"
result = [Data(data={"error": error_message})]
self.status = result
return result
def get_matches_text(self) -> Message:
"""Get matches as a formatted text message."""
matches = self.extract_matches()
if not matches:
message = Message(text="No matches found")
self.status = message
return message
# A single error Data (from extract_matches) is surfaced as the message text.
if "error" in matches[0].data:
message = Message(text=matches[0].data["error"])
self.status = message
return message
result = "\n".join(match.data["match"] for match in matches)
message = Message(text=result)
self.status = message
return message
Save to File
This component saves `DataFrames`, `Data`, or `Messages` to various file formats.
Parameters
Name | Display Name | Info |
---|---|---|
input_type |
Input Type |
Select the type of input to save. Options: ["DataFrame", "Data", "Message"]. Type: DropdownInput |
df |
DataFrame |
The DataFrame to save. Type: DataFrameInput |
data |
Data |
The Data object to save. Type: DataInput |
message |
Message |
The Message to save. Type: MessageInput |
file_format |
File Format |
Select the file format to save the input. Options for DataFrame/Data: ["csv", "excel", "json", "markdown"]. Options for Message: ["txt", "json", "markdown"]. Type: DropdownInput |
file_path |
File Path (including filename) |
The full file path (including filename and extension). Type: StrInput |
Name | Display Name | Info |
---|---|---|
confirmation |
Confirmation |
Confirmation message after saving the file. Method: save_to_file |
Component code
save_to_file.py
import json
from collections.abc import AsyncIterator, Iterator
from pathlib import Path
import pandas as pd
from langflow.custom import Component
from langflow.io import (
DataFrameInput,
DataInput,
DropdownInput,
MessageInput,
Output,
StrInput,
)
from langflow.schema import Data, DataFrame, Message
class SaveToFileComponent(Component):
display_name = "Save to File"
description = "Save DataFrames, Data, or Messages to various file formats."
icon = "save"
name = "SaveToFile"
# File format options for different types
DATA_FORMAT_CHOICES = ["csv", "excel", "json", "markdown"]
MESSAGE_FORMAT_CHOICES = ["txt", "json", "markdown"]
inputs = [
DropdownInput(
name="input_type",
display_name="Input Type",
options=["DataFrame", "Data", "Message"],
info="Select the type of input to save.",
value="DataFrame",
real_time_refresh=True,
),
DataFrameInput(
name="df",
display_name="DataFrame",
info="The DataFrame to save.",
dynamic=True,
show=True,
),
DataInput(
name="data",
display_name="Data",
info="The Data object to save.",
dynamic=True,
show=False,
),
MessageInput(
name="message",
display_name="Message",
info="The Message to save.",
dynamic=True,
show=False,
),
DropdownInput(
name="file_format",
display_name="File Format",
options=DATA_FORMAT_CHOICES,
info="Select the file format to save the input.",
real_time_refresh=True,
),
StrInput(
name="file_path",
display_name="File Path (including filename)",
info="The full file path (including filename and extension).",
value="./output",
),
]
outputs = [
Output(
name="confirmation",
display_name="Confirmation",
method="save_to_file",
info="Confirmation message after saving the file.",
),
]
def update_build_config(self, build_config, field_value, field_name=None):
# Hide/show dynamic fields based on the selected input type
if field_name == "input_type":
build_config["df"]["show"] = field_value == "DataFrame"
build_config["data"]["show"] = field_value == "Data"
build_config["message"]["show"] = field_value == "Message"
if field_value in {"DataFrame", "Data"}:
build_config["file_format"]["options"] = self.DATA_FORMAT_CHOICES
elif field_value == "Message":
build_config["file_format"]["options"] = self.MESSAGE_FORMAT_CHOICES
return build_config
def save_to_file(self) -> str:
input_type = self.input_type
file_format = self.file_format
file_path = Path(self.file_path).expanduser()
# Ensure the directory exists
if not file_path.parent.exists():
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path = self._adjust_file_path_with_format(file_path, file_format)
if input_type == "DataFrame":
dataframe = self.df
return self._save_dataframe(dataframe, file_path, file_format)
if input_type == "Data":
data = self.data
return self._save_data(data, file_path, file_format)
if input_type == "Message":
message = self.message
return self._save_message(message, file_path, file_format)
error_msg = f"Unsupported input type: {input_type}"
raise ValueError(error_msg)
def _adjust_file_path_with_format(self, path: Path, fmt: str) -> Path:
file_extension = path.suffix.lower().lstrip(".")
if fmt == "excel":
return Path(f"{path}.xlsx").expanduser() if file_extension not in ["xlsx", "xls"] else path
return Path(f"{path}.{fmt}").expanduser() if file_extension != fmt else path
def _save_dataframe(self, dataframe: DataFrame, path: Path, fmt: str) -> str:
if fmt == "csv":
dataframe.to_csv(path, index=False)
elif fmt == "excel":
dataframe.to_excel(path, index=False, engine="openpyxl")
elif fmt == "json":
dataframe.to_json(path, orient="records", indent=2)
elif fmt == "markdown":
path.write_text(dataframe.to_markdown(index=False), encoding="utf-8")
else:
error_msg = f"Unsupported DataFrame format: {fmt}"
raise ValueError(error_msg)
return f"DataFrame saved successfully as '{path}'"
def _save_data(self, data: Data, path: Path, fmt: str) -> str:
    """Persist a Data object's payload to *path* in the requested format.

    Raises:
        ValueError: If *fmt* is not csv, excel, json, or markdown.
    """
    if fmt == "json":
        # JSON is written straight from the raw payload dict.
        path.write_text(json.dumps(data.data, indent=2), encoding="utf-8")
    elif fmt in {"csv", "excel", "markdown"}:
        # Tabular formats go through a pandas DataFrame of the payload.
        frame = pd.DataFrame(data.data)
        if fmt == "csv":
            frame.to_csv(path, index=False)
        elif fmt == "excel":
            frame.to_excel(path, index=False, engine="openpyxl")
        else:
            path.write_text(frame.to_markdown(index=False), encoding="utf-8")
    else:
        error_msg = f"Unsupported Data format: {fmt}"
        raise ValueError(error_msg)
    return f"Data saved successfully as '{path}'"
def _save_message(self, message: Message, path: Path, fmt: str) -> str:
    """Serialize a Message's text content to *path* in the requested format.

    Raises:
        ValueError: If the text is an AsyncIterator or *fmt* is unsupported.
    """
    text = message.text
    if text is None:
        content = ""
    elif isinstance(text, AsyncIterator):
        # An async stream cannot be drained from this synchronous path.
        error_msg = "AsyncIterator not supported"
        raise ValueError(error_msg)
    elif isinstance(text, Iterator):
        # Drain a sync iterator into a single space-joined string.
        content = " ".join(str(item) for item in text)
    else:
        content = str(text)
    # Map each supported format to its serialized representation.
    serializers = {
        "txt": lambda: content,
        "json": lambda: json.dumps({"message": content}, indent=2),
        "markdown": lambda: f"**Message:**\n\n{content}",
    }
    serializer = serializers.get(fmt)
    if serializer is None:
        error_msg = f"Unsupported Message format: {fmt}"
        raise ValueError(error_msg)
    path.write_text(serializer(), encoding="utf-8")
    return f"Message saved successfully as '{path}'"
Select Data
This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates. |
This component selects a single data item from a list.
Parameters
Name | Display Name | Info |
---|---|---|
data_list |
Data List |
List of data to select from. |
data_index |
Data Index |
Index of the data to select. |
Name | Display Name | Info |
---|---|---|
selected_data |
Selected Data |
The selected Data object. |
Component code
select_data.py
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import DataInput, IntInput
from langflow.io import Output
from langflow.schema import Data
class SelectDataComponent(Component):
    """Legacy component that picks a single Data object out of a list by index."""

    display_name: str = "Select Data"
    description: str = "Select a single data from a list of data."
    name: str = "SelectData"
    icon = "prototypes"
    legacy = True

    inputs = [
        DataInput(
            name="data_list",
            display_name="Data List",
            info="List of data to select from.",
            is_list=True,  # This input accepts a list of Data objects.
        ),
        IntInput(
            name="data_index",
            display_name="Data Index",
            info="Index of the data to select.",
            value=0,  # Populated dynamically based on the length of data_list.
            range_spec=RangeSpec(min=0, max=15, step=1, step_type="int"),
        ),
    ]

    outputs = [
        Output(display_name="Selected Data", name="selected_data", method="select_data"),
    ]

    async def select_data(self) -> Data:
        """Return the Data object at the configured index.

        Raises:
            ValueError: If the index falls outside the bounds of data_list.
        """
        index = int(self.data_index)
        # Guard against out-of-bounds access before indexing the list.
        if not 0 <= index < len(self.data_list):
            msg = f"Selected index {index} is out of range."
            raise ValueError(msg)
        chosen = self.data_list[index]
        # Surface the chosen item in the component status for the UI.
        self.status = chosen
        return chosen
Split text
This component splits text into chunks based on specified criteria.
Parameters
Name | Display Name | Info |
---|---|---|
data_inputs |
Input Documents |
The data to split. The component accepts Data or DataFrame input. |
chunk_overlap |
Chunk Overlap |
The number of characters to overlap between chunks. Default: 200. |
chunk_size |
Chunk Size |
The maximum number of characters in each chunk. Default: 1000. |
separator |
Separator |
The character to split on. Default: newline (\n). |
text_key |
Text Key |
The key to use for the text column (advanced). Default: text. |
Name | Display Name | Info |
---|---|---|
chunks |
Chunks |
List of split text chunks as Data objects. |
dataframe |
DataFrame |
List of split text chunks as a DataFrame. |
Component code
split_text.py
from langchain_text_splitters import CharacterTextSplitter
from langflow.custom import Component
from langflow.io import DropdownInput, HandleInput, IntInput, MessageTextInput, Output
from langflow.schema import Data, DataFrame
from langflow.utils.util import unescape_string
class SplitTextComponent(Component):
    """Split incoming Data or DataFrame text into chunks using CharacterTextSplitter."""

    display_name: str = "Split Text"
    description: str = "Split text into chunks based on specified criteria."
    icon = "scissors-line-dashed"
    name = "SplitText"

    inputs = [
        HandleInput(
            name="data_inputs",
            display_name="Data or DataFrame",
            info="The data with texts to split in chunks.",
            input_types=["Data", "DataFrame"],
            required=True,
        ),
        IntInput(
            name="chunk_overlap",
            display_name="Chunk Overlap",
            info="Number of characters to overlap between chunks.",
            value=200,
        ),
        IntInput(
            name="chunk_size",
            display_name="Chunk Size",
            info=(
                "The maximum length of each chunk. Text is first split by separator, "
                "then chunks are merged up to this size. "
                "Individual splits larger than this won't be further divided."
            ),
            value=1000,
        ),
        MessageTextInput(
            name="separator",
            display_name="Separator",
            info=(
                "The character to split on. Use \\n for newline. "
                "Examples: \\n\\n for paragraphs, \\n for lines, . for sentences"
            ),
            value="\n",
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="The key to use for the text column.",
            value="text",
            advanced=True,
        ),
        DropdownInput(
            name="keep_separator",
            display_name="Keep Separator",
            info="Whether to keep the separator in the output chunks and where to place it.",
            options=["False", "True", "Start", "End"],
            value="False",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Chunks", name="chunks", method="split_text"),
        Output(display_name="DataFrame", name="dataframe", method="as_dataframe"),
    ]

    def _docs_to_data(self, docs) -> list[Data]:
        """Wrap each LangChain document in a Langflow Data object."""
        results: list[Data] = []
        for document in docs:
            results.append(Data(text=document.page_content, data=document.metadata))
        return results

    def _fix_separator(self, separator: str) -> str:
        """Fix common separator issues and convert to proper format."""
        # Users often type a forward slash instead of a backslash escape.
        corrections = {"/n": "\n", "/t": "\t"}
        return corrections.get(separator, separator)

    def split_text_base(self):
        """Convert the input into LangChain documents and split them."""
        separator = unescape_string(self._fix_separator(self.separator))
        source = self.data_inputs

        if isinstance(source, DataFrame):
            if not len(source):
                msg = "DataFrame is empty"
                raise TypeError(msg)
            source.text_key = self.text_key
            try:
                documents = source.to_lc_documents()
            except Exception as e:
                msg = f"Error converting DataFrame to documents: {e}"
                raise TypeError(msg) from e
        elif not source:
            # Falsy non-DataFrame input (None, empty list, empty Data).
            msg = "No data inputs provided"
            raise TypeError(msg)
        elif isinstance(source, Data):
            source.text_key = self.text_key
            documents = [source.to_lc_document()]
        else:
            # Assume a collection; keep only the Data entries.
            try:
                documents = [entry.to_lc_document() for entry in source if isinstance(entry, Data)]
                if not documents:
                    msg = f"No valid Data inputs found in {type(source)}"
                    raise TypeError(msg)
            except AttributeError as e:
                msg = f"Invalid input type in collection: {e}"
                raise TypeError(msg) from e

        try:
            # The dropdown delivers strings; map 'False'/'True' to booleans
            # while 'Start'/'End' pass through unchanged for the splitter.
            keep_sep = self.keep_separator
            if isinstance(keep_sep, str):
                lowered = keep_sep.lower()
                if lowered == "false":
                    keep_sep = False
                elif lowered == "true":
                    keep_sep = True
            splitter = CharacterTextSplitter(
                chunk_overlap=self.chunk_overlap,
                chunk_size=self.chunk_size,
                separator=separator,
                keep_separator=keep_sep,
            )
            return splitter.split_documents(documents)
        except Exception as e:
            msg = f"Error splitting text: {e}"
            raise TypeError(msg) from e

    def split_text(self) -> list[Data]:
        """Return the split chunks as a list of Data objects."""
        return self._docs_to_data(self.split_text_base())

    def as_dataframe(self) -> DataFrame:
        """Return the split chunks wrapped in a DataFrame."""
        return DataFrame(self.split_text())
Update data
The Update data component dynamically updates or appends data with specified fields.
Parameters
Name | Display Name | Info |
---|---|---|
old_data |
Data |
The records to update. It can be a single data object or a list of data objects. |
number_of_fields |
Number of Fields |
Number of fields to be added to the record (range: 1-15). |
text_key |
Text Key |
Key that identifies the field to be used as the text content. |
text_key_validator |
Text Key Validator |
If enabled, checks if the given 'Text Key' is present in the given 'data' object. |
Name | Display Name | Info |
---|---|---|
data |
Data |
The resulting updated data objects. |
Component code
update_data.py
from typing import Any
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import (
BoolInput,
DataInput,
DictInput,
IntInput,
MessageTextInput,
)
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict
class UpdateDataComponent(Component):
    """Dynamically update or append fields on one or more Data objects.

    The number of extra fields is configured at design time:
    ``number_of_fields`` controls how many ``field_i_key`` inputs appear,
    and their values are merged into each incoming Data object's ``data``
    dict by ``build_data``.
    """

    display_name: str = "Update Data"
    description: str = "Dynamically update or append data with the specified fields."
    name: str = "UpdateData"
    MAX_FIELDS = 15  # Define a constant for maximum number of fields
    icon = "FolderSync"

    inputs = [
        DataInput(
            name="old_data",
            display_name="Data",
            info="The record to update.",
            is_list=True,  # Changed to True to handle list of Data objects
            required=True,
        ),
        IntInput(
            name="number_of_fields",
            display_name="Number of Fields",
            info="Number of fields to be added to the record.",
            real_time_refresh=True,
            # NOTE(review): default 0 sits below range_spec's min=1 — confirm intended.
            value=0,
            range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"),
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="Key that identifies the field to be used as the text content.",
            advanced=True,
        ),
        BoolInput(
            name="text_key_validator",
            display_name="Text Key Validator",
            advanced=True,
            info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="build_data"),
    ]

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Update the build configuration when the number of fields changes.

        Args:
            build_config (dotdict): The current build configuration.
            field_value (Any): The new value for the field.
            field_name (Optional[str]): The name of the field being updated.

        Returns:
            dotdict: The (possibly mutated) build configuration.

        Raises:
            ValueError: If the requested field count exceeds MAX_FIELDS.
        """
        if field_name == "number_of_fields":
            # Keys that belong to the component itself, never to dynamic fields.
            default_keys = {
                "code",
                "_type",
                "number_of_fields",
                "text_key",
                "old_data",
                "text_key_validator",
            }
            try:
                field_value_int = int(field_value)
            except ValueError:
                # Non-numeric input: leave the config untouched.
                return build_config
            if field_value_int > self.MAX_FIELDS:
                # Clamp the stored value before raising so the UI stays valid.
                build_config["number_of_fields"]["value"] = self.MAX_FIELDS
                msg = f"Number of fields cannot exceed {self.MAX_FIELDS}. Try using a Component to combine two Data."
                raise ValueError(msg)
            existing_fields = {}
            # Back up the existing template fields
            # (pop them so they can be re-inserted in field order below).
            for key in list(build_config.keys()):
                if key not in default_keys:
                    existing_fields[key] = build_config.pop(key)
            for i in range(1, field_value_int + 1):
                key = f"field_{i}_key"
                if key in existing_fields:
                    # Re-use the saved field so user-entered values survive.
                    field = existing_fields[key]
                    build_config[key] = field
                else:
                    field = DictInput(
                        display_name=f"Field {i}",
                        name=key,
                        info=f"Key for field {i}.",
                        input_types=["Message", "Data"],
                    )
                    build_config[field.name] = field.to_dict()
            build_config["number_of_fields"]["value"] = field_value_int
        return build_config

    async def build_data(self) -> Data | list[Data]:
        """Build the updated data by combining the old data with new fields.

        Returns:
            The same Data object(s) passed in, mutated in place with the new
            field values merged into their ``data`` dicts.

        Raises:
            ValueError: If old_data is neither a Data object nor a list of them.
        """
        new_data = self.get_data()
        if isinstance(self.old_data, list):
            for data_item in self.old_data:
                if not isinstance(data_item, Data):
                    continue  # Skip invalid items
                data_item.data.update(new_data)
                if self.text_key:
                    data_item.text_key = self.text_key
                self.validate_text_key(data_item)
            self.status = self.old_data
            return self.old_data  # Returns List[Data]
        if isinstance(self.old_data, Data):
            self.old_data.data.update(new_data)
            if self.text_key:
                self.old_data.text_key = self.text_key
            self.status = self.old_data
            self.validate_text_key(self.old_data)
            return self.old_data  # Returns Data
        msg = "old_data is not a Data object or list of Data objects."
        raise ValueError(msg)

    def get_data(self):
        """Function to get the Data from the attributes.

        Collects every dynamic field value set on the component, flattening
        dict values and converting Data values to their text content.
        """
        data = {}
        # Same component-owned keys as in update_build_config.
        default_keys = {
            "code",
            "_type",
            "number_of_fields",
            "text_key",
            "old_data",
            "text_key_validator",
        }
        for attr_name, attr_value in self._attributes.items():
            if attr_name in default_keys:
                continue  # Skip default attributes
            if isinstance(attr_value, dict):
                # Flatten dict inputs; Data values contribute their text.
                for key, value in attr_value.items():
                    data[key] = value.get_text() if isinstance(value, Data) else value
            elif isinstance(attr_value, Data):
                data[attr_name] = attr_value.get_text()
            else:
                data[attr_name] = attr_value
        return data

    def validate_text_key(self, data: Data) -> None:
        """This function validates that the Text Key is one of the keys in the Data.

        NOTE(review): the ``text_key_validator`` flag is not consulted here —
        validation always runs when ``text_key`` is set; confirm intended.

        Raises:
            ValueError: If ``text_key`` is set but absent from ``data``'s keys.
        """
        data_keys = data.data.keys()
        if self.text_key and self.text_key not in data_keys:
            msg = f"Text Key: '{self.text_key}' not found in the Data keys: {', '.join(data_keys)}"
            raise ValueError(msg)