Skip to content

Comments

FOEPD-2040 support operator pipeline live execution#6628

Merged
swheaton merged 3 commits intodevelopfrom
feat/foepd-2564-live-pipeline-exec
Jan 8, 2026
Merged

FOEPD-2040 support operator pipeline live execution#6628
swheaton merged 3 commits intodevelopfrom
feat/foepd-2564-live-pipeline-exec

Conversation

@swheaton
Copy link
Contributor

@swheaton swheaton commented Dec 1, 2025

What changes are proposed in this pull request?

Supports immediate execution of operator pipelines. Previously not supported at all, and delegated pipeline execution was only possible in Enterprise. Now delegated execution can occur but it will just run all stages of the pipeline back to back in the same execution.

Equivalent to this pseudocode

for stage in pipeline.stages:
    foo.execute_operator(stage.uri, **stage.params)

How is this patch tested? If it is not, please explain why.

Added new tests to verify functionality.

Manually verify behavior locally with a dummy pipeline operator that I fiddle with

Cases:

  • All-success
  • Failure -> early exit
  • Failure w/ always run -> skip middle stages but execute always-run finalizer

Environments:

  • Run pipeline from UI
  • Run pipeline from SDK - foo.execute_operator()
  • Queued pipeline as a DO, verified local executor functionality

Release Notes

Is this a user-facing change that should be mentioned in the release notes?

  • No. You can skip the rest of this section.
  • Yes. Give a description of this change to be included in the release
    notes for FiftyOne users.

Support for immediate :class:fiftyone.operators.OperatorPipeline execution.

What areas of FiftyOne does this PR affect?

  • App: FiftyOne application changes
  • Build: Build and test infrastructure changes
  • Core: Core fiftyone Python library changes
  • Documentation: FiftyOne documentation changes
  • Other

Summary by CodeRabbit

Release Notes

  • New Features

    • Pipeline operators now support delegated execution mode for child process execution.
    • Added live pipeline execution capability with sequential stage processing.
    • Enhanced error handling with per-stage error capture and improved propagation.
    • Pipeline execution context is properly maintained across all execution paths.
  • Tests

    • Added comprehensive test coverage for pipeline execution scenarios and error handling.

✏️ Tip: You can customize this high-level summary in your review settings.

@swheaton swheaton requested review from a team as code owners December 1, 2025 16:02
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 1, 2025

Walkthrough

This PR introduces pipeline execution support to FiftyOne operators. A new do_execute_pipeline() function sequentially executes pipeline stages with per-stage error capture and context management. Both delegated (child-process) and synchronous execution paths integrate pipeline execution, conditionally invoking the pipeline handler before falling back to operator execution. The delegated flow now carries resolved pipeline context forward instead of passing None. Exception propagation is explicitly triggered in multiple paths via raise_exceptions(). Comprehensive test coverage validates execution scenarios including error handling, missing operators, and stages with special behavior flags.

Sequence Diagram(s)

sequenceDiagram
    actor Client
    participant Delegated as Delegated<br/>Executor
    participant Pipeline
    participant Stage as Stage<br/>Operators
    participant Context as Execution<br/>Context

    Client->>Delegated: _execute_operator(doc with pipeline)
    alt Pipeline Present
        Delegated->>Pipeline: do_execute_pipeline(pipeline, ctx)
        loop For Each Stage
            Pipeline->>Stage: Resolve & validate operator
            alt Operator Exists
                Pipeline->>Stage: Execute with context
                Stage->>Context: Update state
                alt Stage Error
                    Pipeline->>Context: Record error, mark stage
                    Note over Pipeline: Continue if always_run
                end
            else Operator Missing
                Pipeline->>Context: Record error
            end
        end
        Pipeline-->>Delegated: (error, message) or None
        alt Error Returned
            Delegated->>Client: ExecutionResult with error
        else Success
            Delegated->>Stage: Fallback: do_execute_operator
            Stage-->>Delegated: Result
            Delegated->>Client: ExecutionResult
        end
    else No Pipeline
        Delegated->>Stage: do_execute_operator(operator, ctx)
        Stage-->>Delegated: Result
        Delegated->>Client: ExecutionResult
    end
    Delegated->>Client: raise_exceptions()
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 26.32% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title directly and concisely identifies the main feature: support for operator pipeline live execution, matching the core objective of the PR.
Description check ✅ Passed The description covers proposed changes, testing approach with multiple scenarios, and completed release notes sections as required by the template.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8baccdf and 4873d6f.

