Skip to content

[Backend Tester] Add backend filtering, improved test discovery #12624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 27 additions & 29 deletions backends/test/suite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import unittest

from enum import Enum
from typing import Any, Callable, Tuple
from typing import Callable

import executorch.backends.test.suite.flow

import torch
from executorch.backends.test.harness import Tester
from executorch.backends.test.suite.context import get_active_test_context, TestContext
from executorch.backends.test.suite.flow import TestFlow
from executorch.backends.test.suite.reporting import log_test_summary
from executorch.backends.test.suite.runner import run_test, runner_main

Expand Down Expand Up @@ -44,22 +46,20 @@ def is_backend_enabled(backend):
return backend in _ENABLED_BACKENDS


ALL_TEST_FLOWS = []
_ALL_TEST_FLOWS: dict[str, TestFlow] = {}

if is_backend_enabled("xnnpack"):
from executorch.backends.xnnpack.test.tester import Tester as XnnpackTester

XNNPACK_TEST_FLOW = ("xnnpack", XnnpackTester)
ALL_TEST_FLOWS.append(XNNPACK_TEST_FLOW)
def get_test_flows() -> dict[str, TestFlow]:
global _ALL_TEST_FLOWS

if is_backend_enabled("coreml"):
try:
from executorch.backends.apple.coreml.test.tester import CoreMLTester
if not _ALL_TEST_FLOWS:
_ALL_TEST_FLOWS = {
name: f
for name, f in executorch.backends.test.suite.flow.all_flows().items()
if is_backend_enabled(f.backend)
}

COREML_TEST_FLOW = ("coreml", CoreMLTester)
ALL_TEST_FLOWS.append(COREML_TEST_FLOW)
except Exception:
print("Core ML AOT is not available.")
return _ALL_TEST_FLOWS


