Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/dbt/cli/requires.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,9 @@ def wrapper(*args, **kwargs):
flags = ctx.obj["flags"]
ctx_project = ctx.obj["project"]

_catalogs = load_catalogs(flags.PROJECT_DIR, ctx_project.project_name, flags.VARS)
_catalogs = load_catalogs(
flags.PROJECT_DIR, ctx_project.project_name, ctx_project.all_source_paths, flags.VARS
)
ctx.obj["catalogs"] = _catalogs

return func(*args, **kwargs)
Expand Down
68 changes: 62 additions & 6 deletions core/dbt/config/catalogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
from dbt_common.exceptions import CompilationError, DbtValidationError


def load_catalogs_yml(project_dir: str, project_name: str) -> Dict[str, Any]:
path = os.path.join(project_dir, CATALOGS_FILE_NAME)

def load_catalogs_yml(path: str, project_name: str, relative_path: str) -> Dict[str, Any]:
if os.path.isfile(path):
try:
contents = load_file_contents(path, strip=False)
Expand All @@ -24,11 +22,57 @@ def load_catalogs_yml(project_dir: str, project_name: str) -> Dict[str, Any]:

return yaml_content
except DbtValidationError as e:
raise YamlLoadError(project_name=project_name, path=CATALOGS_FILE_NAME, exc=e)
raise YamlLoadError(project_name=project_name, path=relative_path, exc=e)

return {}


def load_catalogs_from_schema_yml(
    path: str, project_name: str, relative_path: str
) -> List[Dict[str, Any]]:
    """Read a schema yml file and return its top-level ``catalogs`` entries.

    :param path: absolute path of the yml file to read
    :param project_name: name of the project, used in error reporting
    :param relative_path: project-relative path of the file, used in error reporting
    :returns: the raw ``catalogs`` list, or ``[]`` when the file is missing,
        empty, or has no ``catalogs`` key
    :raises YamlLoadError: when the file cannot be parsed, or its top level
        is not a mapping
    """
    if not os.path.isfile(path):
        return []

    try:
        contents = load_file_contents(path, strip=False)
        yaml_content = load_yaml_text(contents)
    except DbtValidationError as e:
        raise YamlLoadError(project_name=project_name, path=relative_path, exc=e)

    if not yaml_content:
        return []

    if not isinstance(yaml_content, dict):
        raise YamlLoadError(
            project_name=project_name,
            path=relative_path,
            exc=DbtValidationError(
                f"Contents of file '{relative_path}' are not valid. Dictionary expected."
            ),
        )

    # A `catalogs:` key may be present but explicitly null (the resource
    # jsonschema allows "array or null"); `.get("catalogs", [])` would then
    # return None and crash callers that extend() the result, so coalesce.
    return yaml_content.get("catalogs") or []


def _schema_yml_paths(project_dir: str, source_paths: List[str]) -> List[str]:
schema_paths = set()

for source_path in source_paths:
absolute_source_path = os.path.join(project_dir, source_path)
if not os.path.isdir(absolute_source_path):
continue

for root, _, files in os.walk(absolute_source_path):
for file_name in files:
if not file_name.endswith((".yml", ".yaml")):
continue

absolute_path = os.path.join(root, file_name)
schema_paths.add(os.path.relpath(absolute_path, project_dir))

return sorted(schema_paths)


def load_single_catalog(raw_catalog: Dict[str, Any], renderer: SecretRenderer) -> Catalog:
try:
rendered_catalog = renderer.render_data(raw_catalog)
Expand Down Expand Up @@ -76,8 +120,20 @@ def load_single_catalog(raw_catalog: Dict[str, Any], renderer: SecretRenderer) -
)


def load_catalogs(project_dir: str, project_name: str, cli_vars: Dict[str, Any]) -> List[Catalog]:
raw_catalogs = load_catalogs_yml(project_dir, project_name).get("catalogs", [])
def load_catalogs(
    project_dir: str, project_name: str, source_paths: List[str], cli_vars: Dict[str, Any]
) -> List[Catalog]:
    """Load and render all catalog definitions for a project.

    Combines the catalogs declared in the root-level catalogs.yml with any
    `catalogs` entries found in schema yml files under the given source paths
    (root catalogs first, then schema-file catalogs in sorted path order).

    :param project_dir: absolute path of the project root
    :param project_name: name of the project, used in error reporting
    :param source_paths: project-relative directories to scan for schema yml files
    :param cli_vars: vars used when rendering secrets in catalog definitions
    """
    # `catalogs:` may be present but explicitly null in catalogs.yml
    # (the jsonschema allows "array or null"), so coalesce to []. Copy into
    # a fresh list so extend() below never mutates the parsed yaml in place.
    raw_catalogs = list(
        load_catalogs_yml(
            os.path.join(project_dir, CATALOGS_FILE_NAME), project_name, CATALOGS_FILE_NAME
        ).get("catalogs")
        or []
    )

    for relative_path in _schema_yml_paths(project_dir, source_paths):
        raw_catalogs.extend(
            load_catalogs_from_schema_yml(
                os.path.join(project_dir, relative_path), project_name, relative_path
            )
            or []
        )

    catalogs_renderer = SecretRenderer(cli_vars)

    return [load_single_catalog(raw_catalog, catalogs_renderer) for raw_catalog in raw_catalogs]
