Processing components in Langflow
This Langflow feature is currently in public preview. Development is ongoing, and the features and functionality are subject to change. Langflow, and the use of such, is subject to the DataStax Preview Terms.
Processing components process and transform data within a flow.
Use a processing component in a flow
The Split Text processing component in this flow splits the incoming Data into chunks that are then embedded and loaded into the vector store component.
The component offers control over chunk size, overlap, and separator, which affect the context and granularity of vector store retrieval results.
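The following minimal sketch, run outside Langflow with illustrative text and settings, shows how these three parameters interact in the `CharacterTextSplitter` that the Split Text component wraps.

```python
# A minimal sketch of chunk size, overlap, and separator, using the same
# CharacterTextSplitter that the Split Text component wraps internally.
from langchain_text_splitters import CharacterTextSplitter

text = "Paragraph one.\n\nParagraph two is a little longer.\n\nParagraph three."

splitter = CharacterTextSplitter(
    separator="\n\n",   # character(s) to split on
    chunk_size=40,      # maximum characters per chunk
    chunk_overlap=10,   # characters carried over between consecutive chunks
)

for i, chunk in enumerate(splitter.split_text(text)):
    print(i, repr(chunk))
```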

Alter Metadata
This component modifies metadata of input objects. It can add new metadata, update existing metadata, and remove specified metadata fields. The component works with both Message and Data objects, and can also create a new Data object from user-provided text.
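Conceptually, the component applies a plain dictionary update followed by a key-based removal to each object's metadata. The sketch below illustrates that behavior on a single Langflow `Data` object; it assumes a Langflow environment, and the field names are only examples.

```python
# A minimal sketch of the add/remove behavior on one object.
# Assumes langflow.schema.Data is importable; field names are illustrative.
from langflow.schema import Data

record = Data(data={"text": "hello", "source": "draft.txt"})

# Add or update metadata fields; existing keys are overwritten.
record.data.update({"reviewed": True, "source": "final.txt"})

# Remove unwanted fields by rebuilding the dictionary without them.
fields_to_remove = {"reviewed"}
record.data = {k: v for k, v in record.data.items() if k not in fields_to_remove}

print(record.data)  # {'text': 'hello', 'source': 'final.txt'}
```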
Parameters
Name | Display Name | Info |
---|---|---|
input_value | Input | Objects to which Metadata should be added. |
text_in | User Text | Text input; the value will be in the 'text' attribute of the Data object. Empty text entries are ignored. |
metadata | Metadata | Metadata to add to each object. |
remove_fields | Fields to Remove | Metadata fields to remove. |

Name | Display Name | Info |
---|---|---|
data | Data | List of Input objects, each with added Metadata. |
Component code
alter_metadata.py
from langflow.custom import Component
from langflow.inputs import MessageTextInput
from langflow.io import HandleInput, NestedDictInput, Output, StrInput
from langflow.schema import Data
class AlterMetadataComponent(Component):
display_name = "Alter Metadata"
description = "Adds/Removes Metadata Dictionary on inputs"
icon = "merge"
name = "AlterMetadata"
inputs = [
HandleInput(
name="input_value",
display_name="Input",
info="Object(s) to which Metadata should be added",
required=False,
input_types=["Message", "Data"],
is_list=True,
),
StrInput(
name="text_in",
display_name="User Text",
info="Text input; value will be in 'text' attribute of Data object. Empty text entries are ignored.",
required=False,
),
NestedDictInput(
name="metadata",
display_name="Metadata",
info="Metadata to add to each object",
input_types=["Data"],
required=True,
),
MessageTextInput(
name="remove_fields",
display_name="Fields to Remove",
info="Metadata Fields to Remove",
required=False,
is_list=True,
),
]
outputs = [
Output(
name="data",
display_name="Data",
info="List of Input objects each with added Metadata",
method="process_output",
),
]
def _as_clean_dict(self, obj):
"""Convert a Data object or a standard dictionary to a standard dictionary."""
if isinstance(obj, dict):
as_dict = obj
elif isinstance(obj, Data):
as_dict = obj.data
else:
msg = f"Expected a Data object or a dictionary but got {type(obj)}."
raise TypeError(msg)
return {k: v for k, v in (as_dict or {}).items() if k and k.strip()}
def process_output(self) -> list[Data]:
# Ensure metadata is a dictionary, filtering out any empty keys
metadata = self._as_clean_dict(self.metadata)
# Convert text_in to a Data object if it exists, and initialize our list of Data objects
data_objects = [Data(text=self.text_in)] if self.text_in else []
# Append existing Data objects from input_value, if any
if self.input_value:
data_objects.extend(self.input_value)
# Update each Data object with the new metadata, preserving existing fields
for data in data_objects:
data.data.update(metadata)
# Handle removal of fields specified in remove_fields
if self.remove_fields:
fields_to_remove = {field.strip() for field in self.remove_fields if field.strip()}
# Remove specified fields from each Data object's metadata
for data in data_objects:
data.data = {k: v for k, v in data.data.items() if k not in fields_to_remove}
# Set the status for tracking/debugging purposes
self.status = data_objects
return data_objects
Combine data
Prior to Langflow version 1.1.3, this component was named Merge Data.
This component combines multiple data objects into a unified list of data objects.
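The current component code (shown below) supports several combine operations. As a rough sketch of their semantics, here is what each operation produces for two example dictionaries; the values are illustrative only.

```python
# A rough sketch of the combine operations, mirroring the component's
# _process_operation logic (example values only).
a = {"text": "first", "source": "a.txt"}
b = {"text": "second", "source": "b.txt"}

# Concatenate: shared string values are joined with newlines into one row.
concatenate = {"text": "first\nsecond", "source": "a.txt\nb.txt"}

# Append: each input becomes its own row.
append = [a, b]

# Merge: shared string keys collect their values into a list in one row.
merge = {"text": ["first", "second"], "source": ["a.txt", "b.txt"]}

# Join: keys from the second input onward are suffixed with _doc<index>.
join = {"text": "first", "source": "a.txt", "text_doc2": "second", "source_doc2": "b.txt"}
```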
Parameters
Name | Display Name | Info |
---|---|---|
data_inputs | Data Inputs | A list of data input objects to be merged. |

Name | Display Name | Info |
---|---|---|
merged_data | Merged Data | The resulting list of merged data objects with consistent keys. |
Component code
merge_data.py
from enum import Enum
from typing import cast
from loguru import logger
from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, Output
from langflow.schema import DataFrame
class DataOperation(str, Enum):
CONCATENATE = "Concatenate"
APPEND = "Append"
MERGE = "Merge"
JOIN = "Join"
class MergeDataComponent(Component):
display_name = "Combine Data"
description = "Combines data using different operations"
icon = "merge"
MIN_INPUTS_REQUIRED = 2
inputs = [
DataInput(name="data_inputs", display_name="Data Inputs", info="Data to combine", is_list=True, required=True),
DropdownInput(
name="operation",
display_name="Operation Type",
options=[op.value for op in DataOperation],
value=DataOperation.CONCATENATE.value,
),
]
outputs = [Output(display_name="DataFrame", name="combined_data", method="combine_data")]
def combine_data(self) -> DataFrame:
if not self.data_inputs or len(self.data_inputs) < self.MIN_INPUTS_REQUIRED:
empty_dataframe = DataFrame()
self.status = empty_dataframe
return empty_dataframe
operation = DataOperation(self.operation)
try:
combined_dataframe = self._process_operation(operation)
self.status = combined_dataframe
except Exception as e:
logger.error(f"Error during operation {operation}: {e!s}")
raise
else:
return combined_dataframe
def _process_operation(self, operation: DataOperation) -> DataFrame:
if operation == DataOperation.CONCATENATE:
combined_data: dict[str, str | object] = {}
for data_input in self.data_inputs:
for key, value in data_input.data.items():
if key in combined_data:
if isinstance(combined_data[key], str) and isinstance(value, str):
combined_data[key] = f"{combined_data[key]}\n{value}"
else:
combined_data[key] = value
else:
combined_data[key] = value
return DataFrame([combined_data])
if operation == DataOperation.APPEND:
rows = [data_input.data for data_input in self.data_inputs]
return DataFrame(rows)
if operation == DataOperation.MERGE:
result_data: dict[str, str | list[str] | object] = {}
for data_input in self.data_inputs:
for key, value in data_input.data.items():
if key in result_data and isinstance(value, str):
if isinstance(result_data[key], list):
cast("list[str]", result_data[key]).append(value)
else:
result_data[key] = [result_data[key], value]
else:
result_data[key] = value
return DataFrame([result_data])
if operation == DataOperation.JOIN:
combined_data = {}
for idx, data_input in enumerate(self.data_inputs, 1):
for key, value in data_input.data.items():
new_key = f"{key}_doc{idx}" if idx > 1 else key
combined_data[new_key] = value
return DataFrame([combined_data])
return DataFrame()
Combine Text
This component concatenates two text sources into a single text chunk using a specified delimiter.
Parameters
Name | Display Name | Info |
---|---|---|
first_text | First Text | The first text input to concatenate. |
second_text | Second Text | The second text input to concatenate. |
delimiter | Delimiter | A string used to separate the two text inputs. Defaults to a space. |

Name | Display Name | Info |
---|---|---|
message | Message | A Message object containing the combined text. |
Component code
combine_text.py
from langflow.custom import Component
from langflow.io import MessageTextInput, Output
from langflow.schema.message import Message
class CombineTextComponent(Component):
display_name = "Combine Text"
description = "Concatenate two text sources into a single text chunk using a specified delimiter."
icon = "merge"
name = "CombineText"
inputs = [
MessageTextInput(
name="text1",
display_name="First Text",
info="The first text input to concatenate.",
),
MessageTextInput(
name="text2",
display_name="Second Text",
info="The second text input to concatenate.",
),
MessageTextInput(
name="delimiter",
display_name="Delimiter",
info="A string used to separate the two text inputs. Defaults to a whitespace.",
value=" ",
),
]
outputs = [
Output(display_name="Combined Text", name="combined_text", method="combine_texts"),
]
def combine_texts(self) -> Message:
combined = self.delimiter.join([self.text1, self.text2])
self.status = combined
return Message(text=combined)
Create data
This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates.
This component dynamically creates a Data object with a specified number of fields.
Parameters
Name | Display Name | Info |
---|---|---|
number_of_fields | Number of Fields | The number of fields to be added to the record. |
text_key | Text Key | Key that identifies the field to be used as the text content. |
text_key_validator | Text Key Validator | If enabled, checks if the given 'Text Key' is present in the given 'Data'. |

Name | Display Name | Info |
---|---|---|
data | Data | A Data object created with the specified fields. |
Component code
create_data.py
from typing import Any
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import BoolInput, DictInput, IntInput, MessageTextInput
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict
class CreateDataComponent(Component):
display_name: str = "Create Data"
description: str = "Dynamically create a Data with a specified number of fields."
name: str = "CreateData"
MAX_FIELDS = 15 # Define a constant for maximum number of fields
legacy = True
icon = "ListFilter"
inputs = [
IntInput(
name="number_of_fields",
display_name="Number of Fields",
info="Number of fields to be added to the record.",
real_time_refresh=True,
value=1,
range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"),
),
MessageTextInput(
name="text_key",
display_name="Text Key",
info="Key that identifies the field to be used as the text content.",
advanced=True,
),
BoolInput(
name="text_key_validator",
display_name="Text Key Validator",
advanced=True,
info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.",
),
]
outputs = [
Output(display_name="Data", name="data", method="build_data"),
]
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
if field_name == "number_of_fields":
default_keys = ["code", "_type", "number_of_fields", "text_key", "text_key_validator"]
try:
field_value_int = int(field_value)
except ValueError:
return build_config
existing_fields = {}
if field_value_int > self.MAX_FIELDS:
build_config["number_of_fields"]["value"] = self.MAX_FIELDS
msg = (
f"Number of fields cannot exceed {self.MAX_FIELDS}. "
"Please adjust the number of fields to be within the allowed limit."
)
raise ValueError(msg)
if len(build_config) > len(default_keys):
# back up the existing template fields
for key in build_config.copy():
if key not in default_keys:
existing_fields[key] = build_config.pop(key)
for i in range(1, field_value_int + 1):
key = f"field_{i}_key"
if key in existing_fields:
field = existing_fields[key]
build_config[key] = field
else:
field = DictInput(
display_name=f"Field {i}",
name=key,
info=f"Key for field {i}.",
input_types=["Message", "Data"],
)
build_config[field.name] = field.to_dict()
build_config["number_of_fields"]["value"] = field_value_int
return build_config
async def build_data(self) -> Data:
data = self.get_data()
return_data = Data(data=data, text_key=self.text_key)
self.status = return_data
if self.text_key_validator:
self.validate_text_key()
return return_data
def get_data(self):
"""Function to get the Data from the attributes."""
data = {}
for value_dict in self._attributes.values():
if isinstance(value_dict, dict):
# Check if the value of the value_dict is a Data
value_dict_ = {
key: value.get_text() if isinstance(value, Data) else value for key, value in value_dict.items()
}
data.update(value_dict_)
return data
def validate_text_key(self) -> None:
"""This function validates that the Text Key is one of the keys in the Data."""
data_keys = self.get_data().keys()
if self.text_key not in data_keys and self.text_key != "":
formatted_data_keys = ", ".join(data_keys)
msg = f"Text Key: '{self.text_key}' not found in the Data keys: '{formatted_data_keys}'"
raise ValueError(msg)
DataFrame operations
This component performs various operations on Pandas DataFrames. Here’s an overview of the available operations and their inputs:
Operation | Description | Required Inputs |
---|---|---|
Add Column | Adds a new column with a constant value | new_column_name, new_column_value |
Drop Column | Removes a specified column | column_name |
Filter | Filters rows based on column value | column_name, filter_value |
Head | Returns first n rows | num_rows |
Rename Column | Renames an existing column | column_name, new_column_name |
Replace Value | Replaces values in a column | column_name, replace_value, replacement_value |
Select Columns | Selects specific columns | columns_to_select |
Sort | Sorts DataFrame by column | column_name, ascending |
Tail | Returns last n rows | num_rows |
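For reference, the sketch below shows the plain pandas calls that a few of these operations map to; the DataFrame and values are illustrative only.

```python
# A small pandas sketch (example data) of what a few operations do
# to the underlying DataFrame.
import pandas as pd

df = pd.DataFrame({"name": ["a", "b", "c"], "score": [1, 2, 3]})

filtered = df[df["name"] == "b"]                          # Filter
sorted_df = df.sort_values(by="score", ascending=False)   # Sort
renamed = df.rename(columns={"score": "points"})          # Rename Column
first_two = df.head(2)                                    # Head

print(filtered)
```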
Parameters
Name | Display Name | Info |
---|---|---|
df | DataFrame | The input DataFrame to operate on. |
operation | Operation | Select the DataFrame operation to perform. Options: Add Column, Drop Column, Filter, Head, Rename Column, Replace Value, Select Columns, Sort, Tail |
column_name | Column Name | The column name to use for the operation. |
filter_value | Filter Value | The value to filter rows by. |
ascending | Sort Ascending | Whether to sort in ascending order. |
new_column_name | New Column Name | The new column name when renaming or adding a column. |
new_column_value | New Column Value | The value to populate the new column with. |
columns_to_select | Columns to Select | List of column names to select. |
num_rows | Number of Rows | Number of rows to return (for head/tail). Default: 5 |
replace_value | Value to Replace | The value to replace in the column. |
replacement_value | Replacement Value | The value to replace with. |

Name | Display Name | Info |
---|---|---|
output | DataFrame | The resulting DataFrame after the operation. |
Component code
dataframe_operations.py
from langflow.custom import Component
from langflow.io import BoolInput, DataFrameInput, DropdownInput, IntInput, MessageTextInput, Output, StrInput
from langflow.schema import DataFrame
class DataFrameOperationsComponent(Component):
display_name = "DataFrame Operations"
description = "Perform various operations on a DataFrame."
icon = "table"
# Available operations
OPERATION_CHOICES = [
"Add Column",
"Drop Column",
"Filter",
"Head",
"Rename Column",
"Replace Value",
"Select Columns",
"Sort",
"Tail",
]
inputs = [
DataFrameInput(
name="df",
display_name="DataFrame",
info="The input DataFrame to operate on.",
),
DropdownInput(
name="operation",
display_name="Operation",
options=OPERATION_CHOICES,
info="Select the DataFrame operation to perform.",
real_time_refresh=True,
),
StrInput(
name="column_name",
display_name="Column Name",
info="The column name to use for the operation.",
dynamic=True,
show=False,
),
MessageTextInput(
name="filter_value",
display_name="Filter Value",
info="The value to filter rows by.",
dynamic=True,
show=False,
),
BoolInput(
name="ascending",
display_name="Sort Ascending",
info="Whether to sort in ascending order.",
dynamic=True,
show=False,
value=True,
),
StrInput(
name="new_column_name",
display_name="New Column Name",
info="The new column name when renaming or adding a column.",
dynamic=True,
show=False,
),
MessageTextInput(
name="new_column_value",
display_name="New Column Value",
info="The value to populate the new column with.",
dynamic=True,
show=False,
),
StrInput(
name="columns_to_select",
display_name="Columns to Select",
dynamic=True,
is_list=True,
show=False,
),
IntInput(
name="num_rows",
display_name="Number of Rows",
info="Number of rows to return (for head/tail).",
dynamic=True,
show=False,
value=5,
),
MessageTextInput(
name="replace_value",
display_name="Value to Replace",
info="The value to replace in the column.",
dynamic=True,
show=False,
),
MessageTextInput(
name="replacement_value",
display_name="Replacement Value",
info="The value to replace with.",
dynamic=True,
show=False,
),
]
outputs = [
Output(
display_name="DataFrame",
name="output",
method="perform_operation",
info="The resulting DataFrame after the operation.",
)
]
def update_build_config(self, build_config, field_value, field_name=None):
# Hide all dynamic fields by default
dynamic_fields = [
"column_name",
"filter_value",
"ascending",
"new_column_name",
"new_column_value",
"columns_to_select",
"num_rows",
"replace_value",
"replacement_value",
]
for field in dynamic_fields:
build_config[field]["show"] = False
# Show relevant fields based on the selected operation
if field_name == "operation":
if field_value == "Filter":
build_config["column_name"]["show"] = True
build_config["filter_value"]["show"] = True
elif field_value == "Sort":
build_config["column_name"]["show"] = True
build_config["ascending"]["show"] = True
elif field_value == "Drop Column":
build_config["column_name"]["show"] = True
elif field_value == "Rename Column":
build_config["column_name"]["show"] = True
build_config["new_column_name"]["show"] = True
elif field_value == "Add Column":
build_config["new_column_name"]["show"] = True
build_config["new_column_value"]["show"] = True
elif field_value == "Select Columns":
build_config["columns_to_select"]["show"] = True
elif field_value in {"Head", "Tail"}:
build_config["num_rows"]["show"] = True
elif field_value == "Replace Value":
build_config["column_name"]["show"] = True
build_config["replace_value"]["show"] = True
build_config["replacement_value"]["show"] = True
return build_config
def perform_operation(self) -> DataFrame:
dataframe_copy = self.df.copy()
operation = self.operation
if operation == "Filter":
return self.filter_rows_by_value(dataframe_copy)
if operation == "Sort":
return self.sort_by_column(dataframe_copy)
if operation == "Drop Column":
return self.drop_column(dataframe_copy)
if operation == "Rename Column":
return self.rename_column(dataframe_copy)
if operation == "Add Column":
return self.add_column(dataframe_copy)
if operation == "Select Columns":
return self.select_columns(dataframe_copy)
if operation == "Head":
return self.head(dataframe_copy)
if operation == "Tail":
return self.tail(dataframe_copy)
if operation == "Replace Value":
return self.replace_values(dataframe_copy)
msg = f"Unsupported operation: {operation}"
raise ValueError(msg)
# Existing methods
def filter_rows_by_value(self, df: DataFrame) -> DataFrame:
return DataFrame(df[df[self.column_name] == self.filter_value])
def sort_by_column(self, df: DataFrame) -> DataFrame:
return DataFrame(df.sort_values(by=self.column_name, ascending=self.ascending))
def drop_column(self, df: DataFrame) -> DataFrame:
return DataFrame(df.drop(columns=[self.column_name]))
def rename_column(self, df: DataFrame) -> DataFrame:
return DataFrame(df.rename(columns={self.column_name: self.new_column_name}))
def add_column(self, df: DataFrame) -> DataFrame:
df[self.new_column_name] = [self.new_column_value] * len(df)
return DataFrame(df)
def select_columns(self, df: DataFrame) -> DataFrame:
columns = [col.strip() for col in self.columns_to_select]
return DataFrame(df[columns])
# New methods
def head(self, df: DataFrame) -> DataFrame:
return DataFrame(df.head(self.num_rows))
def tail(self, df: DataFrame) -> DataFrame:
return DataFrame(df.tail(self.num_rows))
def replace_values(self, df: DataFrame) -> DataFrame:
df[self.column_name] = df[self.column_name].replace(self.replace_value, self.replacement_value)
return DataFrame(df)
Data to message
Prior to Langflow version 1.1.3, this component was named Parse Data.
This component converts data objects into plain text using a specified template.
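Conceptually, each data item's fields fill the template's placeholders, and the formatted rows are joined with the separator. The following sketch illustrates that behavior with plain dictionaries; the field names and template are examples only.

```python
# A conceptual sketch of the template behavior: each item's fields fill the
# placeholders, and the results are joined with the separator.
items = [
    {"text": "First snippet", "source": "a.md"},
    {"text": "Second snippet", "source": "b.md"},
]
template = "{text} (from {source})"
sep = "\n"

message_text = sep.join(template.format(**item) for item in items)
print(message_text)
# First snippet (from a.md)
# Second snippet (from b.md)
```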
Parameters
Name | Display Name | Info |
---|---|---|
data | Data | The data to convert to text. |
template | Template | The template to use for formatting the data. It can contain the keys {text}, {data} or any other key in the data. |
sep | Separator | The separator to use between multiple data items. |

Name | Display Name | Info |
---|---|---|
text | Text | The resulting formatted text string as a message object. |
Component code
parse_data.py
from langflow.custom import Component
from langflow.helpers.data import data_to_text, data_to_text_list
from langflow.io import DataInput, MultilineInput, Output, StrInput
from langflow.schema import Data
from langflow.schema.message import Message
class ParseDataComponent(Component):
display_name = "Data to Message"
description = "Convert Data objects into Messages using any {field_name} from input data."
icon = "message-square"
name = "ParseData"
metadata = {
"legacy_name": "Parse Data",
}
inputs = [
DataInput(
name="data",
display_name="Data",
info="The data to convert to text.",
is_list=True,
required=True,
),
MultilineInput(
name="template",
display_name="Template",
info="The template to use for formatting the data. "
"It can contain the keys {text}, {data} or any other key in the Data.",
value="{text}",
required=True,
),
StrInput(name="sep", display_name="Separator", advanced=True, value="\n"),
]
outputs = [
Output(
display_name="Message",
name="text",
info="Data as a single Message, with each input Data separated by Separator",
method="parse_data",
),
Output(
display_name="Data List",
name="data_list",
info="Data as a list of new Data, each having `text` formatted by Template",
method="parse_data_as_list",
),
]
def _clean_args(self) -> tuple[list[Data], str, str]:
data = self.data if isinstance(self.data, list) else [self.data]
template = self.template
sep = self.sep
return data, template, sep
def parse_data(self) -> Message:
data, template, sep = self._clean_args()
result_string = data_to_text(template, data, sep)
self.status = result_string
return Message(text=result_string)
def parse_data_as_list(self) -> list[Data]:
data, template, _ = self._clean_args()
text_list, data_list = data_to_text_list(template, data)
for item, text in zip(data_list, text_list, strict=True):
item.set_text(text)
self.status = data_list
return data_list
Filter data
This component is in Beta as of Langflow version 1.1.3, and is not yet fully supported.
This component filters a Data object based on a list of specified keys, retaining only the key-value pairs that match the provided filter criteria. This allows selective extraction of fields from a Data object.
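The filtering itself is a simple key-based selection, as in this minimal sketch with example keys:

```python
# A minimal sketch of key-based filtering, mirroring the component's logic.
data = {"title": "Report", "author": "Ada", "draft": True}
filter_criteria = ["title", "author"]

filtered = {key: value for key, value in data.items() if key in filter_criteria}
print(filtered)  # {'title': 'Report', 'author': 'Ada'}
```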
Parameters
Name | Display Name | Info |
---|---|---|
data | Data | The Data object to filter. |
filter_criteria | Filter Criteria | List of keys to filter by. |

Name | Display Name | Info |
---|---|---|
filtered_data | Filtered Data | The resulting filtered Data object. |
Component code
filter_data.py
from langflow.custom import Component
from langflow.io import DataInput, MessageTextInput, Output
from langflow.schema import Data
class FilterDataComponent(Component):
display_name = "Filter Data"
description = "Filters a Data object based on a list of keys."
icon = "filter"
beta = True
name = "FilterData"
inputs = [
DataInput(
name="data",
display_name="Data",
info="Data object to filter.",
),
MessageTextInput(
name="filter_criteria",
display_name="Filter Criteria",
info="List of keys to filter by.",
is_list=True,
),
]
outputs = [
Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
]
def filter_data(self) -> Data:
filter_criteria: list[str] = self.filter_criteria
data = self.data.data if isinstance(self.data, Data) else {}
# Filter the data
filtered = {key: value for key, value in data.items() if key in filter_criteria}
# Create a new Data object with the filtered data
filtered_data = Data(data=filtered)
self.status = filtered_data
return filtered_data
Filter Values
This component is in Beta as of Langflow version 1.1.3, and is not yet fully supported.
This component filters a list of data items based on a specified key, filter value, and comparison operator.
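The sketch below mirrors how the component's comparison operators decide whether an item passes; the helper function and sample data are illustrative only.

```python
# A rough sketch of the comparison operators, mirroring the component's
# compare_values method (sample data is illustrative).
def compare(item_value, filter_value: str, operator: str) -> bool:
    value = str(item_value)
    if operator == "equals":
        return value == filter_value
    if operator == "not equals":
        return value != filter_value
    if operator == "contains":
        return filter_value in value
    if operator == "starts with":
        return value.startswith(filter_value)
    if operator == "ends with":
        return value.endswith(filter_value)
    return False

items = [{"route": "CMIP"}, {"route": "CORDEX"}]
matches = [item for item in items if compare(item["route"], "CMIP", "equals")]
print(matches)  # [{'route': 'CMIP'}]
```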
Parameters
Name | Display Name | Info |
---|---|---|
input_data | Input Data | The list of data items to filter. |
filter_key | Filter Key | The key to filter on (for example, 'route'). |
filter_value | Filter Value | The value to filter by (for example, 'CMIP'). |
operator | Comparison Operator | The operator to apply for comparing the values. |

Name | Display Name | Info |
---|---|---|
filtered_data | Filtered Data | The resulting list of filtered data items. |
Component code
filter_data_values.py
from typing import Any
from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, MessageTextInput, Output
from langflow.schema import Data
class DataFilterComponent(Component):
display_name = "Filter Values"
description = (
"Filter a list of data items based on a specified key, filter value,"
" and comparison operator. Check advanced options to select match comparison."
)
icon = "filter"
beta = True
name = "FilterDataValues"
inputs = [
DataInput(name="input_data", display_name="Input Data", info="The list of data items to filter.", is_list=True),
MessageTextInput(
name="filter_key",
display_name="Filter Key",
info="The key to filter on (e.g., 'route').",
value="route",
input_types=["Data"],
),
MessageTextInput(
name="filter_value",
display_name="Filter Value",
info="The value to filter by (e.g., 'CMIP').",
value="CMIP",
input_types=["Data"],
),
DropdownInput(
name="operator",
display_name="Comparison Operator",
options=["equals", "not equals", "contains", "starts with", "ends with"],
info="The operator to apply for comparing the values.",
value="equals",
advanced=True,
),
]
outputs = [
Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
]
def compare_values(self, item_value: Any, filter_value: str, operator: str) -> bool:
if operator == "equals":
return str(item_value) == filter_value
if operator == "not equals":
return str(item_value) != filter_value
if operator == "contains":
return filter_value in str(item_value)
if operator == "starts with":
return str(item_value).startswith(filter_value)
if operator == "ends with":
return str(item_value).endswith(filter_value)
return False
def filter_data(self) -> list[Data]:
# Extract inputs
input_data: list[Data] = self.input_data
filter_key: str = self.filter_key.text
filter_value: str = self.filter_value.text
operator: str = self.operator
# Validate inputs
if not input_data:
self.status = "Input data is empty."
return []
if not filter_key or not filter_value:
self.status = "Filter key or value is missing."
return input_data
# Filter the data
filtered_data = []
for item in input_data:
if isinstance(item.data, dict) and filter_key in item.data:
if self.compare_values(item.data[filter_key], filter_value, operator):
filtered_data.append(item)
else:
self.status = f"Warning: Some items don't have the key '{filter_key}' or are not dictionaries."
self.status = filtered_data
return filtered_data
JSON Cleaner
This component cleans JSON strings to ensure they are fully compliant with the JSON specification.
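As a rough illustration, the sketch below applies the same kind of steps to a malformed model response using the `json_repair` package that the component imports; the input string and printed output are only indicative.

```python
# A small sketch of the cleaning steps on a malformed LLM response,
# using the json_repair package the component relies on.
import unicodedata
from json_repair import repair_json

raw = 'Sure! Here you go: {"name": "report", "tags": ["a", "b",]}'

# Keep only the substring between the first '{' and the last '}'.
json_str = raw[raw.find("{") : raw.rfind("}") + 1]

# Optional Unicode normalization, as the component does when the flag is enabled.
json_str = unicodedata.normalize("NFC", json_str)

print(repair_json(json_str))  # {"name": "report", "tags": ["a", "b"]}
```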
Parameters
Name | Display Name | Info |
---|---|---|
json_str | JSON String | The JSON string to be cleaned. This can be a raw, potentially malformed JSON string produced by language models or other sources that may not fully comply with JSON specifications. |
remove_control_chars | Remove Control Characters | If set to True, this option removes control characters (ASCII characters 0-31 and 127) from the JSON string. This can help eliminate invisible characters that might cause parsing issues or make the JSON invalid. |
normalize_unicode | Normalize Unicode | When enabled, this option normalizes Unicode characters in the JSON string to their canonical composition form (NFC). This ensures consistent representation of Unicode characters across different systems and prevents potential issues with character encoding. |
validate_json | Validate JSON | If set to True, this option attempts to parse the JSON string to ensure it is well-formed before applying the final repair operation. It raises a ValueError if the JSON is invalid, allowing for early detection of major structural issues in the JSON. |

Name | Display Name | Info |
---|---|---|
output | Cleaned JSON String | The resulting cleaned, repaired, and validated JSON string that fully complies with the JSON specification. |
Component code
json_cleaner.py
import json
import unicodedata
from langflow.custom import Component
from langflow.inputs import BoolInput, MessageTextInput
from langflow.schema.message import Message
from langflow.template import Output
class JSONCleaner(Component):
icon = "braces"
display_name = "JSON Cleaner"
description = (
"Cleans the messy and sometimes incorrect JSON strings produced by LLMs "
"so that they are fully compliant with the JSON spec."
)
inputs = [
MessageTextInput(
name="json_str", display_name="JSON String", info="The JSON string to be cleaned.", required=True
),
BoolInput(
name="remove_control_chars",
display_name="Remove Control Characters",
info="Remove control characters from the JSON string.",
required=False,
),
BoolInput(
name="normalize_unicode",
display_name="Normalize Unicode",
info="Normalize Unicode characters in the JSON string.",
required=False,
),
BoolInput(
name="validate_json",
display_name="Validate JSON",
info="Validate the JSON string to ensure it is well-formed.",
required=False,
),
]
outputs = [
Output(display_name="Cleaned JSON String", name="output", method="clean_json"),
]
def clean_json(self) -> Message:
try:
from json_repair import repair_json
except ImportError as e:
msg = "Could not import the json_repair package. Please install it with `pip install json_repair`."
raise ImportError(msg) from e
"""Clean the input JSON string based on provided options and return the cleaned JSON string."""
json_str = self.json_str
remove_control_chars = self.remove_control_chars
normalize_unicode = self.normalize_unicode
validate_json = self.validate_json
start = json_str.find("{")
end = json_str.rfind("}")
if start == -1 or end == -1:
msg = "Invalid JSON string: Missing '{' or '}'"
raise ValueError(msg)
try:
json_str = json_str[start : end + 1]
if remove_control_chars:
json_str = self._remove_control_characters(json_str)
if normalize_unicode:
json_str = self._normalize_unicode(json_str)
if validate_json:
json_str = self._validate_json(json_str)
cleaned_json_str = repair_json(json_str)
result = str(cleaned_json_str)
self.status = result
return Message(text=result)
except Exception as e:
msg = f"Error cleaning JSON string: {e}"
raise ValueError(msg) from e
def _remove_control_characters(self, s: str) -> str:
"""Remove control characters from the string."""
return s.translate(self.translation_table)
def _normalize_unicode(self, s: str) -> str:
"""Normalize Unicode characters in the string."""
return unicodedata.normalize("NFC", s)
def _validate_json(self, s: str) -> str:
"""Validate the JSON string."""
try:
json.loads(s)
except json.JSONDecodeError as e:
msg = f"Invalid JSON string: {e}"
raise ValueError(msg) from e
return s
def __init__(self, *args, **kwargs):
# Create a translation table that maps control characters to None
super().__init__(*args, **kwargs)
self.translation_table = str.maketrans("", "", "".join(chr(i) for i in range(32)) + chr(127))
LLM Router
This component routes requests to the most appropriate LLM based on the OpenRouter model specifications.
Parameters
Name | Display Name | Info |
---|---|---|
models | Language Models | List of LLMs to route between. |
input_value | Input | The input message to be routed. |
judge_llm | Judge LLM | LLM that will evaluate and select the most appropriate model. |
optimization | Optimization | Optimization preference (quality/speed/cost/balanced). |

Name | Display Name | Info |
---|---|---|
output | Output | The response from the selected model. |
selected_model | Selected Model | Name of the chosen model. |
Component code
llm_router.py
import json
import requests
from langflow.base.models.chat_result import get_chat_result
from langflow.base.models.model_utils import get_model_name
from langflow.custom import Component
from langflow.io import DropdownInput, HandleInput, Output
from langflow.schema.message import Message
class LLMRouterComponent(Component):
display_name = "LLM Router"
description = "Routes the input to the most appropriate LLM based on OpenRouter model specifications"
icon = "git-branch"
inputs = [
HandleInput(
name="models",
display_name="Language Models",
input_types=["LanguageModel"],
required=True,
is_list=True,
info="List of LLMs to route between",
),
HandleInput(
name="input_value",
display_name="Input",
input_types=["Message"],
info="The input message to be routed",
),
HandleInput(
name="judge_llm",
display_name="Judge LLM",
input_types=["LanguageModel"],
info="LLM that will evaluate and select the most appropriate model",
),
DropdownInput(
name="optimization",
display_name="Optimization",
options=["quality", "speed", "cost", "balanced"],
value="balanced",
info="Optimization preference for model selection",
),
]
outputs = [
Output(display_name="Output", name="output", method="route_to_model"),
Output(
display_name="Selected Model",
name="selected_model",
method="get_selected_model",
required_inputs=["output"],
),
]
_selected_model_name: str | None = None
def get_selected_model(self) -> str:
return self._selected_model_name or ""
def _get_model_specs(self, model_name: str) -> str:
"""Fetch specific model information from OpenRouter API."""
http_success = 200
base_info = f"Model: {model_name}\n"
# Remove any special characters and spaces, keep only alphanumeric
clean_name = "".join(c.lower() for c in model_name if c.isalnum())
url = f"https://openrouter.ai/api/v1/models/{clean_name}/endpoints"
try:
response = requests.get(url, timeout=10)
except requests.exceptions.RequestException as e:
return base_info + f"Error fetching specs: {e!s}"
if response.status_code != http_success:
return base_info + "No specifications available"
try:
data = response.json().get("data", {})
except (json.JSONDecodeError, requests.exceptions.JSONDecodeError):
return base_info + "Error parsing response data"
# Extract relevant information
context_length = data.get("context_length", "Unknown")
max_completion_tokens = data.get("max_completion_tokens", "Unknown")
architecture = data.get("architecture", {})
tokenizer = architecture.get("tokenizer", "Unknown")
instruct_type = architecture.get("instruct_type", "Unknown")
pricing = data.get("pricing", {})
prompt_price = pricing.get("prompt", "Unknown")
completion_price = pricing.get("completion", "Unknown")
description = data.get("description", "No description available")
created = data.get("created", "Unknown")
return f"""
Model: {model_name}
Description: {description}
Context Length: {context_length} tokens
Max Completion Tokens: {max_completion_tokens}
Tokenizer: {tokenizer}
Instruct Type: {instruct_type}
Pricing: ${prompt_price}/1k tokens (prompt), ${completion_price}/1k tokens (completion)
Created: {created}
"""
MISSING_INPUTS_MSG = "Missing required inputs: models, input_value, or judge_llm"
async def route_to_model(self) -> Message:
if not self.models or not self.input_value or not self.judge_llm:
raise ValueError(self.MISSING_INPUTS_MSG)
system_prompt = {
"role": "system",
"content": (
"You are a model selection expert. Analyze the input and select the most "
"appropriate model based on:\n"
"1. Task complexity and requirements\n"
"2. Context length needed\n"
"3. Model capabilities\n"
"4. Cost considerations\n"
"5. Speed requirements\n\n"
"Consider the detailed model specifications provided and the user's "
"optimization preference. Return only the index number (0-based) of the best model."
),
}
# Create list of available models with their detailed specs
models_info = []
for i, model in enumerate(self.models):
model_name = get_model_name(model)
model_specs = self._get_model_specs(model_name)
models_info.append(f"=== Model {i} ===\n{model_specs}")
models_str = "\n\n".join(models_info)
user_message = {
"role": "user",
"content": f"""Available Models with Specifications:\n{models_str}\n
Optimization Preference: {self.optimization}\n
Input Query: "{self.input_value.text}"\n
Based on the model specifications and optimization preference,
select the most appropriate model (return only the index number):""",
}
try:
# Get judge's decision
response = await self.judge_llm.ainvoke([system_prompt, user_message])
try:
selected_index = int(response.content.strip())
if 0 <= selected_index < len(self.models):
chosen_model = self.models[selected_index]
self._selected_model_name = get_model_name(chosen_model)
else:
chosen_model = self.models[0]
self._selected_model_name = get_model_name(chosen_model)
except ValueError:
chosen_model = self.models[0]
self._selected_model_name = get_model_name(chosen_model)
# Get response from chosen model
return get_chat_result(
runnable=chosen_model,
input_value=self.input_value,
)
except (RuntimeError, ValueError) as e:
self.status = f"Error: {e!s}"
# Fallback to first model
chosen_model = self.models[0]
self._selected_model_name = get_model_name(chosen_model)
return get_chat_result(
runnable=chosen_model,
input_value=self.input_value,
)
Message to data
This component converts a message object to a data object.
Parameters
Name | Display Name | Info |
---|---|---|
message | Message | The Message object to convert to a Data object. |

Name | Display Name | Info |
---|---|---|
data | Data | The resulting Data object converted from the input Message. |
Component code
message_to_data.py
from loguru import logger
from langflow.custom import Component
from langflow.io import MessageInput, Output
from langflow.schema import Data
from langflow.schema.message import Message
class MessageToDataComponent(Component):
display_name = "Message to Data"
description = "Convert a Message object to a Data object"
icon = "message-square-share"
beta = True
name = "MessagetoData"
inputs = [
MessageInput(
name="message",
display_name="Message",
info="The Message object to convert to a Data object",
),
]
outputs = [
Output(display_name="Data", name="data", method="convert_message_to_data"),
]
def convert_message_to_data(self) -> Data:
if isinstance(self.message, Message):
# Convert Message to Data
return Data(data=self.message.data)
msg = "Error converting Message to Data: Input must be a Message object"
logger.opt(exception=True).debug(msg)
self.status = msg
return Data(data={"error": msg})
Parse DataFrame
This component converts a DataFrame into plain text following a specified template. Each column in the DataFrame is treated as a possible template key, for example, {col_name}.
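The sketch below shows the row-by-row templating this corresponds to in plain pandas; the DataFrame, template, and separator are illustrative.

```python
# A short sketch (example data) of the row-by-row templating the component performs.
import pandas as pd

df = pd.DataFrame({"title": ["Intro", "Setup"], "text": ["Welcome.", "Install it."]})
template = "{title}: {text}"
sep = "\n"

lines = [template.format(**row.to_dict()) for _, row in df.iterrows()]
print(sep.join(lines))
# Intro: Welcome.
# Setup: Install it.
```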
Parameters
Name | Display Name | Info |
---|---|---|
df | DataFrame | The DataFrame to convert to text rows. |
template | Template | The template for formatting each row. Use placeholders matching column names in the DataFrame, for example, `{col1}` or `{col2}`. |
sep | Separator | String that joins all row texts when building the single Text output. |

Name | Display Name | Info |
---|---|---|
text | Text | All rows combined into a single text, each row formatted by the template and separated by the separator value defined in `sep`. |
Component code
parse_dataframe.py
from langflow.custom import Component
from langflow.io import DataFrameInput, MultilineInput, Output, StrInput
from langflow.schema.message import Message
class ParseDataFrameComponent(Component):
display_name = "Parse DataFrame"
description = (
"Convert a DataFrame into plain text following a specified template. "
"Each column in the DataFrame is treated as a possible template key, e.g. {col_name}."
)
icon = "braces"
name = "ParseDataFrame"
inputs = [
DataFrameInput(name="df", display_name="DataFrame", info="The DataFrame to convert to text rows."),
MultilineInput(
name="template",
display_name="Template",
info=(
"The template for formatting each row. "
"Use placeholders matching column names in the DataFrame, for example '{col1}', '{col2}'."
),
value="{text}",
),
StrInput(
name="sep",
display_name="Separator",
advanced=True,
value="\n",
info="String that joins all row texts when building the single Text output.",
),
]
outputs = [
Output(
display_name="Text",
name="text",
info="All rows combined into a single text, each row formatted by the template and separated by `sep`.",
method="parse_data",
),
]
def _clean_args(self):
dataframe = self.df
template = self.template or "{text}"
sep = self.sep or "\n"
return dataframe, template, sep
def parse_data(self) -> Message:
"""Converts each row of the DataFrame into a formatted string using the template.
then joins them with `sep`. Returns a single combined string as a Message.
"""
dataframe, template, sep = self._clean_args()
lines = []
# For each row in the DataFrame, build a dict and format
for _, row in dataframe.iterrows():
row_dict = row.to_dict()
text_line = template.format(**row_dict) # e.g. template="{text}", row_dict={"text": "Hello"}
lines.append(text_line)
# Join all lines with the provided separator
result_string = sep.join(lines)
self.status = result_string # store in self.status for UI logs
return Message(text=result_string)
Parse JSON
This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates.
This component converts and extracts JSON fields using JQ queries.
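As a brief illustration of the JQ-based filtering the component performs, the sketch below runs a query with the `jq` package; the payload and query are examples only.

```python
# A brief sketch of filtering JSON with a JQ query via the jq package,
# as the component does internally (data and query are illustrative).
import json
import jq

payload = json.dumps([
    {"name": "alpha", "score": 10},
    {"name": "beta", "score": 3},
])

# Select the names of items with a score above 5.
results = jq.compile(".[] | select(.score > 5) | .name").input_text(payload).all()
print(results)  # ['alpha']
```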
Parameters
Name | Display Name | Info |
---|---|---|
input_value | Input | The data object to filter. It can be a Message or Data object. |
query | JQ Query | JQ Query to filter the data. The input is always a JSON list. |

Name | Display Name | Info |
---|---|---|
filtered_data | Filtered Data | Filtered data as a list of Data objects. |
Component code
parse_json_data.py
import json
from json import JSONDecodeError
import jq
from json_repair import repair_json
from loguru import logger
from langflow.custom import Component
from langflow.inputs import HandleInput, MessageTextInput
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.message import Message
class ParseJSONDataComponent(Component):
display_name = "Parse JSON"
description = "Convert and extract JSON fields."
icon = "braces"
name = "ParseJSONData"
legacy: bool = True
inputs = [
HandleInput(
name="input_value",
display_name="Input",
info="Data object to filter.",
required=True,
input_types=["Message", "Data"],
),
MessageTextInput(
name="query",
display_name="JQ Query",
info="JQ Query to filter the data. The input is always a JSON list.",
required=True,
),
]
outputs = [
Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
]
def _parse_data(self, input_value) -> str:
if isinstance(input_value, Message) and isinstance(input_value.text, str):
return input_value.text
if isinstance(input_value, Data):
return json.dumps(input_value.data)
return str(input_value)
def filter_data(self) -> list[Data]:
to_filter = self.input_value
if not to_filter:
return []
# Check if input is a list
if isinstance(to_filter, list):
to_filter = [self._parse_data(f) for f in to_filter]
else:
to_filter = self._parse_data(to_filter)
# If input is not a list, don't wrap it in a list
if not isinstance(to_filter, list):
to_filter = repair_json(to_filter)
try:
to_filter_as_dict = json.loads(to_filter)
except JSONDecodeError:
try:
to_filter_as_dict = json.loads(repair_json(to_filter))
except JSONDecodeError as e:
msg = f"Invalid JSON: {e}"
raise ValueError(msg) from e
else:
to_filter = [repair_json(f) for f in to_filter]
to_filter_as_dict = []
for f in to_filter:
try:
to_filter_as_dict.append(json.loads(f))
except JSONDecodeError:
try:
to_filter_as_dict.append(json.loads(repair_json(f)))
except JSONDecodeError as e:
msg = f"Invalid JSON: {e}"
raise ValueError(msg) from e
to_filter = to_filter_as_dict
full_filter_str = json.dumps(to_filter_as_dict)
logger.info("to_filter: ", to_filter)
results = jq.compile(self.query).input_text(full_filter_str).all()
logger.info("results: ", results)
return [Data(data=value) if isinstance(value, dict) else Data(text=str(value)) for value in results]
Select Data
This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates.
This component selects a single data item from a list.
Parameters
Name | Display Name | Info |
---|---|---|
data_list | Data List | List of data to select from. |
data_index | Data Index | Index of the data to select. |

Name | Display Name | Info |
---|---|---|
selected_data | Selected Data | The selected Data object. |
Component code
select_data.py
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import DataInput, IntInput
from langflow.io import Output
from langflow.schema import Data
class SelectDataComponent(Component):
display_name: str = "Select Data"
description: str = "Select a single data from a list of data."
name: str = "SelectData"
icon = "prototypes"
legacy = True
inputs = [
DataInput(
name="data_list",
display_name="Data List",
info="List of data to select from.",
is_list=True, # Specify that this input takes a list of Data objects
),
IntInput(
name="data_index",
display_name="Data Index",
info="Index of the data to select.",
value=0, # Will be populated dynamically based on the length of data_list
range_spec=RangeSpec(min=0, max=15, step=1, step_type="int"),
),
]
outputs = [
Output(display_name="Selected Data", name="selected_data", method="select_data"),
]
async def select_data(self) -> Data:
# Retrieve the selected index from the dropdown
selected_index = int(self.data_index)
# Get the data list
# Validate that the selected index is within bounds
if selected_index < 0 or selected_index >= len(self.data_list):
msg = f"Selected index {selected_index} is out of range."
raise ValueError(msg)
# Return the selected Data object
selected_data = self.data_list[selected_index]
self.status = selected_data # Update the component status to reflect the selected data
return selected_data
Split text
This component splits text into chunks based on specified criteria.
Parameters
Name | Display Name | Info |
---|---|---|
data_inputs | Input Documents | The data to split. The component accepts Data or DataFrame objects. |
chunk_overlap | Chunk Overlap | The number of characters to overlap between chunks. Default: 200. |
chunk_size | Chunk Size | The maximum number of characters in each chunk. Default: 1000. |
separator | Separator | The character to split on. Default: newline (`\n`). |
text_key | Text Key | The key to use for the text column (advanced). Default: `text`. |

Name | Display Name | Info |
---|---|---|
chunks | Chunks | List of split text chunks as Data objects. |
dataframe | DataFrame | List of split text chunks as a DataFrame. |
Component code
split_text.py
from langchain_text_splitters import CharacterTextSplitter
from langflow.custom import Component
from langflow.io import HandleInput, IntInput, MessageTextInput, Output
from langflow.schema import Data, DataFrame
from langflow.utils.util import unescape_string
class SplitTextComponent(Component):
display_name: str = "Split Text"
description: str = "Split text into chunks based on specified criteria."
icon = "scissors-line-dashed"
name = "SplitText"
inputs = [
HandleInput(
name="data_inputs",
display_name="Input Documents",
info="The data to split.",
input_types=["Data", "DataFrame"],
required=True,
),
IntInput(
name="chunk_overlap",
display_name="Chunk Overlap",
info="Number of characters to overlap between chunks.",
value=200,
),
IntInput(
name="chunk_size",
display_name="Chunk Size",
info="The maximum number of characters in each chunk.",
value=1000,
),
MessageTextInput(
name="separator",
display_name="Separator",
info="The character to split on. Defaults to newline.",
value="\n",
),
MessageTextInput(
name="text_key",
display_name="Text Key",
info="The key to use for the text column.",
value="text",
advanced=True,
),
]
outputs = [
Output(display_name="Chunks", name="chunks", method="split_text"),
Output(display_name="DataFrame", name="dataframe", method="as_dataframe"),
]
def _docs_to_data(self, docs) -> list[Data]:
return [Data(text=doc.page_content, data=doc.metadata) for doc in docs]
def _docs_to_dataframe(self, docs):
data_dicts = [{self.text_key: doc.page_content, **doc.metadata} for doc in docs]
return DataFrame(data_dicts)
def split_text_base(self):
separator = unescape_string(self.separator)
if isinstance(self.data_inputs, DataFrame):
if not len(self.data_inputs):
msg = "DataFrame is empty"
raise TypeError(msg)
self.data_inputs.text_key = self.text_key
try:
documents = self.data_inputs.to_lc_documents()
except Exception as e:
msg = f"Error converting DataFrame to documents: {e}"
raise TypeError(msg) from e
else:
if not self.data_inputs:
msg = "No data inputs provided"
raise TypeError(msg)
documents = []
if isinstance(self.data_inputs, Data):
self.data_inputs.text_key = self.text_key
documents = [self.data_inputs.to_lc_document()]
else:
try:
documents = [input_.to_lc_document() for input_ in self.data_inputs if isinstance(input_, Data)]
if not documents:
msg = f"No valid Data inputs found in {type(self.data_inputs)}"
raise TypeError(msg)
except AttributeError as e:
msg = f"Invalid input type in collection: {e}"
raise TypeError(msg) from e
try:
splitter = CharacterTextSplitter(
chunk_overlap=self.chunk_overlap,
chunk_size=self.chunk_size,
separator=separator,
)
return splitter.split_documents(documents)
except Exception as e:
msg = f"Error splitting text: {e}"
raise TypeError(msg) from e
def split_text(self) -> list[Data]:
return self._docs_to_data(self.split_text_base())
def as_dataframe(self) -> DataFrame:
return self._docs_to_dataframe(self.split_text_base())
Update data
The Update data component dynamically updates or appends data with specified fields.
Parameters
Name | Display Name | Info |
---|---|---|
old_data | Data | The records to update. It can be a single Data object or a list of Data objects. |
number_of_fields | Number of Fields | Number of fields to be added to the record (range: 1-15). |
text_key | Text Key | Key that identifies the field to be used as the text content. |
text_key_validator | Text Key Validator | If enabled, checks if the given 'Text Key' is present in the given Data object. |

Name | Display Name | Info |
---|---|---|
data | Data | The resulting updated Data objects. |
Component code
update_data.py
from typing import Any
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import (
BoolInput,
DataInput,
DictInput,
IntInput,
MessageTextInput,
)
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict
class UpdateDataComponent(Component):
display_name: str = "Update Data"
description: str = "Dynamically update or append data with the specified fields."
name: str = "UpdateData"
MAX_FIELDS = 15 # Define a constant for maximum number of fields
icon = "FolderSync"
inputs = [
DataInput(
name="old_data",
display_name="Data",
info="The record to update.",
is_list=True, # Changed to True to handle list of Data objects
required=True,
),
IntInput(
name="number_of_fields",
display_name="Number of Fields",
info="Number of fields to be added to the record.",
real_time_refresh=True,
value=0,
range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"),
),
MessageTextInput(
name="text_key",
display_name="Text Key",
info="Key that identifies the field to be used as the text content.",
advanced=True,
),
BoolInput(
name="text_key_validator",
display_name="Text Key Validator",
advanced=True,
info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.",
),
]
outputs = [
Output(display_name="Data", name="data", method="build_data"),
]
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
"""Update the build configuration when the number of fields changes.
Args:
build_config (dotdict): The current build configuration.
field_value (Any): The new value for the field.
field_name (Optional[str]): The name of the field being updated.
"""
if field_name == "number_of_fields":
default_keys = {
"code",
"_type",
"number_of_fields",
"text_key",
"old_data",
"text_key_validator",
}
try:
field_value_int = int(field_value)
except ValueError:
return build_config
if field_value_int > self.MAX_FIELDS:
build_config["number_of_fields"]["value"] = self.MAX_FIELDS
msg = f"Number of fields cannot exceed {self.MAX_FIELDS}. Try using a Component to combine two Data."
raise ValueError(msg)
existing_fields = {}
# Back up the existing template fields
for key in list(build_config.keys()):
if key not in default_keys:
existing_fields[key] = build_config.pop(key)
for i in range(1, field_value_int + 1):
key = f"field_{i}_key"
if key in existing_fields:
field = existing_fields[key]
build_config[key] = field
else:
field = DictInput(
display_name=f"Field {i}",
name=key,
info=f"Key for field {i}.",
input_types=["Message", "Data"],
)
build_config[field.name] = field.to_dict()
build_config["number_of_fields"]["value"] = field_value_int
return build_config
async def build_data(self) -> Data | list[Data]:
"""Build the updated data by combining the old data with new fields."""
new_data = self.get_data()
if isinstance(self.old_data, list):
for data_item in self.old_data:
if not isinstance(data_item, Data):
continue # Skip invalid items
data_item.data.update(new_data)
if self.text_key:
data_item.text_key = self.text_key
self.validate_text_key(data_item)
self.status = self.old_data
return self.old_data # Returns List[Data]
if isinstance(self.old_data, Data):
self.old_data.data.update(new_data)
if self.text_key:
self.old_data.text_key = self.text_key
self.status = self.old_data
self.validate_text_key(self.old_data)
return self.old_data # Returns Data
msg = "old_data is not a Data object or list of Data objects."
raise ValueError(msg)
def get_data(self):
"""Function to get the Data from the attributes."""
data = {}
default_keys = {
"code",
"_type",
"number_of_fields",
"text_key",
"old_data",
"text_key_validator",
}
for attr_name, attr_value in self._attributes.items():
if attr_name in default_keys:
continue # Skip default attributes
if isinstance(attr_value, dict):
for key, value in attr_value.items():
data[key] = value.get_text() if isinstance(value, Data) else value
elif isinstance(attr_value, Data):
data[attr_name] = attr_value.get_text()
else:
data[attr_name] = attr_value
return data
def validate_text_key(self, data: Data) -> None:
"""This function validates that the Text Key is one of the keys in the Data."""
data_keys = data.data.keys()
if self.text_key and self.text_key not in data_keys:
msg = f"Text Key: '{self.text_key}' not found in the Data keys: {', '.join(data_keys)}"
raise ValueError(msg)