Skip to content

test: add unit tests to execute transform flow e2e #737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
- name: Install Python toolchains
run: |
source .venv/bin/activate
pip install maturin pytest mypy
pip install maturin mypy pytest pytest-asyncio
- name: Python build
run: |
source .venv/bin/activate
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module-name = "cocoindex._engine"
features = ["pyo3/extension-module"]

[project.optional-dependencies]
dev = ["pytest", "ruff", "mypy", "pre-commit"]
dev = ["pytest", "pytest-asyncio", "ruff", "mypy", "pre-commit"]
embeddings = ["sentence-transformers>=3.3.1"]
all = ["cocoindex[embeddings]"]

Expand Down
90 changes: 90 additions & 0 deletions python/cocoindex/tests/test_transform_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import typing
from dataclasses import dataclass

import pytest

import cocoindex


@dataclass
class Child:
value: int


@dataclass
class Parent:
children: list[Child]


# Fixture to initialize CocoIndex library
@pytest.fixture(scope="session", autouse=True)
def init_cocoindex() -> typing.Generator[None, None, None]:
cocoindex.init()
yield


@cocoindex.op.function()
def add_suffix(text: str) -> str:
"""Append ' world' to the input text."""
return f"{text} world"


@cocoindex.transform_flow()
def simple_transform(text: cocoindex.DataSlice[str]) -> cocoindex.DataSlice[str]:
"""Transform flow that applies add_suffix to input text."""
return text.transform(add_suffix)


@cocoindex.op.function()
def extract_value(child: int) -> int:
"""Extracts the value from a Child object."""
return child


@cocoindex.transform_flow()
def for_each_transform(
data: cocoindex.DataSlice[Parent],
) -> cocoindex.DataSlice[Parent]:
"""Transform flow that processes child rows to extract values."""
with data["children"].row() as child:
child["new_field"] = child["value"].transform(extract_value)
return data


def test_simple_transform_flow() -> None:
"""Test the simple transform flow."""
input_text = "hello"
result = simple_transform.eval(input_text)
assert result == "hello world", f"Expected 'hello world', got {result}"

result = simple_transform.eval("")
assert result == " world", f"Expected ' world', got {result}"


@pytest.mark.asyncio
async def test_simple_transform_flow_async() -> None:
"""Test the simple transform flow asynchronously."""
input_text = "async"
result = await simple_transform.eval_async(input_text)
assert result == "async world", f"Expected 'async world', got {result}"


def test_for_each_transform_flow() -> None:
"""Test the complex transform flow with child rows."""
input_data = Parent(children=[Child(1), Child(2), Child(3)])
result = for_each_transform.eval(input_data)
expected = Parent(children=[Child(1), Child(2), Child(3)])
Copy link
Member

@badmonster0 badmonster0 Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the value we really expect it to return - the new field isn't here.
The reason is the annotated return type doesn't contain the new field.

(This will need to define another set of Parent + Child for this additional field. This may look a little bit awkward for now, but it's OK as it's only a test, and after #758 is resolved we won't need to define types to annotate return values for transform functions)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah right, that can be a valid workaround. But aside from that, I would say there is one more concern if I were right.

Unlike regular @flow_def flows that use DataCollector to explicitly capture the output, here our transform flows can only rely on the direct return value. The row() context creates a temporary scope for iteration, but changes within that scope aren't automatically merged back.

So, if we update it as follows, I don't think the new_field is attached to the data.

@dataclass
class NewChild:
    value: int
    new_field: int = 0

@dataclass
class NewParent:
    children: list[NewChild]

@cocoindex.op.function()
def extract_value(value: int) -> int:
    return value

@cocoindex.transform_flow()
def for_each_transform(
    data: cocoindex.DataSlice[Parent],
) -> cocoindex.DataSlice[NewParent]:
    with data["children"].row() as child:
        child["new_field"] = child["value"].transform(extract_value)
    return data

@pytest.mark.asyncio
async def test_for_each_transform_flow_async() -> None:
    input_data = Parent(children=[Child(4), Child(5)])
    result = await for_each_transform.eval_async(input_data)
    expected = NewParent(
        children=[NewChild(value=4, new_field=4), NewChild(value=5, new_field=5)]
    )
    assert result == expected, f"Expected {expected}, got {result}"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The row() context creates a temporary scope for iteration

This is not the intention of the design. In with data["children"].row() as child, think child as a reference. When doing child["new_field"] = ..., it should add a new field to the original table. CocoInsight also shows this idea (it presents the entire table with all new fields merged).

Did you run the code with this change? If the returned table indeed doesn't have new_field, it's a bug. Maybe comment out this test for now and merge, and after we fix the bug we can uncomment it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation! That's quite unusual; the new_field in the result from the above code snippet was always 0. I'd like to make some other minimal examples and delve into this deeper to see what's wrong with it.

assert result == expected, f"Expected {expected}, got {result}"

input_data = Parent(children=[])
result = for_each_transform.eval(input_data)
assert result == Parent(children=[]), f"Expected [], got {result}"


@pytest.mark.asyncio
async def test_for_each_transform_flow_async() -> None:
"""Test the complex transform flow asynchronously."""
input_data = Parent(children=[Child(4), Child(5)])
result = await for_each_transform.eval_async(input_data)
expected = Parent(children=[Child(4), Child(5)])
assert result == expected, f"Expected {expected}, got {result}"
Loading