📒 Files selected for processing (4)
  • fiftyone/operators/delegated.py (5 hunks)
  • fiftyone/operators/executor.py (5 hunks)
  • tests/unittests/operators/delegated_tests.py (1 hunks)
  • tests/unittests/operators/executor_tests.py (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6023
File: fiftyone/operators/executor.py:1326-1332
Timestamp: 2025-06-16T21:07:49.946Z
Learning: In fiftyone/operators/executor.py, the ExecutionOptions.__init__ method intentionally ignores the allow_distributed_execution parameter by hardcoding _allow_distributed_execution to False. This is deliberate scaffolding behavior to maintain API compatibility while reserving distributed execution functionality for FiftyOne Enterprise only.
📚 Learning: 2025-06-16T21:07:49.946Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6023
File: fiftyone/operators/executor.py:1326-1332
Timestamp: 2025-06-16T21:07:49.946Z
Learning: In fiftyone/operators/executor.py, the ExecutionOptions.__init__ method intentionally ignores the allow_distributed_execution parameter by hardcoding _allow_distributed_execution to False. This is deliberate scaffolding behavior to maintain API compatibility while reserving distributed execution functionality for FiftyOne Enterprise only.

Applied to files:

  • fiftyone/operators/executor.py
  • tests/unittests/operators/executor_tests.py
  • fiftyone/operators/delegated.py
📚 Learning: 2025-10-07T01:19:39.063Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6385
File: fiftyone/operators/_types/pipeline.py:64-92
Timestamp: 2025-10-07T01:19:39.063Z
Learning: In fiftyone/operators/_types/pipeline.py, the Pipeline.stage() method intentionally accepts unused **kwargs for forward compatibility. This design pattern should not be flagged as an issue.

Applied to files:

  • fiftyone/operators/executor.py
🧬 Code graph analysis (4)
tests/unittests/operators/delegated_tests.py (5)
fiftyone/operators/executor.py (4)
  • delegated (896-898)
  • operator_uri (956-958)
  • num_distributed_tasks (938-941)
  • ExecutionContext (627-1141)
fiftyone/operators/_types/pipeline.py (2)
  • Pipeline (78-171)
  • PipelineStage (13-74)
fiftyone/factory/repos/delegated_operation_doc.py (2)
  • num_distributed_tasks (70-73)
  • DelegatedOperationDocument (23-168)
fiftyone/core/odm/document.py (2)
  • copy (167-173)
  • copy (609-626)
fiftyone/operators/delegated.py (1)
  • _execute_operator (846-927)
fiftyone/operators/executor.py (1)
fiftyone/operators/_types/pipeline.py (1)
  • stage (93-128)
tests/unittests/operators/executor_tests.py (3)
fiftyone/operators/operator.py (6)
  • Operator (121-345)
  • PipelineOperator (348-377)
  • name (139-140)
  • resolve_pipeline (359-371)
  • execute (208-219)
  • execute (373-377)
fiftyone/operators/_types/pipeline.py (2)
  • Pipeline (78-171)
  • stage (93-128)
fiftyone/operators/executor.py (3)
  • execute_or_delegate_operator (233-399)
  • operator_uri (956-958)
  • ExecutionResult (1144-1229)
fiftyone/operators/delegated.py (1)
fiftyone/operators/executor.py (4)
  • do_execute_pipeline (463-523)
  • raise_exceptions (1187-1191)
  • do_execute_operator (440-460)
  • ExecutionResult (1144-1229)
🪛 Ruff (0.14.6)
tests/unittests/operators/delegated_tests.py

1406-1406: Use pytest.raises instead of unittest-style assertRaises

(PT027)

fiftyone/operators/executor.py

480-480: Redefinition of unused error_message from line 18

(F811)


501-503: Abstract raise to an inner function

(TRY301)


501-503: Avoid specifying long messages outside the exception class

(TRY003)


509-509: Do not catch blind exception: Exception

(BLE001)


515-515: Use explicit conversion flag

Replace with conversion flag

(RUF010)


519-519: Do not catch blind exception: Exception

(BLE001)


520-520: Use explicit conversion flag

Replace with conversion flag

(RUF010)

tests/unittests/operators/executor_tests.py

201-201: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


203-203: Missing type annotation for *args

(ANN002)


203-203: Missing type annotation for **kwargs

(ANN003)


203-203: Unused method argument: kwargs

(ARG002)


248-248: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


287-287: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


327-327: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


370-370: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)

