diff --git a/core/dbt/cli/requires.py b/core/dbt/cli/requires.py
index bf13fb5aa57..12a631417bc 100644
--- a/core/dbt/cli/requires.py
+++ b/core/dbt/cli/requires.py
@@ -385,7 +385,9 @@ def wrapper(*args, **kwargs):
 
         flags = ctx.obj["flags"]
         ctx_project = ctx.obj["project"]
-        _catalogs = load_catalogs(flags.PROJECT_DIR, ctx_project.project_name, flags.VARS)
+        _catalogs = load_catalogs(
+            flags.PROJECT_DIR, ctx_project.project_name, ctx_project.all_source_paths, flags.VARS
+        )
         ctx.obj["catalogs"] = _catalogs
 
         return func(*args, **kwargs)
diff --git a/core/dbt/config/catalogs.py b/core/dbt/config/catalogs.py
index 0992bcc7db5..1dce6d361e8 100644
--- a/core/dbt/config/catalogs.py
+++ b/core/dbt/config/catalogs.py
@@ -11,9 +11,7 @@
 from dbt_common.exceptions import CompilationError, DbtValidationError
 
 
-def load_catalogs_yml(project_dir: str, project_name: str) -> Dict[str, Any]:
-    path = os.path.join(project_dir, CATALOGS_FILE_NAME)
-
+def load_catalogs_yml(path: str, project_name: str, relative_path: str) -> Dict[str, Any]:
     if os.path.isfile(path):
         try:
             contents = load_file_contents(path, strip=False)
@@ -24,11 +22,57 @@ def load_catalogs_yml(project_dir: str, project_name: str) -> Dict[str, Any]:
             return yaml_content
 
         except DbtValidationError as e:
-            raise YamlLoadError(project_name=project_name, path=CATALOGS_FILE_NAME, exc=e)
+            raise YamlLoadError(project_name=project_name, path=relative_path, exc=e)
 
     return {}
 
 
+def load_catalogs_from_schema_yml(
+    path: str, project_name: str, relative_path: str
+) -> List[Dict[str, Any]]:
+    if not os.path.isfile(path):
+        return []
+
+    try:
+        contents = load_file_contents(path, strip=False)
+        yaml_content = load_yaml_text(contents)
+    except DbtValidationError as e:
+        raise YamlLoadError(project_name=project_name, path=relative_path, exc=e)
+
+    if not yaml_content:
+        return []
+
+    if not isinstance(yaml_content, dict):
+        raise YamlLoadError(
+            project_name=project_name,
+            path=relative_path,
+            exc=DbtValidationError(
+                f"Contents of file '{relative_path}' are not valid. Dictionary expected."
+            ),
+        )
+
+    return yaml_content.get("catalogs", [])
+
+
+def _schema_yml_paths(project_dir: str, source_paths: List[str]) -> List[str]:
+    schema_paths = set()
+
+    for source_path in source_paths:
+        absolute_source_path = os.path.join(project_dir, source_path)
+        if not os.path.isdir(absolute_source_path):
+            continue
+
+        for root, _, files in os.walk(absolute_source_path):
+            for file_name in files:
+                if not file_name.endswith((".yml", ".yaml")):
+                    continue
+
+                absolute_path = os.path.join(root, file_name)
+                schema_paths.add(os.path.relpath(absolute_path, project_dir))
+
+    return sorted(schema_paths)
+
+
 def load_single_catalog(raw_catalog: Dict[str, Any], renderer: SecretRenderer) -> Catalog:
     try:
         rendered_catalog = renderer.render_data(raw_catalog)
@@ -76,8 +120,20 @@ def load_single_catalog(raw_catalog: Dict[str, Any], renderer: SecretRenderer) -
         )
 
 
-def load_catalogs(project_dir: str, project_name: str, cli_vars: Dict[str, Any]) -> List[Catalog]:
-    raw_catalogs = load_catalogs_yml(project_dir, project_name).get("catalogs", [])
+def load_catalogs(
+    project_dir: str, project_name: str, source_paths: List[str], cli_vars: Dict[str, Any]
+) -> List[Catalog]:
+    raw_catalogs = load_catalogs_yml(
+        os.path.join(project_dir, CATALOGS_FILE_NAME), project_name, CATALOGS_FILE_NAME
+    ).get("catalogs", [])
+
+    for relative_path in _schema_yml_paths(project_dir, source_paths):
+        raw_catalogs.extend(
+            load_catalogs_from_schema_yml(
+                os.path.join(project_dir, relative_path), project_name, relative_path
+            )
+        )
+
     catalogs_renderer = SecretRenderer(cli_vars)
 
     return [load_single_catalog(raw_catalog, catalogs_renderer) for raw_catalog in raw_catalogs]