Expand Down
9 changes: 9 additions & 0 deletions core/dbt/jsonschemas/resources/latest.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@
"$ref": "#/definitions/AnyValue"
}
},
"catalogs": {
"type": [
"array",
"null"
],
"items": {
"$ref": "#/definitions/AnyValue"
}
},
"data_tests": {
"type": [
"array",
Expand Down
31 changes: 31 additions & 0 deletions tests/functional/catalogs/test_catalogs_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,34 @@ def test_integration(self, project, catalogs, adapter):
error_msg = "Catalog 'write_catalog' cannot have multiple 'write_integrations' with the same name: 'write_integration_1'."
with pytest.raises(DbtValidationError, match=error_msg):
run_dbt(["run"])


class TestCatalogsInSchemaYml:
    """End-to-end check that catalogs declared in a models/schema.yml are registered."""

    @pytest.fixture
    def schema_yml(self):
        # Two catalogs, each wrapping one of the shared write-integration configs.
        catalogs = [
            {"name": f"write_catalog_{idx}", "write_integrations": [integration]}
            for idx, integration in ((1, write_integration_1), (2, write_integration_2))
        ]
        return {"version": 2, "catalogs": catalogs}

    def test_integration(self, project, schema_yml, adapter):
        write_config_file(schema_yml, project.project_root, "models", "schema.yml")

        patched_integrations = mock.patch.object(
            type(project.adapter), "CATALOG_INTEGRATIONS", [WriteCatalogIntegration]
        )
        with patched_integrations:
            run_dbt(["run"])

        for idx in (1, 2):
            integration = project.adapter.get_catalog_integration(f"write_catalog_{idx}")
            assert isinstance(integration, WriteCatalogIntegration)
            assert integration.name == f"write_catalog_{idx}"
            assert integration.catalog_type == "write"
            assert integration.catalog_name == f"write_integration_{idx}"
            assert integration.table_format == "write_format"
            assert integration.external_volume == "write_external_volume"
            assert integration.allows_writes is True
            assert integration.my_custom_property == f"foo_{idx}"
57 changes: 57 additions & 0 deletions tests/unit/config/test_catalogs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from dbt.config.catalogs import load_catalogs
from dbt.tests.util import write_config_file

# Shared fixture data: two write-integration configs that differ only in
# their name and in the `my_custom_property` adapter property, so tests can
# distinguish which catalog definition each integration came from.
write_integration_1 = {
    "name": "write_integration_1",
    "external_volume": "write_external_volume",
    "table_format": "write_format",
    "catalog_type": "write",
    "adapter_properties": {"my_custom_property": "foo_1"},
}

write_integration_2 = {
    "name": "write_integration_2",
    "external_volume": "write_external_volume",
    "table_format": "write_format",
    "catalog_type": "write",
    "adapter_properties": {"my_custom_property": "foo_2"},
}


def test_load_catalogs_from_schema_yml(tmp_path):
    """A catalog declared in models/schema.yml is discovered by load_catalogs."""
    models_dir = tmp_path / "models"
    models_dir.mkdir()

    schema_contents = {
        "version": 2,
        "catalogs": [{"name": "schema_catalog", "write_integrations": [write_integration_1]}],
    }
    write_config_file(schema_contents, tmp_path, "models", "schema.yml")

    loaded = load_catalogs(str(tmp_path), "test", ["models"], {})

    assert ["schema_catalog"] == [catalog.name for catalog in loaded]


def test_load_catalogs_combines_root_and_schema_yml_files(tmp_path):
    """Catalogs from the root catalogs.yml and from schema yml files are merged, root first."""
    (tmp_path / "models").mkdir()

    root_contents = {
        "catalogs": [{"name": "root_catalog", "write_integrations": [write_integration_1]}]
    }
    schema_contents = {
        "version": 2,
        "catalogs": [{"name": "schema_catalog", "write_integrations": [write_integration_2]}],
    }
    write_config_file(root_contents, tmp_path, "catalogs.yml")
    write_config_file(schema_contents, tmp_path, "models", "schema.yml")

    loaded = load_catalogs(str(tmp_path), "test", ["models"], {})

    assert [catalog.name for catalog in loaded] == ["root_catalog", "schema_catalog"]
Loading