Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def run_connection_test(self) -> bool:
def run_sync_dry_run(
self,
dry_run_record_limit: int | None = 1,
streams: t.Iterable[Stream] | None = None,
streams: t.Iterable[Stream | str] | None = None,
) -> bool:
"""Run connection test.

Expand All @@ -279,15 +279,24 @@ def run_sync_dry_run(
if streams is None:
streams = self.streams.values()

for stream in streams:
selected_streams: list[Stream] = []

for stream_or_name in streams:
stream = (
self.streams[stream_or_name]
if isinstance(stream_or_name, str)
else stream_or_name
)

if not stream.child_streams: # pragma: no branch
# Initialize streams' record limits before beginning the sync test.
stream.ABORT_AT_RECORD_COUNT = dry_run_record_limit

# Force selection of streams.
stream.selected = True
selected_streams.append(stream)

for stream in streams:
for stream in selected_streams:
if stream.parent_stream_type:
self.logger.debug(
"Child stream '%s' should be called by "
Expand Down
34 changes: 33 additions & 1 deletion singer_sdk/testing/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
)

if t.TYPE_CHECKING:
from collections.abc import Sequence

from singer_sdk import Stream, Tap, Target
from singer_sdk.testing.templates import (
AttributeTestTemplate,
Expand Down Expand Up @@ -91,6 +93,7 @@ def new_test_class(
include_stream_attribute_tests: bool = True,
custom_suites: list | None = None,
suite_config: SuiteConfig | None = None,
streams: Sequence[str] | None = None,
**kwargs: t.Any,
) -> type[BaseTestClass]:
"""Get a new test class.
Expand All @@ -102,6 +105,9 @@ def new_test_class(
Include stream attribute tests in the test class.
custom_suites: List of custom test suites to include in the test class.
suite_config: SuiteConfig instance to be used when instantiating tests.
streams: Optional list of stream names to test. If provided, only the
specified streams will be tested during dry run execution. If omitted
or None, all available streams will be tested.
kwargs: Default arguments to be passed to tap on create.

Returns:
Expand All @@ -124,6 +130,7 @@ def new_test_class(
tap_class=self.tap_class,
config=self.config,
suite_config=suite_config,
streams=streams,
**kwargs,
)

Expand Down Expand Up @@ -192,7 +199,17 @@ def _annotate_test_class(
self._with_tap_tests(empty_test_class, suite)

if suite.kind in {"tap_stream", "tap_stream_attribute"}:
streams = list(test_runner.new_tap().streams.values())
all_streams = list(test_runner.new_tap().streams.values())

# Filter streams if specific streams were requested
if test_runner.streams is not None:
streams = [
stream
for stream in all_streams
if stream.name in test_runner.streams
]
else:
streams = all_streams

if suite.kind == "tap_stream":
self._with_stream_tests(empty_test_class, suite, streams)
Expand Down Expand Up @@ -396,6 +413,7 @@ def get_tap_test_class(
include_stream_attribute_tests: bool = True,
custom_suites: list | None = None,
suite_config: SuiteConfig | None = None,
streams: Sequence[str] | None = None,
**kwargs: t.Any,
) -> type[BaseTestClass]:
"""Get Tap Test Class.
Expand All @@ -408,10 +426,23 @@ def get_tap_test_class(
include_stream_attribute_tests: Include Tap stream attribute tests.
custom_suites: Custom test suites to add to standard tests.
suite_config: SuiteConfig instance to pass to tests.
streams: Optional list of stream names to test. If provided, only the
specified streams will be tested during dry run execution. If omitted
or None, all available streams will be tested. This is useful for
focusing tests on specific streams or reducing test execution time.
kwargs: Keyword arguments to pass to the TapRunner.

Returns:
A test class usable by pytest.

Example:
Test only specific streams::

TestMyTap = get_tap_test_class(
tap_class=MyTap,
config=SAMPLE_CONFIG,
streams=["users", "orders"],
)
"""
factory = TapTestClassFactory(
tap_class=tap_class,
Expand All @@ -423,6 +454,7 @@ def get_tap_test_class(
include_tap_tests=include_tap_tests,
include_stream_tests=include_stream_tests,
include_stream_attribute_tests=include_stream_attribute_tests,
streams=streams,
**kwargs,
)

Expand Down
8 changes: 8 additions & 0 deletions singer_sdk/testing/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from singer_sdk.testing.config import SuiteConfig

if t.TYPE_CHECKING:
from collections.abc import Sequence
from pathlib import Path

from singer_sdk.helpers._compat import Traversable
Expand Down Expand Up @@ -96,6 +97,8 @@ def __init__(
tap_class: type[Tap],
config: dict | None = None,
suite_config: SuiteConfig | None = None,
*,
streams: Sequence[str] | None = None,
**kwargs: t.Any,
) -> None:
"""Initialize Tap instance.
Expand All @@ -105,6 +108,9 @@ def __init__(
config: Config dict to pass to Tap class.
suite_config (SuiteConfig): SuiteConfig instance to be used when
instantiating tests.
streams: Optional list of stream names to test. If provided, only the
specified streams will be tested during dry run execution. If omitted
or None, all available streams will be tested.
kwargs: Default arguments to be passed to tap on create.
"""
super().__init__(
Expand All @@ -113,6 +119,7 @@ def __init__(
suite_config=suite_config,
**kwargs,
)
self.streams = streams

def new_tap(self) -> Tap:
"""Get new Tap instance.
Expand Down Expand Up @@ -148,6 +155,7 @@ def run_sync_dry_run(self) -> bool:
new_tap = self.new_tap()
return new_tap.run_sync_dry_run(
dry_run_record_limit=self.suite_config.max_records_limit,
streams=self.streams,
)

def sync_all(self, **kwargs: t.Any) -> None: # noqa: ARG002
Expand Down
143 changes: 143 additions & 0 deletions tests/core/testing/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def test_new_test_class_default_suites(
):
"""Test new_test_class creates class with default test suites."""
mock_runner = Mock()
mock_runner.streams = None # No stream filtering
mock_runner_class.return_value = mock_runner

# Mock streams for stream tests with proper stream_maps structure
Expand All @@ -136,6 +137,51 @@ def test_new_test_class_default_suites(
tap_class=factory.tap_class,
config=factory.config,
suite_config=None,
streams=None,
parse_env_config=True,
)

@patch("singer_sdk.testing.factory.TapTestRunner")
@pytest.mark.parametrize(
"streams",
[
None,
[],
["stream1"],
["stream1", "stream2"],
],
)
def test_new_test_class_with_streams_parameter(
self,
mock_runner_class,
factory: TapTestClassFactory,
streams,
):
"""Test new_test_class with various streams parameter values."""
mock_runner = Mock()
mock_runner.streams = streams # Set the streams attribute on the runner
mock_runner_class.return_value = mock_runner

mock_stream = Mock(spec=Stream)
mock_stream.name = "test_stream"
mock_stream_map = Mock()
mock_stream_map.transformed_schema = {"properties": {"id": {"type": "integer"}}}
mock_stream.stream_maps = [mock_stream_map]

mock_tap = Mock()
mock_tap.streams = {"test_stream": mock_stream}
mock_runner.new_tap.return_value = mock_tap

test_class = factory.new_test_class(streams=streams)

assert issubclass(test_class, BaseTestClass)

# Verify TapTestRunner was called with correct streams parameter
mock_runner_class.assert_called_once_with(
tap_class=factory.tap_class,
config=factory.config,
suite_config=None,
streams=streams,
parse_env_config=True,
)

Expand All @@ -147,6 +193,7 @@ def test_new_test_class_custom_kwargs(
):
"""Test new_test_class with custom kwargs."""
mock_runner = Mock()
mock_runner.streams = None # No stream filtering
mock_runner_class.return_value = mock_runner

mock_stream = Mock(spec=Stream)
Expand Down Expand Up @@ -176,6 +223,7 @@ def test_new_test_class_custom_kwargs(
tap_class=factory.tap_class,
config=factory.config,
suite_config=suite_config,
streams=None,
parse_env_config=False,
custom_kwarg=custom_kwarg,
)
Expand Down Expand Up @@ -278,6 +326,52 @@ def test(self) -> None: ...
"attribute_name",
}

@patch("singer_sdk.testing.factory.TapTestRunner")
def test_new_test_class_filters_streams_correctly(
self,
mock_runner_class,
factory: TapTestClassFactory,
):
"""Test that streams parameter filters which stream tests are generated."""
mock_runner = Mock()
mock_runner.streams = ["stream1"] # Only test stream1
mock_runner_class.return_value = mock_runner

# Create multiple mock streams
stream1 = Mock(spec=Stream)
stream1.name = "stream1"
stream2 = Mock(spec=Stream)
stream2.name = "stream2"
stream3 = Mock(spec=Stream)
stream3.name = "stream3"

# Mock the tap to return all streams
mock_tap = Mock()
mock_tap.streams = {
"stream1": stream1,
"stream2": stream2,
"stream3": stream3,
}
mock_runner.new_tap.return_value = mock_tap

# Create test class with streams filter
test_class = factory.new_test_class(
streams=["stream1"],
include_tap_tests=False,
include_stream_tests=True,
include_stream_attribute_tests=False,
)

# Only stream1 should have test methods generated
# Check that params only contains stream1
for method_name, params in test_class.params.items():
if method_name.startswith("test_tap_stream_"):
# All params should be for stream1 only
param_ids = [param.id for param in params]
assert param_ids == ["stream1"], (
f"Expected only stream1 tests, but found: {param_ids}"
)

def test_get_empty_test_class(self, factory: TapTestClassFactory):
"""Test _get_empty_test_class creates proper fixtures."""
mock_runner = Mock()
Expand Down Expand Up @@ -636,11 +730,60 @@ def test_get_tap_test_class(
include_tap_tests=True,
include_stream_tests=False,
include_stream_attribute_tests=True,
streams=None,
custom_kwarg="value",
)

assert result is mock_test_class

@patch("singer_sdk.testing.factory.TapTestClassFactory")
@pytest.mark.parametrize(
"streams",
[
None,
[],
["stream1"],
["stream1", "stream2"],
],
)
def test_get_tap_test_class_with_streams_parameter(
self,
mock_factory_class,
mock_tap_class,
streams,
):
"""Test get_tap_test_class with various streams parameter values."""
mock_factory = Mock()
mock_test_class = Mock()
mock_factory.new_test_class.return_value = mock_test_class
mock_factory_class.return_value = mock_factory

config = {"test": "config"}

result = get_tap_test_class(
tap_class=mock_tap_class,
config=config,
streams=streams,
)

# Verify factory was created correctly
mock_factory_class.assert_called_once_with(
tap_class=mock_tap_class,
config=config,
)

# Verify new_test_class was called with correct streams parameter
mock_factory.new_test_class.assert_called_once_with(
custom_suites=None,
suite_config=None,
include_tap_tests=True,
include_stream_tests=True,
include_stream_attribute_tests=True,
streams=streams,
)

assert result is mock_test_class

@patch("singer_sdk.testing.factory.TargetTestClassFactory")
def test_get_target_test_class(self, mock_factory_class, mock_target_class):
"""Test get_target_test_class function."""
Expand Down
Loading
Loading