fiftyone/operators/delegated.py

904-904: Unpacked variable error_message is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: test-windows / test-python (windows-latest, 3.9)
  • GitHub Check: test-windows / test-python (windows-latest, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.12)
  • GitHub Check: test-windows / test-python (windows-latest, 3.10)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
  • GitHub Check: test / test-app
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
  • GitHub Check: lint / eslint
  • GitHub Check: e2e / test-e2e
  • GitHub Check: build / build
  • GitHub Check: build
🔇 Additional comments (10)
tests/unittests/operators/delegated_tests.py (2)

1301-1356: LGTM - Pipeline execution test is well-structured.

The test correctly verifies that _execute_operator delegates to do_execute_pipeline when a pipeline is present, and validates the call patterns for both prepare_operator_executor and do_execute_pipeline.


1358-1420: LGTM - Error propagation test covers the failure path.

The test validates that pipeline execution errors are properly propagated as exceptions. The use of self.assertRaises is appropriate here since this test class extends unittest.TestCase.

tests/unittests/operators/executor_tests.py (4)

237-272: LGTM - Pipeline success test validates correct stage execution and parameter propagation.


274-310: LGTM - Failure test verifies early exit and error message format.


312-349: LGTM - Missing operator test validates proper error handling.


351-399: LGTM - always_run test confirms finalizer stages execute despite earlier failures.

The test correctly verifies that:

  1. First stage executes and fails
  2. Second stage is skipped
  3. Third stage (with always_run=True) still executes
  4. The original error is preserved in the result
fiftyone/operators/delegated.py (4)

23-23: LGTM - Import added for pipeline execution support.


101-101: LGTM - Error propagation ensures failures bubble up from child process.


676-676: LGTM - Error propagation in sync execution path.


925-927: LGTM - Simplified return statement.

weird errant import. blame AI

rm prints

code rabbit
@swheaton swheaton force-pushed the feat/foepd-2564-live-pipeline-exec branch from a341fb1 to 831e22a Compare January 7, 2026 15:32
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @tests/unittests/operators/delegated_tests.py:
- Around line 1338-1401: The test incorrectly expects _execute_operator to
raise; instead call result =
asyncio.run(self.svc._execute_operator(pipeline_do)) and assert that result is
an ExecutionResult and that result.raise_exceptions() raises the ValueError with
message "Pipeline execution failed" (or otherwise inspect the ExecutionResult
payload to assert it contains the same error). Keep the same mocks
(prepare_operator_executor and do_execute_pipeline) and the existing
assert_called_once_with checks, but replace the with self.assertRaises(...)
block with the ExecutionResult assertion and the raise_exceptions() invocation
to validate the contained error.
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a341fb1 and 831e22a.