diff --git a/core/dbt/jsonschemas/resources/latest.json b/core/dbt/jsonschemas/resources/latest.json
index 2283b7ea2f0..bb30427ddc3 100644
--- a/core/dbt/jsonschemas/resources/latest.json
+++ b/core/dbt/jsonschemas/resources/latest.json
@@ -21,6 +21,15 @@
           "$ref": "#/definitions/AnyValue"
         }
       },
+      "catalogs": {
+        "type": [
+          "array",
+          "null"
+        ],
+        "items": {
+          "$ref": "#/definitions/AnyValue"
+        }
+      },
       "data_tests": {
         "type": [
           "array",
diff --git a/tests/functional/catalogs/test_catalogs_parsing.py b/tests/functional/catalogs/test_catalogs_parsing.py
index 8e1364d3579..5d91a4c3214 100644
--- a/tests/functional/catalogs/test_catalogs_parsing.py
+++ b/tests/functional/catalogs/test_catalogs_parsing.py
@@ -159,3 +159,34 @@ def test_integration(self, project, catalogs, adapter):
         error_msg = "Catalog 'write_catalog' cannot have multiple 'write_integrations' with the same name: 'write_integration_1'."
         with pytest.raises(DbtValidationError, match=error_msg):
             run_dbt(["run"])
+
+
+class TestCatalogsInSchemaYml:
+    @pytest.fixture
+    def schema_yml(self):
+        return {
+            "version": 2,
+            "catalogs": [
+                {"name": "write_catalog_1", "write_integrations": [write_integration_1]},
+                {"name": "write_catalog_2", "write_integrations": [write_integration_2]},
+            ],
+        }
+
+    def test_integration(self, project, schema_yml, adapter):
+        write_config_file(schema_yml, project.project_root, "models", "schema.yml")
+
+        with mock.patch.object(
+            type(project.adapter), "CATALOG_INTEGRATIONS", [WriteCatalogIntegration]
+        ):
+            run_dbt(["run"])
+
+        for i in range(1, 3):
+            write_integration = project.adapter.get_catalog_integration(f"write_catalog_{i}")
+            assert isinstance(write_integration, WriteCatalogIntegration)
+            assert write_integration.name == f"write_catalog_{i}"
+            assert write_integration.catalog_type == "write"
+            assert write_integration.catalog_name == f"write_integration_{i}"
+            assert write_integration.table_format == "write_format"
+            assert write_integration.external_volume == "write_external_volume"
+            assert write_integration.allows_writes is True
+            assert write_integration.my_custom_property == f"foo_{i}"
diff --git a/tests/unit/config/test_catalogs.py b/tests/unit/config/test_catalogs.py
new file mode 100644
index 00000000000..1154d872d78
--- /dev/null
+++ b/tests/unit/config/test_catalogs.py
@@ -0,0 +1,57 @@
+from dbt.config.catalogs import load_catalogs
+from dbt.tests.util import write_config_file
+
+write_integration_1 = {
+    "name": "write_integration_1",
+    "external_volume": "write_external_volume",
+    "table_format": "write_format",
+    "catalog_type": "write",
+    "adapter_properties": {"my_custom_property": "foo_1"},
+}
+
+write_integration_2 = {
+    "name": "write_integration_2",
+    "external_volume": "write_external_volume",
+    "table_format": "write_format",
+    "catalog_type": "write",
+    "adapter_properties": {"my_custom_property": "foo_2"},
+}
+
+
+def test_load_catalogs_from_schema_yml(tmp_path):
+    (tmp_path / "models").mkdir()
+    write_config_file(
+        {
+            "version": 2,
+            "catalogs": [{"name": "schema_catalog", "write_integrations": [write_integration_1]}],
+        },
+        tmp_path,
+        "models",
+        "schema.yml",
+    )
+
+    catalogs = load_catalogs(str(tmp_path), "test", ["models"], {})
+
+    assert [catalog.name for catalog in catalogs] == ["schema_catalog"]
+
+
+def test_load_catalogs_combines_root_and_schema_yml_files(tmp_path):
+    (tmp_path / "models").mkdir()
+    write_config_file(
+        {"catalogs": [{"name": "root_catalog", "write_integrations": [write_integration_1]}]},
+        tmp_path,
+        "catalogs.yml",
+    )
+    write_config_file(
+        {
+            "version": 2,
+            "catalogs": [{"name": "schema_catalog", "write_integrations": [write_integration_2]}],
+        },
+        tmp_path,
+        "models",
+        "schema.yml",
+    )
+
+    catalogs = load_catalogs(str(tmp_path), "test", ["models"], {})
+
+    assert [catalog.name for catalog in catalogs] == ["root_catalog", "schema_catalog"]