Processing components in Langflow

This Langflow feature is currently in public preview. Development is ongoing, and the features and functionality are subject to change. Langflow, and the use of such, is subject to the DataStax Preview Terms.

Processing components process and transform data within a flow.

Use a processing component in a flow

The Split Text processing component in this flow splits the incoming Data into chunks to be embedded into the vector store component.

The component offers control over chunk size, overlap, and separator, which affect context and granularity in vector store retrieval results.

vector store document ingestion

Alter Metadata

This component modifies metadata of input objects. It can add new metadata, update existing metadata, and remove specified metadata fields. The component works with both Message and Data objects, and can also create a new Data object from user-provided text.

Parameters

Inputs
Name Display Name Info

input_value

Input

Objects to which Metadata should be added.

text_in

User Text

Text input; the value will be in the 'text' attribute of the Data object. Empty text entries are ignored.

metadata

Metadata

Metadata to add to each object.

remove_fields

Fields to Remove

Metadata fields to remove.

Outputs
Name Display Name Info

data

Data

List of Input objects, each with added Metadata.

Component code

alter_metadata.py
from langflow.custom import Component
from langflow.inputs import MessageTextInput
from langflow.io import HandleInput, NestedDictInput, Output, StrInput
from langflow.schema import Data


class AlterMetadataComponent(Component):
    """Add, update, and remove metadata entries on the objects passed through the flow.

    The component gathers an optional ``Data`` object built from ``text_in`` plus
    every object received on ``input_value``, merges the ``metadata`` dictionary
    into each object's ``data``, and finally drops the keys named in
    ``remove_fields``.
    """

    display_name = "Alter Metadata"
    description = "Adds/Removes Metadata Dictionary on inputs"
    icon = "merge"
    name = "AlterMetadata"

    inputs = [
        HandleInput(
            name="input_value",
            display_name="Input",
            info="Object(s) to which Metadata should be added",
            required=False,
            input_types=["Message", "Data"],
            is_list=True,
        ),
        StrInput(
            name="text_in",
            display_name="User Text",
            info="Text input; value will be in 'text' attribute of Data object. Empty text entries are ignored.",
            required=False,
        ),
        NestedDictInput(
            name="metadata",
            display_name="Metadata",
            info="Metadata to add to each object",
            input_types=["Data"],
            required=True,
        ),
        MessageTextInput(
            name="remove_fields",
            display_name="Fields to Remove",
            info="Metadata Fields to Remove",
            required=False,
            is_list=True,
        ),
    ]

    outputs = [
        Output(
            name="data",
            display_name="Data",
            info="List of Input objects each with added Metadata",
            method="process_output",
        ),
    ]

    def _as_clean_dict(self, obj):
        """Return ``obj`` as a plain dictionary without empty or whitespace-only keys.

        Args:
            obj: A dictionary, or a ``Data`` object whose ``data`` attribute is used.

        Returns:
            A new dictionary holding only the entries whose key is non-blank.

        Raises:
            TypeError: If ``obj`` is neither a dictionary nor a ``Data`` object.
        """
        if not isinstance(obj, (dict, Data)):
            msg = f"Expected a Data object or a dictionary but got {type(obj)}."
            raise TypeError(msg)

        raw_mapping = obj if isinstance(obj, dict) else obj.data

        cleaned = {}
        # A falsy mapping (for example None) is treated as "no entries".
        for key, value in (raw_mapping or {}).items():
            if key and key.strip():
                cleaned[key] = value
        return cleaned

    def process_output(self) -> list[Data]:
        """Apply the configured metadata changes and return the affected objects.

        Returns:
            The list of processed objects: the ``Data`` created from ``text_in``
            (when non-empty) followed by the objects from ``input_value``.
        """
        # Normalise the metadata first so an invalid value fails before any object is touched.
        new_metadata = self._as_clean_dict(self.metadata)

        # The user-provided text, when present, becomes the first object in the result.
        targets = []
        if self.text_in:
            targets.append(Data(text=self.text_in))
        if self.input_value:
            targets.extend(self.input_value)

        # Merge the new metadata into each object; keys not mentioned are left as they are.
        for target in targets:
            target.data.update(new_metadata)

        if self.remove_fields:
            # Collect the non-blank field names once, ignoring surrounding whitespace.
            unwanted = set()
            for raw_field in self.remove_fields:
                field_name = raw_field.strip()
                if field_name:
                    unwanted.add(field_name)

            # Removal runs after the merge, so a field listed here is dropped even if
            # the metadata input has just added it.
            for target in targets:
                target.data = {key: value for key, value in target.data.items() if key not in unwanted}

        # Expose the result as the component status for tracking/debugging purposes.
        self.status = targets
        return targets

Combine data

Prior to Langflow version 1.1.3, this component was named Merge Data.

This component combines multiple data objects into a unified list of data objects.

Parameters

Inputs
Name Display Name Info

data_inputs

Data Inputs

A list of data input objects to be merged.

Outputs
Name Display Name Info

combined_data

DataFrame

The resulting DataFrame containing the data combined with the selected operation.

Component code

merge_data.py
from enum import Enum
from typing import cast

from loguru import logger

from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, Output
from langflow.schema import DataFrame


class DataOperation(str, Enum):
    """Operations supported by the Combine Data component.

    Each member's value is the label offered in the component's
    "Operation Type" dropdown.
    """

    # One output row; string values sharing a key are joined with a newline,
    # any other repeated key keeps the value from the later input.
    CONCATENATE = "Concatenate"
    # One output row per input.
    APPEND = "Append"
    # One output row; repeated string values for a key are collected into a list.
    MERGE = "Merge"
    # One output row; keys from the second input onwards get a "_doc<N>" suffix.
    JOIN = "Join"


class MergeDataComponent(Component):
    """Combine several ``Data`` inputs into one ``DataFrame`` using a selectable operation.

    The available operations are the members of ``DataOperation``. At least
    ``MIN_INPUTS_REQUIRED`` inputs are needed; with fewer, an empty
    ``DataFrame`` is produced.
    """

    display_name = "Combine Data"
    description = "Combines data using different operations"
    icon = "merge"
    MIN_INPUTS_REQUIRED = 2

    inputs = [
        DataInput(name="data_inputs", display_name="Data Inputs", info="Data to combine", is_list=True, required=True),
        DropdownInput(
            name="operation",
            display_name="Operation Type",
            options=[op.value for op in DataOperation],
            value=DataOperation.CONCATENATE.value,
        ),
    ]
    outputs = [Output(display_name="DataFrame", name="combined_data", method="combine_data")]

    def combine_data(self) -> DataFrame:
        """Run the selected operation over ``data_inputs``.

        Returns:
            The combined ``DataFrame``, or an empty ``DataFrame`` when fewer than
            ``MIN_INPUTS_REQUIRED`` inputs were provided.

        Raises:
            ValueError: If ``operation`` is not a valid ``DataOperation`` value.
            Exception: Any error raised while combining is logged and re-raised.
        """
        provided = self.data_inputs
        if not provided or len(provided) < self.MIN_INPUTS_REQUIRED:
            nothing_to_combine = DataFrame()
            self.status = nothing_to_combine
            return nothing_to_combine

        selected = DataOperation(self.operation)
        try:
            result = self._process_operation(selected)
            self.status = result
        except Exception as exc:
            # Log for diagnostics, then let the original error propagate unchanged.
            logger.error(f"Error during operation {selected}: {exc!s}")
            raise
        return result

    def _process_operation(self, operation: DataOperation) -> DataFrame:
        """Build the ``DataFrame`` for ``operation`` from ``self.data_inputs``.

        Args:
            operation: The combination strategy to apply.

        Returns:
            A ``DataFrame`` with a single row for Concatenate, Merge and Join, one
            row per input for Append, and an empty ``DataFrame`` for any other value.
        """
        if operation == DataOperation.CONCATENATE:
            concatenated: dict[str, str | object] = {}
            for source in self.data_inputs:
                for key, value in source.data.items():
                    earlier = concatenated.get(key)
                    # Two strings under the same key are joined with a newline;
                    # in every other case the most recent value wins.
                    if isinstance(earlier, str) and isinstance(value, str):
                        concatenated[key] = f"{earlier}\n{value}"
                    else:
                        concatenated[key] = value
            return DataFrame([concatenated])

        if operation == DataOperation.APPEND:
            # Each input contributes its own row.
            appended_rows = []
            for source in self.data_inputs:
                appended_rows.append(source.data)
            return DataFrame(appended_rows)

        if operation == DataOperation.MERGE:
            merged: dict[str, str | list[str] | object] = {}
            for source in self.data_inputs:
                for key, value in source.data.items():
                    # New keys and non-string values are stored as-is (overwriting).
                    if key not in merged or not isinstance(value, str):
                        merged[key] = value
                        continue
                    # Repeated string values accumulate in a list under the key.
                    existing = merged[key]
                    if isinstance(existing, list):
                        existing.append(value)
                    else:
                        merged[key] = [existing, value]
            return DataFrame([merged])

        if operation == DataOperation.JOIN:
            joined = {}
            for position, source in enumerate(self.data_inputs, start=1):
                keep_original_keys = position == 1
                for key, value in source.data.items():
                    # Keys of the first input are kept; later inputs are suffixed
                    # with their 1-based position so columns do not collide.
                    if keep_original_keys:
                        joined[key] = value
                    else:
                        joined[f"{key}_doc{position}"] = value
            return DataFrame([joined])

        return DataFrame()