📒 Files selected for processing (4)
  • fiftyone/operators/delegated.py
  • fiftyone/operators/executor.py
  • tests/unittests/operators/delegated_tests.py
  • tests/unittests/operators/executor_tests.py
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-06-16T21:07:49.946Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6023
File: fiftyone/operators/executor.py:1326-1332
Timestamp: 2025-06-16T21:07:49.946Z
Learning: In fiftyone/operators/executor.py, the ExecutionOptions.__init__ method intentionally ignores the allow_distributed_execution parameter by hardcoding _allow_distributed_execution to False. This is deliberate scaffolding behavior to maintain API compatibility while reserving distributed execution functionality for FiftyOne Enterprise only.

Applied to files:

  • fiftyone/operators/executor.py
  • fiftyone/operators/delegated.py
  • tests/unittests/operators/executor_tests.py
📚 Learning: 2025-10-07T01:19:39.063Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6385
File: fiftyone/operators/_types/pipeline.py:64-92
Timestamp: 2025-10-07T01:19:39.063Z
Learning: In fiftyone/operators/_types/pipeline.py, the Pipeline.stage() method intentionally accepts unused **kwargs for forward compatibility. This design pattern should not be flagged as an issue.

Applied to files:

  • fiftyone/operators/executor.py
📚 Learning: 2024-08-12T17:41:41.645Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 4651
File: fiftyone/__public__.py:116-116
Timestamp: 2024-08-12T17:41:41.645Z
Learning: In `fiftyone/__public__.py`, imports are intended to make names available to users at the top module level, even if they are not used directly in the file. Avoid suggesting the removal of unused imports in this file.

Applied to files:

  • fiftyone/operators/executor.py
📚 Learning: 2025-12-09T15:35:45.409Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6646
File: tests/unittests/storage_tests.py:84-84
Timestamp: 2025-12-09T15:35:45.409Z
Learning: In the voxel51/fiftyone repository, when reviewing Python code that uses open(), do not suggest removing explicit mode="r" arguments even if Ruff flags UP015. The maintainers prefer explicit open mode for readability. Apply this guideline to all Python files in the repo (tests and source) where open() is used, not just a single file.

Applied to files:

  • fiftyone/operators/executor.py
  • fiftyone/operators/delegated.py
  • tests/unittests/operators/delegated_tests.py
  • tests/unittests/operators/executor_tests.py
📚 Learning: 2025-12-09T15:36:23.193Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6646
File: tests/unittests/storage_tests.py:274-274
Timestamp: 2025-12-09T15:36:23.193Z
Learning: In pytest.mark.parametrize, when parametrizing a single parameter you can use a plain string (e.g., 'param_name') as a shorthand. The tuple syntax ('param_name',) is only required when you are parametrizing multiple parameters. Do not flag PT006 for single-parameter parametrizations; use the string shorthand for clarity and correctness across tests.

Applied to files:

  • tests/unittests/operators/delegated_tests.py
  • tests/unittests/operators/executor_tests.py
🧬 Code graph analysis (3)
fiftyone/operators/executor.py (1)
fiftyone/operators/_types/pipeline.py (1)
  • stage (93-128)
tests/unittests/operators/delegated_tests.py (1)
fiftyone/operators/delegated.py (2)
  • _execute_operator (851-934)
  • set_progress (164-176)
tests/unittests/operators/executor_tests.py (3)
fiftyone/operators/operator.py (6)
  • Operator (121-345)
  • PipelineOperator (348-377)
  • name (139-140)
  • resolve_pipeline (359-371)
  • execute (208-219)
  • execute (373-377)
fiftyone/operators/_types/pipeline.py (2)
  • Pipeline (78-171)
  • stage (93-128)
fiftyone/operators/executor.py (3)
  • execute_or_delegate_operator (231-397)
  • operator_uri (946-948)
  • ExecutionResult (1134-1219)
🪛 Ruff (0.14.10)
fiftyone/operators/executor.py

492-494: Abstract raise to an inner function

(TRY301)


492-494: Avoid specifying long messages outside the exception class

(TRY003)


500-500: Do not catch blind exception: Exception

(BLE001)


505-505: Use explicit conversion flag

Replace with conversion flag

(RUF010)


509-509: Do not catch blind exception: Exception

(BLE001)


510-510: Use explicit conversion flag

Replace with conversion flag