DTYPES = [
Expand Down Expand Up @@ -115,53 +115,51 @@ def _create_tests(cls):
# Expand a test into variants for each registered flow.
def _expand_test(cls, test_name: str):
test_func = getattr(cls, test_name)
for flow_name, tester_factory in ALL_TEST_FLOWS:
_create_test_for_backend(cls, test_func, flow_name, tester_factory)
for flow in get_test_flows().values():
_create_test_for_backend(cls, test_func, flow)
delattr(cls, test_name)


def _make_wrapped_test(
test_func: Callable,
test_name: str,
test_flow: str,
tester_factory: Callable,
flow: TestFlow,
params: dict | None = None,
):
def wrapped_test(self):
with TestContext(test_name, test_flow, params):
with TestContext(test_name, flow.name, params):
test_kwargs = params or {}
test_kwargs["tester_factory"] = tester_factory
test_kwargs["tester_factory"] = flow.tester_factory

test_func(self, **test_kwargs)

wrapped_test._name = test_name
wrapped_test._flow = flow

return wrapped_test


def _create_test_for_backend(
cls,
test_func: Callable,
flow_name: str,
tester_factory: Callable[[torch.nn.Module, Tuple[Any]], Tester],
flow: TestFlow,
):
test_type = getattr(test_func, "test_type", TestType.STANDARD)

if test_type == TestType.STANDARD:
wrapped_test = _make_wrapped_test(
test_func, test_func.__name__, flow_name, tester_factory
)
test_name = f"{test_func.__name__}_{flow_name}"
wrapped_test = _make_wrapped_test(test_func, test_func.__name__, flow)
test_name = f"{test_func.__name__}_{flow.name}"
setattr(cls, test_name, wrapped_test)
elif test_type == TestType.DTYPE:
for dtype in DTYPES:
wrapped_test = _make_wrapped_test(
test_func,
test_func.__name__,
flow_name,
tester_factory,
flow,
{"dtype": dtype},
)
dtype_name = str(dtype)[6:] # strip "torch."
test_name = f"{test_func.__name__}_{dtype_name}_{flow_name}"
test_name = f"{test_func.__name__}_{dtype_name}_{flow.name}"
setattr(cls, test_name, wrapped_test)
else:
raise NotImplementedError(f"Unknown test type {test_type}.")
Expand Down
63 changes: 63 additions & 0 deletions backends/test/suite/discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-unsafe

import os
import unittest

from types import ModuleType

from executorch.backends.test.suite.flow import TestFlow

#
# This file contains logic related to test discovery and filtering.
#


def discover_tests(
root_module: ModuleType, backends: set[str] | None
) -> unittest.TestSuite:
# Collect all tests using the unittest discovery mechanism then filter down.

# Find the file system path corresponding to the root module.
module_file = root_module.__file__
if module_file is None:
raise RuntimeError(f"Module {root_module} has no __file__ attribute")

loader = unittest.TestLoader()
module_dir = os.path.dirname(module_file)
suite = loader.discover(module_dir)

return _filter_tests(suite, backends)


def _filter_tests(
suite: unittest.TestSuite, backends: set[str] | None
) -> unittest.TestSuite:
# Recursively traverse the test suite and add them to the filtered set.
filtered_suite = unittest.TestSuite()

for child in suite:
if isinstance(child, unittest.TestSuite):
filtered_suite.addTest(_filter_tests(child, backends))
elif isinstance(child, unittest.TestCase):
if _is_test_enabled(child, backends):
filtered_suite.addTest(child)
else:
raise RuntimeError(f"Unexpected test type: {type(child)}")

return filtered_suite


def _is_test_enabled(test_case: unittest.TestCase, backends: set[str] | None) -> bool:
test_method = getattr(test_case, test_case._testMethodName)

if backends is not None:
flow: TestFlow = test_method._flow
return flow.backend in backends
else:
return True
62 changes: 62 additions & 0 deletions backends/test/suite/flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import logging

from dataclasses import dataclass
from typing import Callable

from executorch.backends.test.harness import Tester

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


@dataclass
class TestFlow:
"""
A lowering flow to test. This typically corresponds to a combination of a backend and
a lowering recipe.
"""

name: str
""" The name of the lowering flow. """

backend: str
""" The name of the target backend. """

tester_factory: Callable[[], Tester]
""" A factory function that returns a Tester instance for this lowering flow. """


def create_xnnpack_flow() -> TestFlow | None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def create_xnnpack_flow() -> TestFlow | None:
@classmethod
def from_name(name: str) -> TestFlow | None:
...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I slightly prefer the approach of having individual methods that can be explicitly called per flow, but I'll defer to your judgement if you feel strongly about it.

try:
from executorch.backends.xnnpack.test.tester import Tester as XnnpackTester

return TestFlow(
name="xnnpack",
backend="xnnpack",
tester_factory=XnnpackTester,
)
except Exception:
logger.info("Skipping XNNPACK flow registration due to import failure.")
return None


def create_coreml_flow() -> TestFlow | None:
try:
from executorch.backends.apple.coreml.test.tester import CoreMLTester

return TestFlow(
name="coreml",
backend="coreml",
tester_factory=CoreMLTester,
)
except Exception:
logger.info("Skipping Core ML flow registration due to import failure.")
return None


def all_flows() -> dict[str, TestFlow]:
flows = [
create_xnnpack_flow(),
create_coreml_flow(),
]
return {f.name: f for f in flows if f is not None}
11 changes: 11 additions & 0 deletions backends/test/suite/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,14 @@
# LICENSE file in the root directory of this source tree.

# pyre-unsafe

import os


def load_tests(loader, suite, pattern):
package_dir = os.path.dirname(__file__)
discovered_suite = loader.discover(
start_dir=package_dir, pattern=pattern or "test_*.py"
)
suite.addTests(discovered_suite)
return suite
6 changes: 1 addition & 5 deletions backends/test/suite/reporting.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import Counter
from dataclasses import dataclass
from enum import IntEnum, nonmember
from enum import IntEnum


class TestResult(IntEnum):
Expand Down Expand Up @@ -33,19 +33,15 @@ class TestResult(IntEnum):
UNKNOWN_FAIL = 8
""" The test failed in an unknown or unexpected manner. """

@nonmember
def is_success(self):
return self in {TestResult.SUCCESS, TestResult.SUCCESS_UNDELEGATED}

@nonmember
def is_non_backend_failure(self):
return self in {TestResult.EAGER_FAIL, TestResult.EAGER_FAIL}

@nonmember
def is_backend_failure(self):
return not self.is_success() and not self.is_non_backend_failure()

@nonmember
def display_name(self):
if self == TestResult.SUCCESS:
return "Success (Delegated)"
Expand Down
40 changes: 35 additions & 5 deletions backends/test/suite/runner.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import argparse
import importlib
import unittest

from typing import Callable

import torch

from executorch.backends.test.harness import Tester
from executorch.backends.test.suite.discovery import discover_tests
from executorch.backends.test.suite.reporting import (
begin_test_session,
complete_test_session,
Expand All @@ -15,6 +17,12 @@
)


# A list of all runnable test suites and the corresponding python package.
NAMED_SUITES = {
"operators": "executorch.backends.test.suite.operators",
}


def run_test( # noqa: C901
model: torch.nn.Module,
inputs: any,
Expand Down Expand Up @@ -130,20 +138,42 @@ def parse_args():
prog="ExecuTorch Backend Test Suite",
description="Run ExecuTorch backend tests.",
)
parser.add_argument("test_path", nargs="?", help="Prefix filter for tests to run.")
parser.add_argument(
"suite",
nargs="*",
help="The test suite to run.",
choices=NAMED_SUITES.keys(),
default=["operators"],
)
parser.add_argument(
"-b", "--backend", nargs="*", help="The backend or backends to test."
)
return parser.parse_args()


def test(suite):
if isinstance(suite, unittest.TestSuite):
print(f"Suite: {suite}")
for t in suite:
test(t)
else:
print(f"Leaf: {type(suite)} {suite}")
print(f" {suite.__name__}")
print(f" {callable(suite)}")


def runner_main():
args = parse_args()

begin_test_session()

test_path = args.test_path or "executorch.backends.test.suite.operators"
if len(args.suite) > 1:
raise NotImplementedError("TODO Support multiple suites.")

loader = unittest.TestLoader()
suite = loader.loadTestsFromName(test_path)
unittest.TextTestRunner().run(suite)
test_path = NAMED_SUITES[args.suite[0]]
test_root = importlib.import_module(test_path)
suite = discover_tests(test_root, args.backend)
unittest.TextTestRunner(verbosity=2).run(suite)

summary = complete_test_session()
print_summary(summary)
Expand Down
Loading