Combine Text

This component concatenates two text sources into a single text chunk using a specified delimiter.

Parameters

Inputs
Name Display Name Info

text1

First Text

The first text input to concatenate.

text2

Second Text

The second text input to concatenate.

delimiter

Delimiter

A string used to separate the two text inputs. Defaults to a space.

Outputs
Name Display Name Info

combined_text

Combined Text

A Message object containing the combined text.

Component code

combine_text.py
from langflow.custom import Component
from langflow.io import MessageTextInput, Output
from langflow.schema.message import Message


class CombineTextComponent(Component):
    """Join two text inputs into one ``Message``, separated by a configurable delimiter."""

    display_name = "Combine Text"
    description = "Concatenate two text sources into a single text chunk using a specified delimiter."
    icon = "merge"
    name = "CombineText"

    inputs = [
        MessageTextInput(
            name="text1",
            display_name="First Text",
            info="The first text input to concatenate.",
        ),
        MessageTextInput(
            name="text2",
            display_name="Second Text",
            info="The second text input to concatenate.",
        ),
        MessageTextInput(
            name="delimiter",
            display_name="Delimiter",
            info="A string used to separate the two text inputs. Defaults to a whitespace.",
            value=" ",
        ),
    ]

    outputs = [
        Output(display_name="Combined Text", name="combined_text", method="combine_texts"),
    ]

    def combine_texts(self) -> Message:
        """Concatenate ``text1`` and ``text2`` with ``delimiter`` between them.

        Returns:
            A ``Message`` whose text is the joined string; the same string is
            also stored as the component status.
        """
        separator = self.delimiter
        joined_text = separator.join((self.text1, self.text2))
        self.status = joined_text
        return Message(text=joined_text)

Create data

This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates.

This component dynamically creates a Data object with a specified number of fields.

Parameters

Inputs
Name Display Name Info

number_of_fields

Number of Fields

The number of fields to be added to the record.

text_key

Text Key

Key that identifies the field to be used as the text content.

text_key_validator

Text Key Validator

If enabled, checks if the given 'Text Key' is present in the given 'Data'.

Outputs
Name Display Name Info

data

Data

A Data object created with the specified fields and text key.

Component code

create_data.py
from typing import Any

from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import BoolInput, DictInput, IntInput, MessageTextInput
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict


class CreateDataComponent(Component):
    """Legacy component that builds one ``Data`` object from a user-chosen number of fields.

    Changing ``number_of_fields`` regenerates the dynamic ``field_<i>_key`` inputs
    (up to ``MAX_FIELDS``); the values of all dict-valued attributes are then
    merged into the resulting ``Data``.
    """

    display_name: str = "Create Data"
    description: str = "Dynamically create a Data with a specified number of fields."
    name: str = "CreateData"
    MAX_FIELDS = 15  # Define a constant for maximum number of fields
    legacy = True
    icon = "ListFilter"

    inputs = [
        IntInput(
            name="number_of_fields",
            display_name="Number of Fields",
            info="Number of fields to be added to the record.",
            real_time_refresh=True,
            value=1,
            range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"),
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="Key that identifies the field to be used as the text content.",
            advanced=True,
        ),
        BoolInput(
            name="text_key_validator",
            display_name="Text Key Validator",
            advanced=True,
            info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="build_data"),
    ]

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Regenerate the dynamic field inputs when ``number_of_fields`` changes.

        Args:
            build_config: The component's build configuration; modified in place.
            field_value: The new value of the changed field.
            field_name: Name of the changed field. Anything other than
                ``"number_of_fields"`` leaves the configuration untouched.

        Returns:
            The (possibly updated) ``build_config``.

        Raises:
            ValueError: If the requested number of fields exceeds ``MAX_FIELDS``.
        """
        if field_name != "number_of_fields":
            return build_config

        default_keys = ["code", "_type", "number_of_fields", "text_key", "text_key_validator"]
        try:
            field_value_int = int(field_value)
        except (TypeError, ValueError):
            # int() raises TypeError (not ValueError) for None and other
            # non-numeric types; treat both as "nothing usable entered yet".
            return build_config

        if field_value_int > self.MAX_FIELDS:
            # Clamp the stored value before reporting the problem.
            build_config["number_of_fields"]["value"] = self.MAX_FIELDS
            msg = (
                f"Number of fields cannot exceed {self.MAX_FIELDS}. "
                "Please adjust the number of fields to be within the allowed limit."
            )
            raise ValueError(msg)

        existing_fields = {}
        if len(build_config) > len(default_keys):
            # back up the existing template fields
            for key in build_config.copy():
                if key not in default_keys:
                    existing_fields[key] = build_config.pop(key)

        for i in range(1, field_value_int + 1):
            key = f"field_{i}_key"
            if key in existing_fields:
                # Reuse the previous definition so values already entered are kept.
                build_config[key] = existing_fields[key]
            else:
                field = DictInput(
                    display_name=f"Field {i}",
                    name=key,
                    info=f"Key for field {i}.",
                    input_types=["Message", "Data"],
                )
                build_config[field.name] = field.to_dict()

        build_config["number_of_fields"]["value"] = field_value_int
        return build_config

    async def build_data(self) -> Data:
        """Create the ``Data`` object from the current field values.

        Returns:
            A ``Data`` built from ``get_data()`` with ``text_key`` applied.

        Raises:
            ValueError: If ``text_key_validator`` is enabled and ``text_key`` is
                not one of the data keys.
        """
        data = self.get_data()
        return_data = Data(data=data, text_key=self.text_key)
        self.status = return_data
        if self.text_key_validator:
            self.validate_text_key()
        return return_data

    def get_data(self):
        """Merge the entries of every dict-valued attribute into one dictionary.

        ``Data`` values are replaced by their text (``get_text()``); all other
        values are copied unchanged. Later attributes overwrite earlier ones on
        key collisions.
        """
        data = {}
        for value_dict in self._attributes.values():
            if isinstance(value_dict, dict):
                # Check if the value of the value_dict is a Data
                value_dict_ = {
                    key: value.get_text() if isinstance(value, Data) else value for key, value in value_dict.items()
                }
                data.update(value_dict_)
        return data

    def validate_text_key(self) -> None:
        """Validate that the Text Key is one of the keys in the Data.

        An empty Text Key is always accepted.

        Raises:
            ValueError: If a non-empty ``text_key`` is missing from the data keys.
        """
        data_keys = self.get_data().keys()
        if self.text_key not in data_keys and self.text_key != "":
            # str() each key so a non-string key cannot turn this into a TypeError
            # that hides the real validation error.
            formatted_data_keys = ", ".join(str(key) for key in data_keys)
            msg = f"Text Key: '{self.text_key}' not found in the Data keys: '{formatted_data_keys}'"
            raise ValueError(msg)

DataFrame operations

This component performs various operations on Pandas DataFrames. Here’s an overview of the available operations and their inputs:

Operations
Operation Description Required Inputs

Add Column

Adds a new column with a constant value

new_column_name, new_column_value

Drop Column

Removes a specified column

column_name

Filter

Filters rows based on column value

column_name, filter_value

Head

Returns first n rows

num_rows

Rename Column

Renames an existing column

column_name, new_column_name

Replace Value

Replaces values in a column

column_name, replace_value, replacement_value

Select Columns

Selects specific columns

columns_to_select

Sort

Sorts DataFrame by column

column_name, ascending

Tail

Returns last n rows

num_rows

Parameters

Inputs
Name Display Name Info

df

DataFrame

The input DataFrame to operate on.

operation

Operation

Select the DataFrame operation to perform. Options: Add Column, Drop Column, Filter, Head, Rename Column, Replace Value, Select Columns, Sort, Tail

column_name

Column Name

The column name to use for the operation.

filter_value

Filter Value

The value to filter rows by.

ascending

Sort Ascending

Whether to sort in ascending order.

new_column_name

New Column Name

The new column name when renaming or adding a column.

new_column_value

New Column Value

The value to populate the new column with.

columns_to_select

Columns to Select

List of column names to select.

num_rows

Number of Rows

Number of rows to return (for head/tail). Default: 5

replace_value

Value to Replace

The value to replace in the column.

replacement_value

Replacement Value

The value to replace with.

Outputs
Name Display Name Info

output

DataFrame

The resulting DataFrame after the operation.

Component code

dataframe_operations.py
from langflow.custom import Component
from langflow.io import BoolInput, DataFrameInput, DropdownInput, IntInput, MessageTextInput, Output, StrInput
from langflow.schema import DataFrame


class DataFrameOperationsComponent(Component):
    """Apply one selected operation to an input DataFrame and return the result.

    The operation is chosen from ``OPERATION_CHOICES``. Only the inputs relevant
    to the chosen operation are shown in the UI; every operation works on a copy
    of the input DataFrame.
    """

    display_name = "DataFrame Operations"
    description = "Perform various operations on a DataFrame."
    icon = "table"

    # Available operations
    OPERATION_CHOICES = [
        "Add Column",
        "Drop Column",
        "Filter",
        "Head",
        "Rename Column",
        "Replace Value",
        "Select Columns",
        "Sort",
        "Tail",
    ]

    inputs = [
        DataFrameInput(
            name="df",
            display_name="DataFrame",
            info="The input DataFrame to operate on.",
        ),
        DropdownInput(
            name="operation",
            display_name="Operation",
            options=OPERATION_CHOICES,
            info="Select the DataFrame operation to perform.",
            real_time_refresh=True,
        ),
        StrInput(
            name="column_name",
            display_name="Column Name",
            info="The column name to use for the operation.",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="filter_value",
            display_name="Filter Value",
            info="The value to filter rows by.",
            dynamic=True,
            show=False,
        ),
        BoolInput(
            name="ascending",
            display_name="Sort Ascending",
            info="Whether to sort in ascending order.",
            dynamic=True,
            show=False,
            value=True,
        ),
        StrInput(
            name="new_column_name",
            display_name="New Column Name",
            info="The new column name when renaming or adding a column.",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="new_column_value",
            display_name="New Column Value",
            info="The value to populate the new column with.",
            dynamic=True,
            show=False,
        ),
        StrInput(
            name="columns_to_select",
            display_name="Columns to Select",
            dynamic=True,
            is_list=True,
            show=False,
        ),
        IntInput(
            name="num_rows",
            display_name="Number of Rows",
            info="Number of rows to return (for head/tail).",
            dynamic=True,
            show=False,
            value=5,
        ),
        MessageTextInput(
            name="replace_value",
            display_name="Value to Replace",
            info="The value to replace in the column.",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="replacement_value",
            display_name="Replacement Value",
            info="The value to replace with.",
            dynamic=True,
            show=False,
        ),
    ]

    outputs = [
        Output(
            display_name="DataFrame",
            name="output",
            method="perform_operation",
            info="The resulting DataFrame after the operation.",
        )
    ]

    def update_build_config(self, build_config, field_value, field_name=None):
        """Show only the inputs needed by the newly selected operation.

        Args:
            build_config: The component's build configuration; modified in place.
            field_value: The new value of the changed field.
            field_name: Name of the changed field.

        Returns:
            The (possibly updated) ``build_config``.
        """
        # Only a change of the operation affects visibility. Refreshes triggered
        # by any other field must not hide the inputs currently in use.
        if field_name != "operation":
            return build_config

        dynamic_fields = [
            "column_name",
            "filter_value",
            "ascending",
            "new_column_name",
            "new_column_value",
            "columns_to_select",
            "num_rows",
            "replace_value",
            "replacement_value",
        ]
        # Inputs required by each operation; unknown operations show nothing.
        fields_by_operation = {
            "Filter": ("column_name", "filter_value"),
            "Sort": ("column_name", "ascending"),
            "Drop Column": ("column_name",),
            "Rename Column": ("column_name", "new_column_name"),
            "Add Column": ("new_column_name", "new_column_value"),
            "Select Columns": ("columns_to_select",),
            "Head": ("num_rows",),
            "Tail": ("num_rows",),
            "Replace Value": ("column_name", "replace_value", "replacement_value"),
        }
        visible = fields_by_operation.get(field_value, ()) if isinstance(field_value, str) else ()

        for field in dynamic_fields:
            build_config[field]["show"] = field in visible

        return build_config

    def perform_operation(self) -> DataFrame:
        """Run the selected operation on a copy of the input DataFrame.

        Returns:
            The DataFrame produced by the operation.

        Raises:
            ValueError: If ``operation`` is not one of the supported operations.
        """
        handlers = {
            "Filter": self.filter_rows_by_value,
            "Sort": self.sort_by_column,
            "Drop Column": self.drop_column,
            "Rename Column": self.rename_column,
            "Add Column": self.add_column,
            "Select Columns": self.select_columns,
            "Head": self.head,
            "Tail": self.tail,
            "Replace Value": self.replace_values,
        }
        operation = self.operation
        handler = handlers.get(operation) if isinstance(operation, str) else None
        if handler is None:
            msg = f"Unsupported operation: {operation}"
            raise ValueError(msg)

        # Work on a copy so in-place handlers (add/replace) never touch the input.
        return handler(self.df.copy())

    def filter_rows_by_value(self, df: DataFrame) -> DataFrame:
        """Keep the rows whose ``column_name`` value equals ``filter_value``."""
        return DataFrame(df[df[self.column_name] == self.filter_value])

    def sort_by_column(self, df: DataFrame) -> DataFrame:
        """Sort the rows by ``column_name`` in the direction given by ``ascending``."""
        return DataFrame(df.sort_values(by=self.column_name, ascending=self.ascending))

    def drop_column(self, df: DataFrame) -> DataFrame:
        """Remove the column ``column_name``."""
        return DataFrame(df.drop(columns=[self.column_name]))

    def rename_column(self, df: DataFrame) -> DataFrame:
        """Rename ``column_name`` to ``new_column_name``."""
        return DataFrame(df.rename(columns={self.column_name: self.new_column_name}))

    def add_column(self, df: DataFrame) -> DataFrame:
        """Add ``new_column_name`` with ``new_column_value`` in every row (modifies ``df``)."""
        df[self.new_column_name] = [self.new_column_value] * len(df)
        return DataFrame(df)

    def select_columns(self, df: DataFrame) -> DataFrame:
        """Keep only the columns listed in ``columns_to_select`` (names are stripped)."""
        columns = [col.strip() for col in self.columns_to_select]
        return DataFrame(df[columns])

    def head(self, df: DataFrame) -> DataFrame:
        """Return the first ``num_rows`` rows."""
        return DataFrame(df.head(self.num_rows))

    def tail(self, df: DataFrame) -> DataFrame:
        """Return the last ``num_rows`` rows."""
        return DataFrame(df.tail(self.num_rows))

    def replace_values(self, df: DataFrame) -> DataFrame:
        """Replace ``replace_value`` with ``replacement_value`` in ``column_name`` (modifies ``df``)."""
        df[self.column_name] = df[self.column_name].replace(self.replace_value, self.replacement_value)
        return DataFrame(df)

Data to message

Prior to Langflow version 1.1.3, this component was named Parse Data.

This component converts data objects into plain text using a specified template.

Parameters

Inputs
Name Display Name Info

data

Data

The data to convert to text.

template

Template

The template to use for formatting the data. It can contain the keys {text}, {data} or any other key in the data.

sep

Separator

The separator to use between multiple data items.

Outputs
Name Display Name Info

text

Text

The resulting formatted text string as a message object.

Component code

parse_data.py
from langflow.custom import Component
from langflow.helpers.data import data_to_text, data_to_text_list
from langflow.io import DataInput, MultilineInput, Output, StrInput
from langflow.schema import Data
from langflow.schema.message import Message


class ParseDataComponent(Component):
    """Format ``Data`` objects with a template, as one ``Message`` or as a list of ``Data``.

    Formatting itself is delegated to the ``data_to_text`` and
    ``data_to_text_list`` helpers.
    """

    display_name = "Data to Message"
    description = "Convert Data objects into Messages using any {field_name} from input data."
    icon = "message-square"
    name = "ParseData"
    metadata = {
        "legacy_name": "Parse Data",
    }

    inputs = [
        DataInput(
            name="data",
            display_name="Data",
            info="The data to convert to text.",
            is_list=True,
            required=True,
        ),
        MultilineInput(
            name="template",
            display_name="Template",
            info="The template to use for formatting the data. "
            "It can contain the keys {text}, {data} or any other key in the Data.",
            value="{text}",
            required=True,
        ),
        StrInput(name="sep", display_name="Separator", advanced=True, value="\n"),
    ]

    outputs = [
        Output(
            display_name="Message",
            name="text",
            info="Data as a single Message, with each input Data separated by Separator",
            method="parse_data",
        ),
        Output(
            display_name="Data List",
            name="data_list",
            info="Data as a list of new Data, each having `text` formatted by Template",
            method="parse_data_as_list",
        ),
    ]

    def _clean_args(self) -> tuple[list[Data], str, str]:
        """Return ``(data, template, sep)`` with ``data`` always wrapped in a list."""
        items = self.data
        if not isinstance(items, list):
            # A single object is accepted and treated as a one-element list.
            items = [items]
        return items, self.template, self.sep

    def parse_data(self) -> Message:
        """Format all inputs with the template and join them into one ``Message``.

        Returns:
            A ``Message`` whose text is the string returned by ``data_to_text``;
            the same string is stored as the component status.
        """
        items, template, separator = self._clean_args()
        formatted_text = data_to_text(template, items, separator)
        self.status = formatted_text
        return Message(text=formatted_text)

    def parse_data_as_list(self) -> list[Data]:
        """Format each input with the template and return the results as ``Data`` objects.

        Returns:
            The list of ``Data`` returned by ``data_to_text_list``, each updated via
            ``set_text`` with its formatted text.

        Raises:
            ValueError: If the helper returns text and data lists of different lengths.
        """
        items, template, _unused_separator = self._clean_args()
        formatted_texts, formatted_items = data_to_text_list(template, items)
        # strict=True guards against the two lists getting out of step.
        for formatted_item, formatted_text in zip(formatted_items, formatted_texts, strict=True):
            formatted_item.set_text(formatted_text)
        self.status = formatted_items
        return formatted_items

Filter data

This component is in Beta as of Langflow version 1.1.3, and is not yet fully supported.

This component filters a data object based on a list of specified keys. This component allows for selective extraction of data from a data object, retaining only the key-value pairs that match the provided filter criteria.

Parameters

Inputs
Name Display Name Info

data

Data

The Data object to filter.

filter_criteria

Filter Criteria

List of keys to filter by.

Outputs
Name Display Name Info

filtered_data

Filtered data

The resulting filtered data object.

Component code

filter_data.py
from langflow.custom import Component
from langflow.io import DataInput, MessageTextInput, Output
from langflow.schema import Data


class FilterDataComponent(Component):
    """Keep only the key-value pairs of a ``Data`` object whose key is in ``filter_criteria``."""

    display_name = "Filter Data"
    description = "Filters a Data object based on a list of keys."
    icon = "filter"
    beta = True
    name = "FilterData"

    inputs = [
        DataInput(
            name="data",
            display_name="Data",
            info="Data object to filter.",
        ),
        MessageTextInput(
            name="filter_criteria",
            display_name="Filter Criteria",
            info="List of keys to filter by.",
            is_list=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def filter_data(self) -> Data:
        """Build a new ``Data`` containing only the requested keys.

        Returns:
            A new ``Data`` holding the entries of the input whose key appears in
            ``filter_criteria``. The result is empty when the input is not a
            ``Data`` object or when no criteria are given.
        """
        criteria = self.filter_criteria
        if isinstance(criteria, str):
            # A bare string must be treated as ONE key. Using it directly would
            # turn `key in criteria` into a substring test ("a" in "data" is True).
            wanted_keys: list[str] = [criteria]
        else:
            # None / empty means "no keys requested" rather than a TypeError.
            wanted_keys = list(criteria or [])

        data = self.data.data if isinstance(self.data, Data) else {}

        # Filter the data
        filtered = {key: value for key, value in data.items() if key in wanted_keys}

        # Create a new Data object with the filtered data
        filtered_data = Data(data=filtered)
        self.status = filtered_data
        return filtered_data

Filter Values

This component is in Beta as of Langflow version 1.1.3, and is not yet fully supported.

This component filters a list of data items based on a specified key, filter value, and comparison operator.

Parameters

Inputs
Name Display Name Info

input_data

Input Data

The list of data items to filter.

filter_key

Filter Key

The key to filter on (for example, 'route').

filter_value

Filter Value

The value to filter by (for example, 'CMIP').

operator

Comparison Operator

The operator to apply for comparing the values.

Outputs
Name Display Name Info

filtered_data

Filtered data

The resulting list of filtered data items.

Component code

filter_data_values.py
from typing import Any

from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, MessageTextInput, Output
from langflow.schema import Data


class DataFilterComponent(Component):
    """Filter a list of Data items by comparing one key's value to a filter value."""

    display_name = "Filter Values"
    description = (
        "Filter a list of data items based on a specified key, filter value,"
        " and comparison operator. Check advanced options to select match comparison."
    )
    icon = "filter"
    beta = True
    name = "FilterDataValues"

    inputs = [
        DataInput(name="input_data", display_name="Input Data", info="The list of data items to filter.", is_list=True),
        MessageTextInput(
            name="filter_key",
            display_name="Filter Key",
            info="The key to filter on (e.g., 'route').",
            value="route",
            input_types=["Data"],
        ),
        MessageTextInput(
            name="filter_value",
            display_name="Filter Value",
            info="The value to filter by (e.g., 'CMIP').",
            value="CMIP",
            input_types=["Data"],
        ),
        DropdownInput(
            name="operator",
            display_name="Comparison Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with"],
            info="The operator to apply for comparing the values.",
            value="equals",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def compare_values(self, item_value: Any, filter_value: str, operator: str) -> bool:
        """Compare the string form of ``item_value`` against ``filter_value``.

        Args:
            item_value: The value found under the filter key; it is converted with ``str``.
            filter_value: The text to compare against.
            operator: One of "equals", "not equals", "contains", "starts with", "ends with".

        Returns:
            bool: The result of the comparison; ``False`` for an unrecognised operator.
        """
        if operator == "equals":
            return str(item_value) == filter_value
        if operator == "not equals":
            return str(item_value) != filter_value
        if operator == "contains":
            return filter_value in str(item_value)
        if operator == "starts with":
            return str(item_value).startswith(filter_value)
        if operator == "ends with":
            return str(item_value).endswith(filter_value)
        return False

    @staticmethod
    def _as_text(value: Any) -> str:
        """Return ``value`` itself when it is already a string, otherwise its ``.text``."""
        # NOTE(review): both inputs declare string defaults ("route", "CMIP"), so the
        # value presumably can arrive as a plain str, on which `.text` would raise
        # AttributeError. Non-string values keep the original `.text` access.
        return value if isinstance(value, str) else value.text

    def filter_data(self) -> list[Data]:
        """Return the input items whose ``filter_key`` value satisfies the comparison.

        Returns:
            list[Data]: An empty list when there is no input data; the unmodified
            input list when the filter key or filter value is empty; otherwise the
            items whose ``data`` is a dict containing the key and passing
            ``compare_values``. Items without the key, or whose ``data`` is not a
            dict, are left out of the result.
        """
        input_data: list[Data] = self.input_data
        filter_key: str = self._as_text(self.filter_key)
        filter_value: str = self._as_text(self.filter_value)
        operator: str = self.operator

        # Validate inputs
        if not input_data:
            self.status = "Input data is empty."
            return []

        if not filter_key or not filter_value:
            self.status = "Filter key or value is missing."
            return input_data

        # The original wrote a per-item warning to `self.status` for items lacking
        # the key, but the assignment below always overwrote it; that dead write is
        # dropped and the final status is unchanged.
        filtered_data = [
            item
            for item in input_data
            if isinstance(item.data, dict)
            and filter_key in item.data
            and self.compare_values(item.data[filter_key], filter_value, operator)
        ]

        self.status = filtered_data
        return filtered_data

JSON Cleaner

This component cleans JSON strings to ensure they are fully compliant with the JSON specification.

Parameters

Inputs
Name Display Name Info

json_str

JSON String

The JSON string to be cleaned. This can be a raw, potentially malformed JSON string produced by language models or other sources that may not fully comply with JSON specifications.

remove_control_chars

Remove Control Characters

If set to True, this option removes control characters (ASCII characters 0-31 and 127) from the JSON string. This can help eliminate invisible characters that might cause parsing issues or make the JSON invalid.

normalize_unicode

Normalize Unicode

When enabled, this option normalizes Unicode characters in the JSON string to their canonical composition form (NFC). This ensures consistent representation of Unicode characters across different systems and prevents potential issues with character encoding.

validate_json

Validate JSON

If set to True, this option attempts to parse the JSON string to ensure it is well-formed before applying the final repair operation. It raises a ValueError if the JSON is invalid, allowing for early detection of major structural issues in the JSON.

Outputs
Name Display Name Info

output

Cleaned JSON String

The resulting cleaned, repaired, and validated JSON string that fully complies with the JSON specification.

Component code

json_cleaner.py
import json
import unicodedata

from langflow.custom import Component
from langflow.inputs import BoolInput, MessageTextInput
from langflow.schema.message import Message
from langflow.template import Output


class JSONCleaner(Component):
    """Component that extracts, optionally sanitises, and repairs a JSON object string."""

    icon = "braces"
    display_name = "JSON Cleaner"
    description = (
        "Cleans the messy and sometimes incorrect JSON strings produced by LLMs "
        "so that they are fully compliant with the JSON spec."
    )

    inputs = [
        MessageTextInput(
            name="json_str", display_name="JSON String", info="The JSON string to be cleaned.", required=True
        ),
        BoolInput(
            name="remove_control_chars",
            display_name="Remove Control Characters",
            info="Remove control characters from the JSON string.",
            required=False,
        ),
        BoolInput(
            name="normalize_unicode",
            display_name="Normalize Unicode",
            info="Normalize Unicode characters in the JSON string.",
            required=False,
        ),
        BoolInput(
            name="validate_json",
            display_name="Validate JSON",
            info="Validate the JSON string to ensure it is well-formed.",
            required=False,
        ),
    ]

    outputs = [
        Output(display_name="Cleaned JSON String", name="output", method="clean_json"),
    ]

    def clean_json(self) -> Message:
        """Clean the input JSON string based on provided options and return the cleaned JSON string.

        The text between the first ``{`` and the last ``}`` is extracted, the
        enabled options are applied in order (control-character removal, Unicode
        normalisation, validation), and the result is passed to ``repair_json``.

        Returns:
            Message: A message whose text is the repaired JSON string.

        Raises:
            ImportError: If the ``json_repair`` package is not installed.
            ValueError: If no ``{ ... }`` span exists in the input, or if any
                cleaning step fails (the original error is chained).
        """
        try:
            from json_repair import repair_json
        except ImportError as e:
            msg = "Could not import the json_repair package. Please install it with `pip install json_repair`."
            raise ImportError(msg) from e

        json_str = self.json_str
        remove_control_chars = self.remove_control_chars
        normalize_unicode = self.normalize_unicode
        validate_json = self.validate_json

        start = json_str.find("{")
        end = json_str.rfind("}")
        # `end < start` also covers `end == -1`, and rejects input whose last '}'
        # comes before its first '{' (which would otherwise slice to an empty string).
        if start == -1 or end < start:
            msg = "Invalid JSON string: Missing '{' or '}'"
            raise ValueError(msg)
        try:
            json_str = json_str[start : end + 1]

            if remove_control_chars:
                json_str = self._remove_control_characters(json_str)
            if normalize_unicode:
                json_str = self._normalize_unicode(json_str)
            if validate_json:
                json_str = self._validate_json(json_str)

            cleaned_json_str = repair_json(json_str)
            result = str(cleaned_json_str)

            self.status = result
            return Message(text=result)
        except Exception as e:
            # Every failure in the cleaning pipeline is reported as a ValueError.
            msg = f"Error cleaning JSON string: {e}"
            raise ValueError(msg) from e

    def _remove_control_characters(self, s: str) -> str:
        """Remove control characters (code points 0-31 and 127) from the string."""
        return s.translate(self.translation_table)

    def _normalize_unicode(self, s: str) -> str:
        """Normalize Unicode characters in the string to NFC form."""
        return unicodedata.normalize("NFC", s)

    def _validate_json(self, s: str) -> str:
        """Return ``s`` unchanged if it parses as JSON; raise ValueError otherwise."""
        try:
            json.loads(s)
        except json.JSONDecodeError as e:
            msg = f"Invalid JSON string: {e}"
            raise ValueError(msg) from e
        return s

    def __init__(self, *args, **kwargs):
        """Initialise the component and build the control-character deletion table."""
        super().__init__(*args, **kwargs)
        # Translation table that deletes code points 0-31 and 127.
        self.translation_table = str.maketrans("", "", "".join(chr(i) for i in range(32)) + chr(127))

LLM Router

This component routes requests to the most appropriate LLM based on the OpenRouter model specifications.

Parameters

Inputs
Name Display Name Info

models

Language Models

List of LLMs to route between.

input_value

Input

The input message to be routed.

judge_llm

Judge LLM

LLM that will evaluate and select the most appropriate model.

optimization

Optimization

Optimization preference (quality/speed/cost/balanced).

Outputs
Name Display Name Info

output

Output

The response from the selected model.

selected_model

Selected Model

Name of the chosen model.

Component code

llm_router.py
import json

import requests

from langflow.base.models.chat_result import get_chat_result
from langflow.base.models.model_utils import get_model_name
from langflow.custom import Component
from langflow.io import DropdownInput, HandleInput, Output
from langflow.schema.message import Message


class LLMRouterComponent(Component):
    """Ask a judge LLM to pick one of several models, then run the input through it."""

    display_name = "LLM Router"
    description = "Routes the input to the most appropriate LLM based on OpenRouter model specifications"
    icon = "git-branch"

    inputs = [
        HandleInput(
            name="models",
            display_name="Language Models",
            input_types=["LanguageModel"],
            required=True,
            is_list=True,
            info="List of LLMs to route between",
        ),
        HandleInput(
            name="input_value",
            display_name="Input",
            input_types=["Message"],
            info="The input message to be routed",
        ),
        HandleInput(
            name="judge_llm",
            display_name="Judge LLM",
            input_types=["LanguageModel"],
            info="LLM that will evaluate and select the most appropriate model",
        ),
        DropdownInput(
            name="optimization",
            display_name="Optimization",
            options=["quality", "speed", "cost", "balanced"],
            value="balanced",
            info="Optimization preference for model selection",
        ),
    ]

    outputs = [
        Output(display_name="Output", name="output", method="route_to_model"),
        Output(
            display_name="Selected Model",
            name="selected_model",
            method="get_selected_model",
            required_inputs=["output"],
        ),
    ]

    # Name of the model chosen by the most recent `route_to_model` call.
    _selected_model_name: str | None = None

    def get_selected_model(self) -> str:
        """Return the name of the last selected model, or an empty string if none."""
        return self._selected_model_name or ""

    @staticmethod
    def _dict_or_empty(value) -> dict:
        """Return ``value`` when it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def _get_model_specs(self, model_name: str) -> str:
        """Fetch specific model information from OpenRouter API.

        Args:
            model_name: The model name used to build the lookup URL.

        Returns:
            str: A multi-line description of the model. On a request error, a
            non-200 status, or an unparsable body, a short fallback string that
            starts with the model name is returned instead.
        """
        http_success = 200
        base_info = f"Model: {model_name}\n"

        # Remove any special characters and spaces, keep only alphanumeric
        # NOTE(review): this also strips "/" and "-"; confirm the resulting slug
        # matches the identifiers the OpenRouter endpoint expects.
        clean_name = "".join(c.lower() for c in model_name if c.isalnum())
        url = f"https://openrouter.ai/api/v1/models/{clean_name}/endpoints"

        try:
            response = requests.get(url, timeout=10)
        except requests.exceptions.RequestException as e:
            return base_info + f"Error fetching specs: {e!s}"

        if response.status_code != http_success:
            return base_info + "No specifications available"

        try:
            payload = response.json()
        except (json.JSONDecodeError, requests.exceptions.JSONDecodeError):
            return base_info + "Error parsing response data"

        # A null or non-object payload/section must not crash the lookups below;
        # it is treated the same as a missing one.
        data = self._dict_or_empty(self._dict_or_empty(payload).get("data"))

        # Extract relevant information
        context_length = data.get("context_length", "Unknown")
        max_completion_tokens = data.get("max_completion_tokens", "Unknown")
        architecture = self._dict_or_empty(data.get("architecture"))
        tokenizer = architecture.get("tokenizer", "Unknown")
        instruct_type = architecture.get("instruct_type", "Unknown")

        pricing = self._dict_or_empty(data.get("pricing"))
        prompt_price = pricing.get("prompt", "Unknown")
        completion_price = pricing.get("completion", "Unknown")

        description = data.get("description", "No description available")
        created = data.get("created", "Unknown")

        return f"""
Model: {model_name}
Description: {description}
Context Length: {context_length} tokens
Max Completion Tokens: {max_completion_tokens}
Tokenizer: {tokenizer}
Instruct Type: {instruct_type}
Pricing: ${prompt_price}/1k tokens (prompt), ${completion_price}/1k tokens (completion)
Created: {created}
"""

    MISSING_INPUTS_MSG = "Missing required inputs: models, input_value, or judge_llm"

    def _pick_model(self, judge_reply: str):
        """Map the judge's reply to a model; fall back to the first model.

        The reply must be an integer index within range of ``self.models``;
        anything else selects ``self.models[0]``.
        """
        try:
            selected_index = int(judge_reply.strip())
        except ValueError:
            return self.models[0]
        if 0 <= selected_index < len(self.models):
            return self.models[selected_index]
        return self.models[0]

    def _run_model(self, chosen_model) -> Message:
        """Record ``chosen_model`` as the selected model and run the input through it."""
        self._selected_model_name = get_model_name(chosen_model)
        return get_chat_result(
            runnable=chosen_model,
            input_value=self.input_value,
        )

    async def route_to_model(self) -> Message:
        """Let the judge LLM choose a model and return that model's response.

        Returns:
            Message: The result of running ``input_value`` through the chosen model.

        Raises:
            ValueError: If ``models``, ``input_value`` or ``judge_llm`` is missing.
        """
        if not self.models or not self.input_value or not self.judge_llm:
            raise ValueError(self.MISSING_INPUTS_MSG)

        system_prompt = {
            "role": "system",
            "content": (
                "You are a model selection expert. Analyze the input and select the most "
                "appropriate model based on:\n"
                "1. Task complexity and requirements\n"
                "2. Context length needed\n"
                "3. Model capabilities\n"
                "4. Cost considerations\n"
                "5. Speed requirements\n\n"
                "Consider the detailed model specifications provided and the user's "
                "optimization preference. Return only the index number (0-based) of the best model."
            ),
        }

        # Create list of available models with their detailed specs
        models_info = []
        for i, model in enumerate(self.models):
            model_name = get_model_name(model)
            model_specs = self._get_model_specs(model_name)
            models_info.append(f"=== Model {i} ===\n{model_specs}")

        models_str = "\n\n".join(models_info)

        user_message = {
            "role": "user",
            "content": f"""Available Models with Specifications:\n{models_str}\n
            Optimization Preference: {self.optimization}\n
            Input Query: "{self.input_value.text}"\n
            Based on the model specifications and optimization preference,
            select the most appropriate model (return only the index number):""",
        }

        try:
            # Get judge's decision, then get the response from the chosen model
            response = await self.judge_llm.ainvoke([system_prompt, user_message])
            return self._run_model(self._pick_model(response.content))
        except (RuntimeError, ValueError) as e:
            self.status = f"Error: {e!s}"
            # Fallback to first model
            return self._run_model(self.models[0])

Message to data

This component converts a message object to a data object.

Parameters

Inputs
Name Display Name Info

message

Message

The message object to convert to a data object.

Outputs
Name Display Name Info

data

Data

The resulting data object converted from the input message.

Component code

message_to_data.py
from loguru import logger

from langflow.custom import Component
from langflow.io import MessageInput, Output
from langflow.schema import Data
from langflow.schema.message import Message


class MessageToDataComponent(Component):
    """Component that wraps the payload of a Message in a Data object."""

    display_name = "Message to Data"
    description = "Convert a Message object to a Data object"
    icon = "message-square-share"
    beta = True
    name = "MessagetoData"

    inputs = [
        MessageInput(
            name="message",
            display_name="Message",
            info="The Message object to convert to a Data object",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="convert_message_to_data"),
    ]

    def convert_message_to_data(self) -> Data:
        """Return a Data built from the input Message's ``data``.

        Returns:
            Data: The converted object. If the input is not a Message, the error
            text is logged at debug level, stored in ``status``, and returned
            inside a Data under the ``"error"`` key.
        """
        incoming = self.message
        if not isinstance(incoming, Message):
            # Report the problem instead of raising.
            msg = "Error converting Message to Data: Input must be a Message object"
            logger.opt(exception=True).debug(msg)
            self.status = msg
            return Data(data={"error": msg})

        return Data(data=incoming.data)

Parse DataFrame

This component converts a DataFrame into plain text following a specified template. Each column in the DataFrame is treated as a possible template key, for example, {col_name}.

Parameters

Inputs
Name Display Name Info

df

DataFrame

The DataFrame to convert to text rows.

template

Template

The template for formatting each row. Use placeholders matching column names in the DataFrame, for example, {col1}, {col2}.

sep

Separator

String that joins all row texts when building the single Text output.

Outputs
Name Display Name Info

text

Text

All rows combined into a single text, each row formatted by the template and separated by the separator value defined in sep.

Component code

parse_dataframe.py
from langflow.custom import Component
from langflow.io import DataFrameInput, MultilineInput, Output, StrInput
from langflow.schema.message import Message


class ParseDataFrameComponent(Component):
    """Component that renders every DataFrame row through a text template."""

    display_name = "Parse DataFrame"
    description = (
        "Convert a DataFrame into plain text following a specified template. "
        "Each column in the DataFrame is treated as a possible template key, e.g. {col_name}."
    )
    icon = "braces"
    name = "ParseDataFrame"

    inputs = [
        DataFrameInput(name="df", display_name="DataFrame", info="The DataFrame to convert to text rows."),
        MultilineInput(
            name="template",
            display_name="Template",
            info=(
                "The template for formatting each row. "
                "Use placeholders matching column names in the DataFrame, for example '{col1}', '{col2}'."
            ),
            value="{text}",
        ),
        StrInput(
            name="sep",
            display_name="Separator",
            advanced=True,
            value="\n",
            info="String that joins all row texts when building the single Text output.",
        ),
    ]

    outputs = [
        Output(
            display_name="Text",
            name="text",
            info="All rows combined into a single text, each row formatted by the template and separated by `sep`.",
            method="parse_data",
        ),
    ]

    def _clean_args(self):
        """Return ``(df, template, sep)``, substituting defaults for empty template/sep."""
        return self.df, self.template or "{text}", self.sep or "\n"

    def parse_data(self) -> Message:
        """Format each DataFrame row with the template and join the rows with ``sep``.

        Each row is turned into a dict keyed by column name and passed to
        ``str.format`` as keyword arguments, so ``{col}`` placeholders resolve to
        that row's value for ``col``.

        Returns:
            Message: A message whose text is all formatted rows joined by ``sep``.
        """
        frame, row_template, joiner = self._clean_args()

        # One formatted line per row, e.g. template="{text}" with {"text": "Hello"}.
        rendered_rows = [row_template.format(**row.to_dict()) for _, row in frame.iterrows()]

        combined = joiner.join(rendered_rows)
        self.status = combined  # keep the output visible in the UI logs
        return Message(text=combined)

Parse JSON

This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates.

This component converts and extracts JSON fields using JQ queries.

Parameters

Inputs
Name Display Name Info

input_value

Input

The data object to filter. It can be a message or data object.

query

JQ Query

JQ Query to filter the data. The input is always a JSON list.

Outputs
Name Display Name Info

filtered_data

Filtered data

Filtered data as a list of data objects.

Component code

parse_json_data.py
import json
from json import JSONDecodeError

import jq
from json_repair import repair_json
from loguru import logger

from langflow.custom import Component
from langflow.inputs import HandleInput, MessageTextInput
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.message import Message


class ParseJSONDataComponent(Component):
    """Component that repairs JSON input and extracts values from it with a JQ query."""

    display_name = "Parse JSON"
    description = "Convert and extract JSON fields."
    icon = "braces"
    name = "ParseJSONData"
    legacy: bool = True

    inputs = [
        HandleInput(
            name="input_value",
            display_name="Input",
            info="Data object to filter.",
            required=True,
            input_types=["Message", "Data"],
        ),
        MessageTextInput(
            name="query",
            display_name="JQ Query",
            info="JQ Query to filter the data. The input is always a JSON list.",
            required=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def _parse_data(self, input_value) -> str:
        """Return the JSON text carried by ``input_value``.

        A Message with string text yields that text, a Data yields its ``data``
        serialised with ``json.dumps``, and anything else is converted with ``str``.
        """
        if isinstance(input_value, Message) and isinstance(input_value.text, str):
            return input_value.text
        if isinstance(input_value, Data):
            return json.dumps(input_value.data)
        return str(input_value)

    @staticmethod
    def _loads_with_repair(raw: str):
        """Parse ``raw`` as JSON, retrying once on the output of ``repair_json``.

        Raises:
            ValueError: If the text still cannot be parsed after the repair attempt.
        """
        try:
            return json.loads(raw)
        except JSONDecodeError:
            try:
                return json.loads(repair_json(raw))
            except JSONDecodeError as e:
                msg = f"Invalid JSON: {e}"
                raise ValueError(msg) from e

    def filter_data(self) -> list[Data]:
        """Run the JQ query over the repaired JSON input and wrap each result in a Data.

        Returns:
            list[Data]: One Data per query result; dict results become the Data's
            ``data``, any other result is stored as text. An empty list is returned
            when the input is empty.

        Raises:
            ValueError: If an input cannot be parsed as JSON even after repair.
        """
        to_filter = self.input_value
        if not to_filter:
            return []

        if isinstance(to_filter, list):
            # A list input is parsed item by item; it is not wrapped again.
            repaired_items = [repair_json(self._parse_data(f)) for f in to_filter]
            to_filter_as_dict = [self._loads_with_repair(f) for f in repaired_items]
            to_filter = to_filter_as_dict
        else:
            # A single input is parsed as-is and not wrapped in a list.
            to_filter = repair_json(self._parse_data(to_filter))
            to_filter_as_dict = self._loads_with_repair(to_filter)

        full_filter_str = json.dumps(to_filter_as_dict)

        # loguru formats messages with str.format, so a "{}" placeholder is needed
        # for the value to be rendered.
        logger.info("to_filter: {}", to_filter)

        results = jq.compile(self.query).input_text(full_filter_str).all()
        logger.info("results: {}", results)
        return [Data(data=value) if isinstance(value, dict) else Data(text=str(value)) for value in results]

Select Data

This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates.

This component selects a single data item from a list.

Parameters

Inputs
Name Display Name Info

data_list

Data List

List of data to select from.

data_index

Data Index

Index of the data to select.

Outputs
Name Display Name Info

selected_data

Selected Data

The selected Data object.

Component code

select_data.py
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import DataInput, IntInput
from langflow.io import Output
from langflow.schema import Data


class SelectDataComponent(Component):
    """Component that returns the Data found at a given index of a list."""

    display_name: str = "Select Data"
    description: str = "Select a single data from a list of data."
    name: str = "SelectData"
    icon = "prototypes"
    legacy = True

    inputs = [
        DataInput(
            name="data_list",
            display_name="Data List",
            info="List of data to select from.",
            is_list=True,  # this input receives a list of Data objects
        ),
        IntInput(
            name="data_index",
            display_name="Data Index",
            info="Index of the data to select.",
            value=0,
            range_spec=RangeSpec(min=0, max=15, step=1, step_type="int"),
        ),
    ]

    outputs = [
        Output(display_name="Selected Data", name="selected_data", method="select_data"),
    ]

    async def select_data(self) -> Data:
        """Return the item of ``data_list`` at position ``data_index``.

        Returns:
            Data: The selected item; it is also stored in ``status``.

        Raises:
            ValueError: If the index is negative or not smaller than the list length.
        """
        index = int(self.data_index)

        # Reject anything outside [0, len(data_list)).
        if not 0 <= index < len(self.data_list):
            msg = f"Selected index {index} is out of range."
            raise ValueError(msg)

        chosen = self.data_list[index]
        self.status = chosen  # surface the selection in the component status
        return chosen

Split text

This component splits text into chunks based on specified criteria.

Parameters

Inputs
Name Display Name Info

data_inputs

Input Documents

The data to split. The component accepts Data or DataFrame objects.

chunk_overlap

Chunk Overlap

The number of characters to overlap between chunks. Default: 200.

chunk_size

Chunk Size

The maximum number of characters in each chunk. Default: 1000.

separator

Separator

The character to split on. Default: newline.

text_key

Text Key

The key to use for the text column (advanced). Default: text.

Outputs
Name Display Name Info

chunks

Chunks

List of split text chunks as Data objects.

dataframe

DataFrame

The split text chunks as a single DataFrame object, with one row per chunk.

Component code

split_text.py
from langchain_text_splitters import CharacterTextSplitter

from langflow.custom import Component
from langflow.io import HandleInput, IntInput, MessageTextInput, Output
from langflow.schema import Data, DataFrame
from langflow.utils.util import unescape_string


class SplitTextComponent(Component):
    """Component that splits Data or DataFrame text into character-based chunks."""

    display_name: str = "Split Text"
    description: str = "Split text into chunks based on specified criteria."
    icon = "scissors-line-dashed"
    name = "SplitText"

    inputs = [
        HandleInput(
            name="data_inputs",
            display_name="Input Documents",
            info="The data to split.",
            input_types=["Data", "DataFrame"],
            required=True,
        ),
        IntInput(
            name="chunk_overlap",
            display_name="Chunk Overlap",
            info="Number of characters to overlap between chunks.",
            value=200,
        ),
        IntInput(
            name="chunk_size",
            display_name="Chunk Size",
            info="The maximum number of characters in each chunk.",
            value=1000,
        ),
        MessageTextInput(
            name="separator",
            display_name="Separator",
            info="The character to split on. Defaults to newline.",
            value="\n",
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="The key to use for the text column.",
            value="text",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Chunks", name="chunks", method="split_text"),
        Output(display_name="DataFrame", name="dataframe", method="as_dataframe"),
    ]

    def _docs_to_data(self, docs) -> list[Data]:
        """Convert documents to Data objects (page content as text, metadata as data)."""
        converted = []
        for doc in docs:
            converted.append(Data(text=doc.page_content, data=doc.metadata))
        return converted

    def _docs_to_dataframe(self, docs):
        """Convert documents to a DataFrame with the text under ``text_key`` plus metadata columns."""
        rows = []
        for doc in docs:
            rows.append({self.text_key: doc.page_content, **doc.metadata})
        return DataFrame(rows)

    def _collect_documents(self):
        """Turn ``data_inputs`` into a list of documents ready for splitting.

        Raises:
            TypeError: If the input is empty, contains no Data items, or cannot
                be converted to documents.
        """
        source = self.data_inputs

        if isinstance(source, DataFrame):
            if len(source) == 0:
                msg = "DataFrame is empty"
                raise TypeError(msg)

            source.text_key = self.text_key
            try:
                return source.to_lc_documents()
            except Exception as e:
                msg = f"Error converting DataFrame to documents: {e}"
                raise TypeError(msg) from e

        if not source:
            msg = "No data inputs provided"
            raise TypeError(msg)

        if isinstance(source, Data):
            source.text_key = self.text_key
            return [source.to_lc_document()]

        # Otherwise treat the input as a collection and keep only its Data items.
        try:
            collected = [entry.to_lc_document() for entry in source if isinstance(entry, Data)]
            if not collected:
                msg = f"No valid Data inputs found in {type(source)}"
                raise TypeError(msg)
        except AttributeError as e:
            msg = f"Invalid input type in collection: {e}"
            raise TypeError(msg) from e
        return collected

    def split_text_base(self):
        """Split the input documents into chunks and return the resulting documents.

        Raises:
            TypeError: If the inputs are missing or invalid, or if splitting fails.
        """
        # Resolve the separator first, before the inputs are inspected.
        split_on = unescape_string(self.separator)
        source_docs = self._collect_documents()

        try:
            chunker = CharacterTextSplitter(
                chunk_overlap=self.chunk_overlap,
                chunk_size=self.chunk_size,
                separator=split_on,
            )
            return chunker.split_documents(source_docs)
        except Exception as e:
            msg = f"Error splitting text: {e}"
            raise TypeError(msg) from e

    def split_text(self) -> list[Data]:
        """Return the chunks as a list of Data objects."""
        chunks = self.split_text_base()
        return self._docs_to_data(chunks)

    def as_dataframe(self) -> DataFrame:
        """Return the chunks as a DataFrame."""
        chunks = self.split_text_base()
        return self._docs_to_dataframe(chunks)

Update data

The Update data component dynamically updates or appends data with specified fields.

Parameters

Inputs
Name Display Name Info

old_data

Data

The records to update. It can be a single data object or a list of data objects.

number_of_fields

Number of Fields

Number of fields to be added to the record (range: 1-15).

text_key

Text Key

Key that identifies the field to be used as the text content.

text_key_validator

Text Key Validator

If enabled, checks if the given 'Text Key' is present in the given 'data' object.

Outputs
Name Display Name Info

data

Data

The resulting updated data objects.

Component code

update_data.py
from typing import Any

from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import (
    BoolInput,
    DataInput,
    DictInput,
    IntInput,
    MessageTextInput,
)
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict


class UpdateDataComponent(Component):
    """Merge dynamically configured fields into one or more existing ``Data`` objects.

    The number of extra ``field_<i>_key`` inputs shown in the template is driven
    by the ``number_of_fields`` input (see ``update_build_config``). At build
    time the values of those inputs are collected by ``get_data`` and merged
    in place into every ``Data`` object supplied through ``old_data``.
    """

    display_name: str = "Update Data"
    description: str = "Dynamically update or append data with the specified fields."
    name: str = "UpdateData"
    MAX_FIELDS = 15  # Define a constant for maximum number of fields
    icon = "FolderSync"

    # Names that belong to the component itself rather than to the dynamically
    # generated fields. Shared by `update_build_config` and `get_data` so the
    # two cannot drift apart.
    _DEFAULT_KEYS = frozenset(
        {
            "code",
            "_type",
            "number_of_fields",
            "text_key",
            "old_data",
            "text_key_validator",
        }
    )

    inputs = [
        DataInput(
            name="old_data",
            display_name="Data",
            info="The record to update.",
            is_list=True,  # Changed to True to handle list of Data objects
            required=True,
        ),
        IntInput(
            name="number_of_fields",
            display_name="Number of Fields",
            info="Number of fields to be added to the record.",
            real_time_refresh=True,
            value=0,
            range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"),
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="Key that identifies the field to be used as the text content.",
            advanced=True,
        ),
        BoolInput(
            name="text_key_validator",
            display_name="Text Key Validator",
            advanced=True,
            info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="build_data"),
    ]

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Update the build configuration when the number of fields changes.

        Only reacts to changes of ``number_of_fields``; any other field leaves
        ``build_config`` untouched.

        Args:
            build_config (dotdict): The current build configuration.
            field_value (Any): The new value for the field.
            field_name (Optional[str]): The name of the field being updated.

        Returns:
            dotdict: The same ``build_config`` object, updated in place.

        Raises:
            ValueError: If the requested number of fields exceeds ``MAX_FIELDS``.
        """
        if field_name != "number_of_fields":
            return build_config

        try:
            field_value_int = int(field_value)
        except (TypeError, ValueError):
            # `int(None)` (e.g. a cleared input) raises TypeError, not ValueError;
            # treat every non-numeric value as "nothing to update".
            return build_config

        if field_value_int > self.MAX_FIELDS:
            build_config["number_of_fields"]["value"] = self.MAX_FIELDS
            msg = f"Number of fields cannot exceed {self.MAX_FIELDS}. Try using a Component to combine two Data."
            raise ValueError(msg)

        # Back up (and remove) every dynamic field currently in the template so
        # that fields beyond the new count disappear while the ones that remain
        # keep their existing configuration.
        existing_fields = {}
        for key in list(build_config.keys()):
            if key not in self._DEFAULT_KEYS:
                existing_fields[key] = build_config.pop(key)

        for i in range(1, field_value_int + 1):
            key = f"field_{i}_key"
            if key in existing_fields:
                build_config[key] = existing_fields[key]
            else:
                field = DictInput(
                    display_name=f"Field {i}",
                    name=key,
                    info=f"Key for field {i}.",
                    input_types=["Message", "Data"],
                )
                build_config[field.name] = field.to_dict()

        build_config["number_of_fields"]["value"] = field_value_int
        return build_config

    async def build_data(self) -> Data | list[Data]:
        """Build the updated data by combining the old data with new fields.

        The objects in ``old_data`` are updated in place. When ``text_key`` is
        set it is assigned to each updated object, and when the
        ``text_key_validator`` input is enabled the key is additionally
        checked against the object's data.

        Returns:
            The same ``old_data`` value that was supplied: a list of ``Data``
            objects or a single ``Data`` object.

        Raises:
            ValueError: If ``old_data`` is neither a ``Data`` object nor a list,
                or if validation is enabled and ``text_key`` is missing.
        """
        new_data = self.get_data()
        if isinstance(self.old_data, list):
            for data_item in self.old_data:
                if not isinstance(data_item, Data):
                    continue  # Skip invalid items
                data_item.data.update(new_data)
                if self.text_key:
                    data_item.text_key = self.text_key
                # Validation is opt-in, as advertised by the input's info text.
                if self.text_key_validator:
                    self.validate_text_key(data_item)
            self.status = self.old_data
            return self.old_data  # Returns List[Data]
        if isinstance(self.old_data, Data):
            self.old_data.data.update(new_data)
            if self.text_key:
                self.old_data.text_key = self.text_key
            self.status = self.old_data
            if self.text_key_validator:
                self.validate_text_key(self.old_data)
            return self.old_data  # Returns Data
        msg = "old_data is not a Data object or list of Data objects."
        raise ValueError(msg)

    def get_data(self) -> dict:
        """Collect the values of the dynamic fields from the component attributes.

        Attributes named in ``_DEFAULT_KEYS`` are skipped. Dict-valued
        attributes are flattened into the result key by key; ``Data`` values
        (top level or inside such a dict) are replaced by their ``get_text()``.

        Returns:
            dict: The key/value pairs to merge into the old data.
        """
        data = {}
        for attr_name, attr_value in self._attributes.items():
            if attr_name in self._DEFAULT_KEYS:
                continue  # Skip default attributes
            if isinstance(attr_value, dict):
                for key, value in attr_value.items():
                    data[key] = value.get_text() if isinstance(value, Data) else value
            elif isinstance(attr_value, Data):
                data[attr_name] = attr_value.get_text()
            else:
                data[attr_name] = attr_value
        return data

    def validate_text_key(self, data: Data) -> None:
        """Validate that the Text Key is one of the keys in the Data.

        Does nothing when ``text_key`` is empty.

        Args:
            data (Data): The object whose ``data`` keys are checked.

        Raises:
            ValueError: If ``text_key`` is set but not present in ``data.data``.
        """
        data_keys = data.data.keys()
        if self.text_key and self.text_key not in data_keys:
            # Keys are stringified so a non-string key cannot turn the intended
            # ValueError into a TypeError raised by `str.join`.
            msg = f"Text Key: '{self.text_key}' not found in the Data keys: {', '.join(map(str, data_keys))}"
            raise ValueError(msg)

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2025 DataStax | Privacy policy | Terms of use | Manage Privacy Choices

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com