(RUF010)

tests/unittests/operators/delegated_tests.py

1386-1386: Use pytest.raises instead of unittest-style assertRaises

(PT027)

tests/unittests/operators/executor_tests.py

201-201: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


203-203: Missing type annotation for *args

(ANN002)


203-203: Missing type annotation for **kwargs

(ANN003)


203-203: Unused method argument: kwargs

(ARG002)


248-248: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


287-287: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


327-327: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


370-370: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: lint / eslint
  • GitHub Check: test / test-app
  • GitHub Check: test / test-python (ubuntu-latest, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.11)
  • GitHub Check: test / test-python (ubuntu-latest, 3.9)
  • GitHub Check: build
🔇 Additional comments (4)
fiftyone/operators/delegated.py (1)

906-915: LGTM! Pipeline execution logic is correct.

The code correctly handles do_execute_pipeline's return value: None on success (line 513 in executor.py returns None when both error and error_message are falsy), or a tuple (error, error_message) on failure. The early return on error and setting result = None on success both integrate properly with the downstream outputs_schema resolution.

tests/unittests/operators/executor_tests.py (1)

237-399: LGTM! Comprehensive pipeline execution tests.

The test suite covers:

  • Successful multi-stage pipeline execution with parameter propagation
  • Early exit on stage failure
  • Missing operator detection and error handling
  • always_run behavior ensuring cleanup stages execute despite earlier failures

All tests verify the correct calls, parameters, and result states.

fiftyone/operators/executor.py (2)

373-381: LGTM! Return value check is correct.

Despite a past review comment claiming do_execute_pipeline always returns a tuple, line 513 clearly shows it returns None when both error and error_message are falsy:

return error, error_message if error or error_message else None

The check if result is None: correctly handles the success case.


461-513: Pipeline execution implementation is sound.

The function correctly:

  • Iterates through stages sequentially
  • Respects always_run flag for cleanup stages
  • Captures first error encountered and continues for always_run stages
  • Updates PipelineExecutionContext with current stage index
  • Returns None on success, (error, error_message) tuple on failure

Static analysis flags broad Exception catch (line 500), but this is intentional to ensure pipeline continues executing always_run stages even when earlier stages fail.

