Processing components in Langflow

Processing components process and transform data within a flow.

Use a processing component in a flow

The Split Text processing component in this flow splits the incoming Data into chunks to be embedded into the vector store component.

The component offers control over chunk size, overlap, and separator, which affect context and granularity in vector store retrieval results.

[Figure: vector store document ingestion flow]

Combine Text

This component concatenates two text sources into a single text chunk using a specified delimiter.

Parameters

Inputs
Name | Display Name | Info
text1 | First Text | The first text input to concatenate.
text2 | Second Text | The second text input to concatenate.
delimiter | Delimiter | A string used to separate the two text inputs. Defaults to a space.
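The join step can be illustrated without Langflow. The following is a minimal sketch in plain Python; the function name `combine_texts` and its keyword arguments are borrowed from the component code for readability, and the sample strings are made up.

```python
def combine_texts(text1: str, text2: str, delimiter: str = " ") -> str:
    """Join two strings with a delimiter, mirroring the component's join step."""
    return delimiter.join([text1, text2])


# Default delimiter is a single space.
greeting = combine_texts("Hello", "world")

# Any string can act as the delimiter.
csv_row = combine_texts("alpha", "beta", delimiter=",")

print(greeting)
print(csv_row)
```

Because the delimiter is an ordinary string, an empty string concatenates the two inputs directly.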

Component code

combine_text.py
from langflow.custom import Component
from langflow.io import MessageTextInput, Output
from langflow.schema.message import Message


class CombineTextComponent(Component):
    display_name = "Combine Text"
    description = "Concatenate two text sources into a single text chunk using a specified delimiter."
    icon = "merge"
    name = "CombineText"

    inputs = [
        MessageTextInput(
            name="text1",
            display_name="First Text",
            info="The first text input to concatenate.",
        ),
        MessageTextInput(
            name="text2",
            display_name="Second Text",
            info="The second text input to concatenate.",
        ),
        MessageTextInput(
            name="delimiter",
            display_name="Delimiter",
            info="A string used to separate the two text inputs. Defaults to a whitespace.",
            value=" ",
        ),
    ]

    outputs = [
        Output(display_name="Combined Text", name="combined_text", method="combine_texts"),
    ]

    def combine_texts(self) -> Message:
        combined = self.delimiter.join([self.text1, self.text2])
        self.status = combined
        return Message(text=combined)

Filter Data

The Filter Data component filters a Data object by a list of keys. It keeps only the key-value pairs whose keys appear in the filter criteria.

Parameters

Inputs
Name | Display Name | Info
data | Data | The Data object to filter.
filter_criteria | Filter Criteria | List of keys to filter by.

Outputs
Name | Display Name | Info
filtered_data | Filtered Data | The resulting filtered Data object.
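As a rough illustration of the filtering rule, the sketch below applies the same dictionary comprehension to a plain `dict` instead of a Langflow Data object. The helper name `filter_keys` and the sample record are hypothetical.

```python
def filter_keys(record: dict, filter_criteria: list[str]) -> dict:
    """Keep only the key-value pairs whose key is listed in filter_criteria."""
    return {key: value for key, value in record.items() if key in filter_criteria}


record = {"title": "Intro", "author": "Ada", "score": 0.9}

# "missing" is not a key of the record, so it is silently ignored.
filtered = filter_keys(record, ["title", "score", "missing"])
print(filtered)
```

Keys listed in the criteria but absent from the record do not raise an error; they simply produce no entry in the result.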

Component code

filter_data.py
from langflow.custom import Component
from langflow.io import DataInput, MessageTextInput, Output
from langflow.schema import Data


class FilterDataComponent(Component):
    display_name = "Filter Data"
    description = "Filters a Data object based on a list of keys."
    icon = "filter"
    beta = True
    name = "FilterData"

    inputs = [
        DataInput(
            name="data",
            display_name="Data",
            info="Data object to filter.",
        ),
        MessageTextInput(
            name="filter_criteria",
            display_name="Filter Criteria",
            info="List of keys to filter by.",
            is_list=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def filter_data(self) -> Data:
        filter_criteria: list[str] = self.filter_criteria
        data = self.data.data if isinstance(self.data, Data) else {}

        # Filter the data
        filtered = {key: value for key, value in data.items() if key in filter_criteria}

        # Create a new Data object with the filtered data
        filtered_data = Data(data=filtered)
        self.status = filtered_data
        return filtered_data

Filter Values

The Filter Values component filters a list of data items based on a specified key, filter value, and comparison operator.

Parameters

Inputs
Name | Display Name | Info
input_data | Input Data | The list of data items to filter.
filter_key | Filter Key | The key to filter on (for example, 'route').
filter_value | Filter Value | The value to filter by (for example, 'CMIP').
operator | Comparison Operator | The operator to apply for comparing the values: equals, not equals, contains, starts with, or ends with. Defaults to equals.

Outputs
Name | Display Name | Info
filtered_data | Filtered Data | The resulting list of filtered data items.
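The comparison operators can be sketched on plain dictionaries. This is an illustrative approximation, not the component itself: `OPERATORS`, `filter_items`, and the sample records are hypothetical names, and items are compared as strings as in the component code.

```python
from typing import Any

# Map each operator label to a string comparison.
OPERATORS = {
    "equals": lambda item, target: item == target,
    "not equals": lambda item, target: item != target,
    "contains": lambda item, target: target in item,
    "starts with": lambda item, target: item.startswith(target),
    "ends with": lambda item, target: item.endswith(target),
}


def filter_items(
    items: list[dict[str, Any]],
    filter_key: str,
    filter_value: str,
    operator: str = "equals",
) -> list[dict[str, Any]]:
    """Return the items whose value under filter_key satisfies the operator."""
    # Unknown operators match nothing.
    compare = OPERATORS.get(operator, lambda item, target: False)
    return [
        item
        for item in items
        if filter_key in item and compare(str(item[filter_key]), filter_value)
    ]


routes = [
    {"route": "CMIP", "id": 1},
    {"route": "CMIP6", "id": 2},
    {"route": "OTHER", "id": 3},
    {"id": 4},  # no "route" key, so this item is always skipped
]

exact = filter_items(routes, "route", "CMIP")
prefixed = filter_items(routes, "route", "CMIP", operator="starts with")
print([item["id"] for item in exact])
print([item["id"] for item in prefixed])
```

Items that lack the filter key are dropped for every operator, including "not equals".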

Component code

filter_data_values.py
from typing import Any

from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, MessageTextInput, Output
from langflow.schema import Data


class DataFilterComponent(Component):
    display_name = "Filter Values"
    description = (
        "Filter a list of data items based on a specified key, filter value,"
        " and comparison operator. Check advanced options to select match comparison."
    )
    icon = "filter"
    beta = True
    name = "FilterDataValues"

    inputs = [
        DataInput(name="input_data", display_name="Input Data", info="The list of data items to filter.", is_list=True),
        MessageTextInput(
            name="filter_key",
            display_name="Filter Key",
            info="The key to filter on (e.g., 'route').",
            value="route",
            input_types=["Data"],
        ),
        MessageTextInput(
            name="filter_value",
            display_name="Filter Value",
            info="The value to filter by (e.g., 'CMIP').",
            value="CMIP",
            input_types=["Data"],
        ),
        DropdownInput(
            name="operator",
            display_name="Comparison Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with"],
            info="The operator to apply for comparing the values.",
            value="equals",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def compare_values(self, item_value: Any, filter_value: str, operator: str) -> bool:
        if operator == "equals":
            return str(item_value) == filter_value
        if operator == "not equals":
            return str(item_value) != filter_value
        if operator == "contains":
            return filter_value in str(item_value)
        if operator == "starts with":
            return str(item_value).startswith(filter_value)
        if operator == "ends with":
            return str(item_value).endswith(filter_value)
        return False

    def filter_data(self) -> list[Data]:
        # Extract inputs
        input_data: list[Data] = self.input_data
        filter_key: str = self.filter_key.text
        filter_value: str = self.filter_value.text
        operator: str = self.operator

        # Validate inputs
        if not input_data:
            self.status = "Input data is empty."
            return []

        if not filter_key or not filter_value:
            self.status = "Filter key or value is missing."
            return input_data

        # Filter the data
        filtered_data = []
        for item in input_data:
            if isinstance(item.data, dict) and filter_key in item.data:
                if self.compare_values(item.data[filter_key], filter_value, operator):
                    filtered_data.append(item)
            else:
                self.status = f"Warning: Some items don't have the key '{filter_key}' or are not dictionaries."

        self.status = filtered_data
        return filtered_data

JSON Cleaner

The JSON Cleaner component cleans JSON strings to ensure they are fully compliant with the JSON specification.

Parameters

Inputs
Name | Display Name | Info
json_str | JSON String | The JSON string to be cleaned. This can be a raw, potentially malformed JSON string produced by language models or other sources that may not fully comply with JSON specifications.
remove_control_chars | Remove Control Characters | If set to True, this option removes control characters (ASCII characters 0-31 and 127) from the JSON string. This can help eliminate invisible characters that might cause parsing issues or make the JSON invalid.
normalize_unicode | Normalize Unicode | When enabled, this option normalizes Unicode characters in the JSON string to their canonical composition form (NFC). This ensures consistent representation of Unicode characters across different systems and prevents potential issues with character encoding.
validate_json | Validate JSON | If set to True, this option attempts to parse the JSON string to ensure it is well-formed before applying the final repair operation. It raises a ValueError if the JSON is invalid, allowing for early detection of major structural issues in the JSON.

Outputs
Name | Display Name | Info
output | Cleaned JSON String | The resulting cleaned, repaired, and validated JSON string that fully complies with the JSON specification.
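The three optional steps can be tried in isolation with the standard library. The sketch below is an approximation under stated assumptions: it omits the final `repair_json` call so that it has no third-party dependency, and `preclean_json`, `CONTROL_CHARS`, and the sample string are hypothetical names introduced for illustration.

```python
import json
import unicodedata

# Translation table that deletes ASCII control characters 0-31 and 127.
CONTROL_CHARS = str.maketrans("", "", "".join(chr(i) for i in range(32)) + chr(127))


def preclean_json(
    raw: str,
    remove_control_chars: bool = False,
    normalize_unicode: bool = False,
    validate_json: bool = False,
) -> str:
    """Extract the outermost {...} span and apply the optional cleaning steps."""
    start = raw.find("{")
    end = raw.rfind("}")
    if start == -1 or end == -1:
        raise ValueError("Invalid JSON string: Missing '{' or '}'")
    candidate = raw[start : end + 1]

    if remove_control_chars:
        candidate = candidate.translate(CONTROL_CHARS)
    if normalize_unicode:
        candidate = unicodedata.normalize("NFC", candidate)
    if validate_json:
        try:
            json.loads(candidate)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON string: {e}") from e
    return candidate


# Chatty model output with a stray BEL character and a decomposed accent (e + U+0301).
raw = 'Sure, here it is: {"name": "Cafe\u0301",\x07 "ok": true} Hope that helps.'

cleaned = preclean_json(raw, remove_control_chars=True, normalize_unicode=True, validate_json=True)
parsed = json.loads(cleaned)
print(parsed)
```

Note that the control-character range 0-31 includes tabs and newlines, so enabling that option also strips line breaks between JSON tokens.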

Component code

json_cleaner.py
import json
import unicodedata

from langflow.custom import Component
from langflow.inputs import BoolInput, MessageTextInput
from langflow.schema.message import Message
from langflow.template import Output


class JSONCleaner(Component):
    icon = "braces"
    display_name = "JSON Cleaner"
    description = (
        "Cleans the messy and sometimes incorrect JSON strings produced by LLMs "
        "so that they are fully compliant with the JSON spec."
    )

    inputs = [
        MessageTextInput(
            name="json_str", display_name="JSON String", info="The JSON string to be cleaned.", required=True
        ),
        BoolInput(
            name="remove_control_chars",
            display_name="Remove Control Characters",
            info="Remove control characters from the JSON string.",
            required=False,
        ),
        BoolInput(
            name="normalize_unicode",
            display_name="Normalize Unicode",
            info="Normalize Unicode characters in the JSON string.",
            required=False,
        ),
        BoolInput(
            name="validate_json",
            display_name="Validate JSON",
            info="Validate the JSON string to ensure it is well-formed.",
            required=False,
        ),
    ]

    outputs = [
        Output(display_name="Cleaned JSON String", name="output", method="clean_json"),
    ]

    def clean_json(self) -> Message:
        """Clean the input JSON string based on provided options and return the cleaned JSON string."""
        try:
            from json_repair import repair_json
        except ImportError as e:
            msg = "Could not import the json_repair package. Please install it with `pip install json_repair`."
            raise ImportError(msg) from e

        json_str = self.json_str
        remove_control_chars = self.remove_control_chars
        normalize_unicode = self.normalize_unicode
        validate_json = self.validate_json

        start = json_str.find("{")
        end = json_str.rfind("}")
        if start == -1 or end == -1:
            msg = "Invalid JSON string: Missing '{' or '}'"
            raise ValueError(msg)
        try:
            json_str = json_str[start : end + 1]

            if remove_control_chars:
                json_str = self._remove_control_characters(json_str)
            if normalize_unicode:
                json_str = self._normalize_unicode(json_str)
            if validate_json:
                json_str = self._validate_json(json_str)

            cleaned_json_str = repair_json(json_str)
            result = str(cleaned_json_str)

            self.status = result
            return Message(text=result)
        except Exception as e:
            msg = f"Error cleaning JSON string: {e}"
            raise ValueError(msg) from e

    def _remove_control_characters(self, s: str) -> str:
        """Remove control characters from the string."""
        return s.translate(self.translation_table)

    def _normalize_unicode(self, s: str) -> str:
        """Normalize Unicode characters in the string."""
        return unicodedata.normalize("NFC", s)

    def _validate_json(self, s: str) -> str:
        """Validate the JSON string."""
        try:
            json.loads(s)
        except json.JSONDecodeError as e:
            msg = f"Invalid JSON string: {e}"
            raise ValueError(msg) from e
        return s

    def __init__(self, *args, **kwargs):
        # Create a translation table that maps control characters to None
        super().__init__(*args, **kwargs)
        self.translation_table = str.maketrans("", "", "".join(chr(i) for i in range(32)) + chr(127))

Message to Data

The Message to Data component converts a Message object to a Data object.

Parameters

Inputs
Name | Display Name | Info
message | Message | The Message object to convert to a Data object.

Outputs
Name | Display Name | Info
data | Data | The resulting Data object converted from the input Message.

Component code

message_to_data.py
from loguru import logger

from langflow.custom import Component
from langflow.io import MessageInput, Output
from langflow.schema import Data
from langflow.schema.message import Message


class MessageToDataComponent(Component):
    display_name = "Message to Data"
    description = "Convert a Message object to a Data object"
    icon = "message-square-share"
    beta = True
    name = "MessagetoData"

    inputs = [
        MessageInput(
            name="message",
            display_name="Message",
            info="The Message object to convert to a Data object",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="convert_message_to_data"),
    ]

    def convert_message_to_data(self) -> Data:
        if isinstance(self.message, Message):
            # Convert Message to Data
            return Data(data=self.message.data)

        msg = "Error converting Message to Data: Input must be a Message object"
        logger.opt(exception=True).debug(msg)
        self.status = msg
        return Data(data={"error": msg})

Merge Data

The Merge Data component combines multiple Data objects into a single DataFrame using one of four operations: Concatenate, Append, Merge, or Join.

Parameters

Inputs
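Name | Display Name | Info
data_inputs | Data Inputs | A list of Data objects to combine. At least two inputs are required.
operation | Operation Type | The operation used to combine the inputs: Concatenate, Append, Merge, or Join. Defaults to Concatenate.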

Outputs
Name | Display Name | Info
combined_data | DataFrame | The combined data as a DataFrame.
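To see how the four operations in the component code differ, the sketch below replays their logic on plain dictionaries and returns a list of rows instead of a DataFrame. The function name `combine` and the sample records are hypothetical; the branching mirrors the component code in this section.

```python
def combine(records: list[dict], operation: str = "Concatenate") -> list[dict]:
    """Combine dictionaries into rows using one of four operations."""
    if operation == "Append":
        # One row per input record.
        return [dict(record) for record in records]

    combined: dict = {}
    if operation == "Concatenate":
        # Shared string values are joined with a newline; other values are overwritten.
        for record in records:
            for key, value in record.items():
                if key in combined and isinstance(combined[key], str) and isinstance(value, str):
                    combined[key] = f"{combined[key]}\n{value}"
                else:
                    combined[key] = value
    elif operation == "Merge":
        # Repeated string values are collected into a list; other values are overwritten.
        for record in records:
            for key, value in record.items():
                if key in combined and isinstance(value, str):
                    if isinstance(combined[key], list):
                        combined[key].append(value)
                    else:
                        combined[key] = [combined[key], value]
                else:
                    combined[key] = value
    elif operation == "Join":
        # Keys from the second record onward get a _doc<N> suffix.
        for idx, record in enumerate(records, 1):
            for key, value in record.items():
                combined[f"{key}_doc{idx}" if idx > 1 else key] = value
    else:
        raise ValueError(f"Unknown operation: {operation}")
    return [combined]


docs = [{"text": "a", "page": 1}, {"text": "b", "page": 2}]
for op in ("Concatenate", "Append", "Merge", "Join"):
    print(op, combine(docs, op))
```

Append is the only operation that yields one row per input; the other three collapse all inputs into a single row.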

Component code

merge_data.py
from enum import Enum
from typing import cast

from loguru import logger

from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, Output
from langflow.schema import DataFrame


class DataOperation(str, Enum):
    CONCATENATE = "Concatenate"
    APPEND = "Append"
    MERGE = "Merge"
    JOIN = "Join"


class MergeDataComponent(Component):
    display_name = "Data Combiner"
    description = "Combines data using different operations"
    icon = "merge"
    MIN_INPUTS_REQUIRED = 2

    inputs = [
        DataInput(name="data_inputs", display_name="Data Inputs", info="Data to combine", is_list=True, required=True),
        DropdownInput(
            name="operation",
            display_name="Operation Type",
            options=[op.value for op in DataOperation],
            value=DataOperation.CONCATENATE.value,
        ),
    ]
    outputs = [Output(display_name="DataFrame", name="combined_data", method="combine_data")]

    def combine_data(self) -> DataFrame:
        if not self.data_inputs or len(self.data_inputs) < self.MIN_INPUTS_REQUIRED:
            empty_dataframe = DataFrame()
            self.status = empty_dataframe
            return empty_dataframe

        operation = DataOperation(self.operation)
        try:
            combined_dataframe = self._process_operation(operation)
            self.status = combined_dataframe
        except Exception as e:
            logger.error(f"Error during operation {operation}: {e!s}")
            raise
        else:
            return combined_dataframe

    def _process_operation(self, operation: DataOperation) -> DataFrame:
        if operation == DataOperation.CONCATENATE:
            combined_data: dict[str, str | object] = {}
            for data_input in self.data_inputs:
                for key, value in data_input.data.items():
                    if key in combined_data:
                        if isinstance(combined_data[key], str) and isinstance(value, str):
                            combined_data[key] = f"{combined_data[key]}\n{value}"
                        else:
                            combined_data[key] = value
                    else:
                        combined_data[key] = value
            return DataFrame([combined_data])

        if operation == DataOperation.APPEND:
            rows = [data_input.data for data_input in self.data_inputs]
            return DataFrame(rows)

        if operation == DataOperation.MERGE:
            result_data: dict[str, str | list[str] | object] = {}
            for data_input in self.data_inputs:
                for key, value in data_input.data.items():
                    if key in result_data and isinstance(value, str):
                        if isinstance(result_data[key], list):
                            cast("list[str]", result_data[key]).append(value)
                        else:
                            result_data[key] = [result_data[key], value]
                    else:
                        result_data[key] = value
            return DataFrame([result_data])

        if operation == DataOperation.JOIN:
            combined_data = {}
            for idx, data_input in enumerate(self.data_inputs, 1):
                for key, value in data_input.data.items():
                    new_key = f"{key}_doc{idx}" if idx > 1 else key
                    combined_data[new_key] = value
            return DataFrame([combined_data])

        return DataFrame()

Parse Data

The Parse Data component converts Data objects into plain text using a template, so that structured data can be rendered in a human-readable format.

Parameters

Inputs
Name | Display Name | Info
data | Data | The data to convert to text.
template | Template | The template to use for formatting the data. It can contain the keys {text}, {data} or any other key in the data. Defaults to {text}.
sep | Separator | The separator to use between multiple data items. Defaults to a newline.

Outputs
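Name | Display Name | Info
text | Message | The data as a single Message, with each input Data item formatted by the template and joined by the separator.
data_list | Data List | The data as a list of new Data objects, each with its text formatted by the template.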
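The effect of the template and separator can be approximated with Python's `str.format`. This sketch assumes the template placeholders behave like `str.format` fields, which this page does not confirm; `records_to_text` and the sample records are hypothetical.

```python
def records_to_text(template: str, records: list[dict], sep: str = "\n") -> str:
    """Format each record with the template, then join the results with sep."""
    lines = []
    for record in records:
        # Expose the whole record as {data}; a record's own "data" key takes precedence.
        fields = {"data": record, **record}
        lines.append(template.format(**fields))
    return sep.join(lines)


records = [
    {"text": "First doc", "source": "a.txt"},
    {"text": "Second doc", "source": "b.txt"},
]

message_text = records_to_text("{source}: {text}", records)
print(message_text)
```

In this sketch, a placeholder that names a key missing from a record raises a `KeyError`.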

Component code

parse_data.py
from langflow.custom import Component
from langflow.helpers.data import data_to_text, data_to_text_list
from langflow.io import DataInput, MultilineInput, Output, StrInput
from langflow.schema import Data
from langflow.schema.message import Message


class ParseDataComponent(Component):
    display_name = "Data to Message"
    description = "Convert Data objects into Messages using any {field_name} from input data."
    icon = "message-square"
    name = "ParseData"

    inputs = [
        DataInput(name="data", display_name="Data", info="The data to convert to text.", is_list=True, required=True),
        MultilineInput(
            name="template",
            display_name="Template",
            info="The template to use for formatting the data. "
            "It can contain the keys {text}, {data} or any other key in the Data.",
            value="{text}",
            required=True,
        ),
        StrInput(name="sep", display_name="Separator", advanced=True, value="\n"),
    ]

    outputs = [
        Output(
            display_name="Message",
            name="text",
            info="Data as a single Message, with each input Data separated by Separator",
            method="parse_data",
        ),
        Output(
            display_name="Data List",
            name="data_list",
            info="Data as a list of new Data, each having `text` formatted by Template",
            method="parse_data_as_list",
        ),
    ]

    def _clean_args(self) -> tuple[list[Data], str, str]:
        data = self.data if isinstance(self.data, list) else [self.data]
        template = self.template
        sep = self.sep
        return data, template, sep

    def parse_data(self) -> Message:
        data, template, sep = self._clean_args()
        result_string = data_to_text(template, data, sep)
        self.status = result_string
        return Message(text=result_string)

    def parse_data_as_list(self) -> list[Data]:
        data, template, _ = self._clean_args()
        text_list, data_list = data_to_text_list(template, data)
        for item, text in zip(data_list, text_list, strict=True):
            item.set_text(text)
        self.status = data_list
        return data_list

Parse JSON

This component converts and extracts JSON fields using JQ queries.

Parameters

Inputs
Name | Display Name | Info
input_value | Input | The data object to filter. It can be a message or data object.
query | JQ Query | JQ Query to filter the data. The input is always a JSON list.

Outputs
Name | Display Name | Info
filtered_data | Filtered Data | Filtered data as a list of data objects.

Component code

parse_json_data.py
import json
from json import JSONDecodeError

import jq
from json_repair import repair_json
from loguru import logger

from langflow.custom import Component
from langflow.inputs import HandleInput, MessageTextInput
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.message import Message


class ParseJSONDataComponent(Component):
    display_name = "Parse JSON"
    description = "Convert and extract JSON fields."
    icon = "braces"
    name = "ParseJSONData"
    legacy: bool = True

    inputs = [
        HandleInput(
            name="input_value",
            display_name="Input",
            info="Data object to filter.",
            required=True,
            input_types=["Message", "Data"],
        ),
        MessageTextInput(
            name="query",
            display_name="JQ Query",
            info="JQ Query to filter the data. The input is always a JSON list.",
            required=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def _parse_data(self, input_value) -> str:
        if isinstance(input_value, Message) and isinstance(input_value.text, str):
            return input_value.text
        if isinstance(input_value, Data):
            return json.dumps(input_value.data)
        return str(input_value)

    def filter_data(self) -> list[Data]:
        to_filter = self.input_value
        if not to_filter:
            return []
        # Check if input is a list
        if isinstance(to_filter, list):
            to_filter = [self._parse_data(f) for f in to_filter]
        else:
            to_filter = self._parse_data(to_filter)

        # If input is not a list, don't wrap it in a list
        if not isinstance(to_filter, list):
            to_filter = repair_json(to_filter)
            try:
                to_filter_as_dict = json.loads(to_filter)
            except JSONDecodeError:
                try:
                    to_filter_as_dict = json.loads(repair_json(to_filter))
                except JSONDecodeError as e:
                    msg = f"Invalid JSON: {e}"
                    raise ValueError(msg) from e
        else:
            to_filter = [repair_json(f) for f in to_filter]
            to_filter_as_dict = []
            for f in to_filter:
                try:
                    to_filter_as_dict.append(json.loads(f))
                except JSONDecodeError:
                    try:
                        to_filter_as_dict.append(json.loads(repair_json(f)))
                    except JSONDecodeError as e:
                        msg = f"Invalid JSON: {e}"
                        raise ValueError(msg) from e
            to_filter = to_filter_as_dict

        full_filter_str = json.dumps(to_filter_as_dict)

        # loguru formats messages with str.format, so the value needs a {} placeholder
        logger.info("to_filter: {}", to_filter)

        results = jq.compile(self.query).input_text(full_filter_str).all()
        logger.info("results: {}", results)
        return [Data(data=value) if isinstance(value, dict) else Data(text=str(value)) for value in results]

Split Text

This component splits text into chunks of a specified length.

Parameters

Inputs
Name | Display Name | Info
data_inputs | Data Inputs | The data to split.
chunk_overlap | Chunk Overlap | Number of characters to overlap between chunks. Defaults to 200.
chunk_size | Chunk Size | The maximum number of characters in each chunk. Defaults to 1000.
separator | Separator | The character to split on. Defaults to a newline.
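To build intuition for how chunk size and overlap interact, the sketch below uses a fixed-size sliding window over characters. This is a deliberately simplified stand-in and not the algorithm used by the component, which delegates to a separator-based text splitter; `sliding_window_chunks` is a hypothetical helper, and only the parameter names `chunk_size` and `chunk_overlap` come from the component code.

```python
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Cut text into windows of chunk_size characters that share chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start : start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


text = "abcdefghij"
overlapping = sliding_window_chunks(text, chunk_size=4, chunk_overlap=2)
disjoint = sliding_window_chunks(text, chunk_size=4, chunk_overlap=0)
print(overlapping)
print(disjoint)
```

A larger overlap repeats more context across neighboring chunks at the cost of producing more chunks for the same text.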

Component code

split_text.py
from langchain_text_splitters import CharacterTextSplitter

from langflow.custom import Component
from langflow.io import HandleInput, IntInput, MessageTextInput, Output
from langflow.schema import Data, DataFrame
from langflow.utils.util import unescape_string


class SplitTextComponent(Component):
    display_name: str = "Split Text"
    description: str = "Split text into chunks based on specified criteria."
    icon = "scissors-line-dashed"
    name = "SplitText"

    inputs = [
        HandleInput(
            name="data_inputs",
            display_name="Data Inputs",
            info="The data to split.",
            input_types=["Data"],
            is_list=True,
            required=True,
        ),
        IntInput(
            name="chunk_overlap",
            display_name="Chunk Overlap",
            info="Number of characters to overlap between chunks.",
            value=200,
        ),
        IntInput(
            name="chunk_size",
            display_name="Chunk Size",
            info="The maximum number of characters in each chunk.",
            value=1000,
        ),
        MessageTextInput(
            name="separator",
            display_name="Separator",
            info="The character to split on. Defaults to newline.",
            value="\n",
        ),
    ]

    outputs = [
        Output(display_name="Chunks", name="chunks", method="split_text"),
        Output(display_name="DataFrame", name="dataframe", method="as_dataframe"),
    ]

    def _docs_to_data(self, docs):
        return [Data(text=doc.page_content, data=doc.metadata) for doc in docs]

    def split_text(self) -> list[Data]:
        separator = unescape_string(self.separator)

        documents = [_input.to_lc_document() for _input in self.data_inputs if isinstance(_input, Data)]

        splitter = CharacterTextSplitter(
            chunk_overlap=self.chunk_overlap,
            chunk_size=self.chunk_size,
            separator=separator,
        )
        docs = splitter.split_documents(documents)
        data = self._docs_to_data(docs)
        self.status = data
        return data

    def as_dataframe(self) -> DataFrame:
        return DataFrame(self.split_text())

Update Data

The Update Data component dynamically updates or appends data with specified fields.

Parameters

Inputs
Name | Display Name | Info
old_data | Data | The records to update. It can be a single Data object or a list of Data objects.
number_of_fields | Number of Fields | Number of fields to be added to the record (range: 1-15).
text_key | Text Key | Key that identifies the field to be used as the text content.
text_key_validator | Text Key Validator | If enabled, checks if the given 'Text Key' is present in the given 'Data' object.

Outputs
Name | Display Name | Info
data | Data | The resulting updated Data objects.
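The update step amounts to merging new fields into each record and optionally checking the text key. The sketch below shows that idea on plain dictionaries under a few assumptions: `update_records` is a hypothetical helper, the `text_key_validator` flag follows the description in the parameter table, and the sketch returns new dictionaries whereas the component updates its Data objects in place.

```python
def update_records(
    records: list[dict],
    new_fields: dict,
    text_key: str = "",
    text_key_validator: bool = False,
) -> list[dict]:
    """Merge new_fields into every record, overwriting keys that already exist."""
    updated = []
    for record in records:
        merged = {**record, **new_fields}
        # Optionally verify that the chosen text key exists after the update.
        if text_key_validator and text_key and text_key not in merged:
            msg = f"Text Key: '{text_key}' not found in the Data keys: {', '.join(merged)}"
            raise ValueError(msg)
        updated.append(merged)
    return updated


records = [{"text": "hello", "lang": "en"}, {"text": "bonjour"}]
updated = update_records(
    records,
    {"lang": "fr", "reviewed": True},
    text_key="text",
    text_key_validator=True,
)
print(updated)
```

Existing keys such as `lang` are overwritten, while keys that are new to a record are appended.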

Component code

update_data.py
from typing import Any

from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import (
    BoolInput,
    DataInput,
    DictInput,
    IntInput,
    MessageTextInput,
)
from langflow.io import Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict


class UpdateDataComponent(Component):
    display_name: str = "Update Data"
    description: str = "Dynamically update or append data with the specified fields."
    name: str = "UpdateData"
    MAX_FIELDS = 15  # Define a constant for maximum number of fields
    icon = "FolderSync"

    inputs = [
        DataInput(
            name="old_data",
            display_name="Data",
            info="The record to update.",
            is_list=True,  # Changed to True to handle list of Data objects
        ),
        IntInput(
            name="number_of_fields",
            display_name="Number of Fields",
            info="Number of fields to be added to the record.",
            real_time_refresh=True,
            value=0,
            range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"),
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="Key that identifies the field to be used as the text content.",
            advanced=True,
        ),
        BoolInput(
            name="text_key_validator",
            display_name="Text Key Validator",
            advanced=True,
            info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="build_data"),
    ]

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Update the build configuration when the number of fields changes.

        Args:
            build_config (dotdict): The current build configuration.
            field_value (Any): The new value for the field.
            field_name (Optional[str]): The name of the field being updated.
        """
        if field_name == "number_of_fields":
            default_keys = {
                "code",
                "_type",
                "number_of_fields",
                "text_key",
                "old_data",
                "text_key_validator",
            }
            try:
                field_value_int = int(field_value)
            except ValueError:
                return build_config

            if field_value_int > self.MAX_FIELDS:
                build_config["number_of_fields"]["value"] = self.MAX_FIELDS
                msg = f"Number of fields cannot exceed {self.MAX_FIELDS}. Try using a Component to combine two Data."
                raise ValueError(msg)

            existing_fields = {}
            # Back up the existing template fields
            for key in list(build_config.keys()):
                if key not in default_keys:
                    existing_fields[key] = build_config.pop(key)

            for i in range(1, field_value_int + 1):
                key = f"field_{i}_key"
                if key in existing_fields:
                    field = existing_fields[key]
                    build_config[key] = field
                else:
                    field = DictInput(
                        display_name=f"Field {i}",
                        name=key,
                        info=f"Key for field {i}.",
                        input_types=["Message", "Data"],
                    )
                    build_config[field.name] = field.to_dict()

            build_config["number_of_fields"]["value"] = field_value_int
        return build_config

    async def build_data(self) -> Data | list[Data]:
        """Build the updated data by combining the old data with new fields."""
        new_data = self.get_data()
        if isinstance(self.old_data, list):
            for data_item in self.old_data:
                if not isinstance(data_item, Data):
                    continue  # Skip invalid items
                data_item.data.update(new_data)
                if self.text_key:
                    data_item.text_key = self.text_key
                self.validate_text_key(data_item)
            self.status = self.old_data
            return self.old_data  # Returns List[Data]
        if isinstance(self.old_data, Data):
            self.old_data.data.update(new_data)
            if self.text_key:
                self.old_data.text_key = self.text_key
            self.status = self.old_data
            self.validate_text_key(self.old_data)
            return self.old_data  # Returns Data
        msg = "old_data is not a Data object or list of Data objects."
        raise ValueError(msg)

    def get_data(self):
        """Function to get the Data from the attributes."""
        data = {}
        default_keys = {
            "code",
            "_type",
            "number_of_fields",
            "text_key",
            "old_data",
            "text_key_validator",
        }
        for attr_name, attr_value in self._attributes.items():
            if attr_name in default_keys:
                continue  # Skip default attributes
            if isinstance(attr_value, dict):
                for key, value in attr_value.items():
                    data[key] = value.get_text() if isinstance(value, Data) else value
            elif isinstance(attr_value, Data):
                data[attr_name] = attr_value.get_text()
            else:
                data[attr_name] = attr_value
        return data

    def validate_text_key(self, data: Data) -> None:
        """This function validates that the Text Key is one of the keys in the Data."""
        data_keys = data.data.keys()
        if self.text_key and self.text_key not in data_keys:
            msg = f"Text Key: '{self.text_key}' not found in the Data keys: {', '.join(data_keys)}"
            raise ValueError(msg)