Minor: Consider using f-string conversion flags for exception formatting (e.g., {e!s} instead of str(e)) at lines 505 and 510 to align with Ruff RUF010, though this is stylistic.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @tests/unittests/operators/executor_tests.py:
- Line 247: Replace list concatenation with iterable unpacking: where the test
stubs set list_operators.return_value = [pipeline_operator] + operators, change
it to use [pipeline_operator, *operators] to avoid an intermediate list; update
the same pattern at the other occurrences that set list_operators.return_value
(the sites using pipeline_operator and operators) for consistency.
- Around line 393-398: The assertion in
test_execute_pipeline_operator_fail_always_run is too strict: instead of
checking exact equality of result.error_message, update the assertion to allow
additional context by using startswith or using containment (e.g., assert
result.error_message.startswith("Failed to execute pipeline stage[0]: Operator
failed") or assert "Failed to execute pipeline stage[0]: Operator failed" in
result.error_message) while keeping the existing checks that result is an
ExecutionResult and result.error is not None; update the assertion referencing
result.error_message accordingly.
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 831e22a and de58c8c.

📒 Files selected for processing (1)
  • tests/unittests/operators/executor_tests.py
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-06-16T21:07:49.946Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6023
File: fiftyone/operators/executor.py:1326-1332
Timestamp: 2025-06-16T21:07:49.946Z
Learning: In fiftyone/operators/executor.py, the ExecutionOptions.__init__ method intentionally ignores the allow_distributed_execution parameter by hardcoding _allow_distributed_execution to False. This is deliberate scaffolding behavior to maintain API compatibility while reserving distributed execution functionality for FiftyOne Enterprise only.

Applied to files:

  • tests/unittests/operators/executor_tests.py
📚 Learning: 2025-12-09T15:35:45.409Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6646
File: tests/unittests/storage_tests.py:84-84
Timestamp: 2025-12-09T15:35:45.409Z
Learning: In the voxel51/fiftyone repository, when reviewing Python code that uses open(), do not suggest removing explicit mode="r" arguments even if Ruff flags UP015. The maintainers prefer explicit open mode for readability. Apply this guideline to all Python files in the repo (tests and source) where open() is used, not just a single file.

Applied to files:

  • tests/unittests/operators/executor_tests.py
📚 Learning: 2025-12-09T15:36:23.193Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6646
File: tests/unittests/storage_tests.py:274-274
Timestamp: 2025-12-09T15:36:23.193Z
Learning: In pytest.mark.parametrize, when parametrizing a single parameter you can use a plain string (e.g., 'param_name') as a shorthand. The tuple syntax ('param_name',) is only required when you are parametrizing multiple parameters. Do not flag PT006 for single-parameter parametrizations; use the string shorthand for clarity and correctness across tests.

Applied to files:

  • tests/unittests/operators/executor_tests.py
🧬 Code graph analysis (1)
tests/unittests/operators/executor_tests.py (3)
fiftyone/operators/operator.py (6)
  • Operator (121-345)
  • PipelineOperator (348-377)
  • name (139-140)
  • resolve_pipeline (359-371)
  • execute (208-219)
  • execute (373-377)
fiftyone/operators/_types/pipeline.py (2)
  • Pipeline (78-171)
  • stage (93-128)
fiftyone/operators/executor.py (3)
  • execute_or_delegate_operator (231-397)
  • operator_uri (946-948)
  • ExecutionResult (1134-1219)
🪛 Ruff (0.14.10)
tests/unittests/operators/executor_tests.py

202-202: Missing type annotation for *args

(ANN002)


202-202: Missing type annotation for **kwargs

(ANN003)


202-202: Unused method argument: kwargs

(ARG002)


247-247: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


286-286: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


326-326: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


369-369: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
  • GitHub Check: build / build
  • GitHub Check: e2e / Test e2e (Mongo 7)
  • GitHub Check: test / test-python (ubuntu-latest, 3.12)
  • GitHub Check: test-windows / test-python (windows-latest, 3.12)
  • GitHub Check: test / test-python (ubuntu-latest, 3.9)
  • GitHub Check: test-windows / test-python (windows-latest, 3.11)
  • GitHub Check: test / test-python (ubuntu-latest, 3.10)
  • GitHub Check: test / test-python (ubuntu-latest, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.10)
  • GitHub Check: test / test-app
  • GitHub Check: e2e / Test e2e (Mongo 6)
  • GitHub Check: test-windows / test-python (windows-latest, 3.9)
  • GitHub Check: e2e / Test e2e (Mongo latest)
  • GitHub Check: e2e / Test e2e (Mongo 8.0)
  • GitHub Check: build
🔇 Additional comments (2)
tests/unittests/operators/executor_tests.py (2)

236-398: Good coverage of immediate pipeline execution scenarios.

The new tests sensibly cover: happy-path pipeline execution, early exit on operator failure, missing-operator handling, and the always_run finalizer semantics, while also asserting dataset_name/params propagation and operator call counts. No functional issues spotted in this suite.


202-208: Trim record_request_params signature to remove unused kwargs.

record_request_params never uses kwargs, so accepting it just triggers Ruff’s ARG002/ANN003 with no benefit. You can keep the helper minimal and silence the warnings by dropping **kwargs from the signature:

Proposed change
-    def record_request_params(self, *args, **kwargs):
-        self.calls.append(copy.deepcopy(args[0].request_params))
+    def record_request_params(self, *args):
+        self.calls.append(copy.deepcopy(args[0].request_params))

This keeps the behavior identical while reducing lint noise.

⛔ Skipped due to learnings
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6385
File: fiftyone/operators/_types/pipeline.py:64-92
Timestamp: 2025-10-07T01:19:39.063Z
Learning: In fiftyone/operators/_types/pipeline.py, the Pipeline.stage() method intentionally accepts unused **kwargs for forward compatibility. This design pattern should not be flagged as an issue.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @tests/unittests/operators/delegated_tests.py:
- Around line 1387-1390: Split the compound assertion into two checks for
clearer test diagnostics: replace the combined assertion that references
result.error_message with first asserting result.error_message is not None and
then asserting that the substring "Pipeline execution failed" is contained in
result.error_message; update the assertions in the test that currently use the
compound form so failures show whether the message is missing versus the
expected text not present.
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between de58c8c and a84e24a.

📒 Files selected for processing (2)
  • tests/unittests/operators/delegated_tests.py
  • tests/unittests/operators/executor_tests.py
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6385
File: fiftyone/operators/_types/pipeline.py:64-92
Timestamp: 2025-10-07T01:19:39.063Z
Learning: In fiftyone/operators/_types/pipeline.py, the Pipeline.stage() method intentionally accepts unused **kwargs for forward compatibility. This design pattern should not be flagged as an issue.
📚 Learning: 2025-10-07T01:19:39.063Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6385
File: fiftyone/operators/_types/pipeline.py:64-92
Timestamp: 2025-10-07T01:19:39.063Z
Learning: In fiftyone/operators/_types/pipeline.py, the Pipeline.stage() method intentionally accepts unused **kwargs for forward compatibility. This design pattern should not be flagged as an issue.

Applied to files:

  • tests/unittests/operators/executor_tests.py
📚 Learning: 2025-06-16T21:07:49.946Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6023
File: fiftyone/operators/executor.py:1326-1332
Timestamp: 2025-06-16T21:07:49.946Z
Learning: In fiftyone/operators/executor.py, the ExecutionOptions.__init__ method intentionally ignores the allow_distributed_execution parameter by hardcoding _allow_distributed_execution to False. This is deliberate scaffolding behavior to maintain API compatibility while reserving distributed execution functionality for FiftyOne Enterprise only.

Applied to files:

  • tests/unittests/operators/executor_tests.py
📚 Learning: 2025-12-09T15:35:45.409Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6646
File: tests/unittests/storage_tests.py:84-84
Timestamp: 2025-12-09T15:35:45.409Z
Learning: In the voxel51/fiftyone repository, when reviewing Python code that uses open(), do not suggest removing explicit mode="r" arguments even if Ruff flags UP015. The maintainers prefer explicit open mode for readability. Apply this guideline to all Python files in the repo (tests and source) where open() is used, not just a single file.

Applied to files:

  • tests/unittests/operators/executor_tests.py
  • tests/unittests/operators/delegated_tests.py
📚 Learning: 2025-12-09T15:36:23.193Z
Learnt from: swheaton
Repo: voxel51/fiftyone PR: 6646
File: tests/unittests/storage_tests.py:274-274
Timestamp: 2025-12-09T15:36:23.193Z
Learning: In pytest.mark.parametrize, when parametrizing a single parameter you can use a plain string (e.g., 'param_name') as a shorthand. The tuple syntax ('param_name',) is only required when you are parametrizing multiple parameters. Do not flag PT006 for single-parameter parametrizations; use the string shorthand for clarity and correctness across tests.

Applied to files:

  • tests/unittests/operators/executor_tests.py
  • tests/unittests/operators/delegated_tests.py
🧬 Code graph analysis (2)
tests/unittests/operators/executor_tests.py (3)
fiftyone/operators/operator.py (6)
  • Operator (121-345)
  • PipelineOperator (348-377)
  • name (139-140)
  • resolve_pipeline (359-371)
  • execute (208-219)
  • execute (373-377)
fiftyone/operators/_types/pipeline.py (2)
  • Pipeline (78-171)
  • stage (93-128)
fiftyone/operators/executor.py (3)
  • execute_or_delegate_operator (231-397)
  • operator_uri (946-948)
  • ExecutionResult (1134-1219)
tests/unittests/operators/delegated_tests.py (2)
fiftyone/factory/repos/delegated_operation_doc.py (2)
  • num_distributed_tasks (71-74)
  • DelegatedOperationDocument (23-170)
fiftyone/operators/delegated.py (2)
  • _execute_operator (851-934)
  • set_progress (164-176)
🪛 Ruff (0.14.10)
tests/unittests/operators/executor_tests.py

202-202: Missing type annotation for *args

(ANN002)


202-202: Missing type annotation for **kwargs

(ANN003)


202-202: Unused method argument: kwargs

(ARG002)


247-247: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


286-286: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


326-326: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)


369-369: Consider [pipeline_operator, *operators] instead of concatenation

Replace with [pipeline_operator, *operators]

(RUF005)

tests/unittests/operators/delegated_tests.py

1387-1390: Assertion should be broken down into multiple parts

Break down assertion into multiple parts

(PT018)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: e2e / Test e2e (Mongo latest)
  • GitHub Check: test-windows / test-python (windows-latest, 3.9)
  • GitHub Check: e2e / Test e2e (Mongo 8.0)
  • GitHub Check: build / build
  • GitHub Check: build
🔇 Additional comments (7)
tests/unittests/operators/delegated_tests.py (1)

1281-1336: LGTM - delegated pipeline execution test is well-structured.

The test correctly verifies that when a pipeline document is executed via _execute_operator, the delegated path prepares the operator executor with pipeline_ctx=None and invokes do_execute_pipeline with the pipeline and context. The mock setup accurately reflects the success case where do_execute_pipeline returns None.

tests/unittests/operators/executor_tests.py (6)

8-8: LGTM - imports are appropriately used.

The new imports (copy, mock, PipelineOperator) are all used in the TestPipeline class to support pipeline execution testing.

Also applies to: 10-10, 20-20


205-234: LGTM - fixture setup is clean and comprehensive.

The autouse fixture properly initializes test state (self.calls = []) and dynamically creates mock operators with the appropriate side effects for tracking execution. The setup correctly supports all four pipeline test scenarios.


237-271: LGTM - comprehensive happy path test.

The test correctly verifies that a 2-stage pipeline executes both operators in sequence, with each stage receiving the correct dataset_name and stage-specific params. The assertions confirm successful execution without errors.


273-309: LGTM - failure handling test is correct.

The test properly validates that when the first pipeline stage fails, subsequent stages are skipped and the error message correctly identifies the failing stage. The use of startswith for the error message assertion allows flexibility for additional error context.


311-348: LGTM - missing operator test validates error reporting.

The test correctly verifies that when a pipeline stage references a non-existent operator, execution halts at that stage with an appropriate error message identifying the missing operator by URI.


350-398: LGTM - always_run behavior is correctly tested.

The test properly validates the critical always_run semantics: when an early stage fails, intermediate stages are skipped but stages marked always_run=True still execute. The final error correctly reflects the initial failure rather than subsequent stage execution.

Copy link
Contributor

@CamronStaley CamronStaley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm just to clarify though the idea is that this enables calling execute on a pipeline operator where as in FOT we never do that it's always broker manager that looks at the parent operator and handles the children.

If my assumptions are correct above this means partitions / num children will just be ignored if someone did do direct execution including that correct?

@swheaton
Copy link
Contributor Author

swheaton commented Jan 7, 2026

lgtm just to clarify though the idea is that this enables calling execute on a pipeline operator where as in FOT we never do that it's always broker manager that looks at the parent operator and handles the children.

If my assumptions are correct above this means partitions / num children will just be ignored if someone did do direct execution including that correct?

it gives meaning to "executing" a pipeline which is simply triggering one operator after another in series. Any request for distribution or other such features are ignored

@swheaton swheaton merged commit f5136a5 into develop Jan 8, 2026
23 checks passed
@swheaton swheaton deleted the feat/foepd-2564-live-pipeline-exec branch January 8, 2026 01:41
@swheaton swheaton changed the title FOEPD-2564 support operator pipeline live execution FOEPD-2040 support operator pipeline live execution Jan 8, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants