Skip to content

Commit d54f2c5

Browse files
ravenac95 and ryscheng authored
feat: rudimentary scheduler (#5621)
* wip: working scheduler
* feat: add udm models asset
* clean up
* fix: move things to resource dependency injection
* fix: update to latest dagster-sqlmesh
* Update warehouse/oso_dagster/resources/udm_state.py
  Co-authored-by: Raymond Cheng <[email protected]>
* Update warehouse/oso_dagster/resources/udm_engine_adapter.py
  Co-authored-by: Raymond Cheng <[email protected]>
* remove prototyping code
* fix: make engine adapter loading lazy
* fix: updated lint rules for new sqlmesh version
* fix: various build fixes for updated dagster-sqlmesh
* fix: dagster run issues
* Update warehouse/oso_dagster/resources/udm_engine_adapter.py

---------

Co-authored-by: Raymond Cheng <[email protected]>
1 parent efec824 commit d54f2c5

File tree

25 files changed

+2415
-1684
lines changed

25 files changed

+2415
-1684
lines changed

pyproject.toml

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -67,7 +67,7 @@ dependencies = [
6767
"kr8s==0.20.9",
6868
"structlog>=25.4.0",
6969
"pandas-gbq>=0.29.2",
70-
"dagster-sqlmesh>=0.20.0",
70+
"dagster-sqlmesh>=0.22.0",
7171
"oso-core",
7272
"pyoso",
7373
"metrics-service",
@@ -110,6 +110,7 @@ pyoso = { workspace = true }
110110
oso-mcp = { workspace = true }
111111
oso-core = { workspace = true }
112112
metrics-service = { workspace = true }
113+
scheduler = { workspace = true }
113114

114115
[tool.uv.workspace]
115116
members = [
@@ -119,7 +120,8 @@ members = [
119120
"warehouse/pynessie-gc",
120121
"warehouse/metrics-service",
121122
"lib/oso-core",
122-
"warehouse/oso_semantic"
123+
"warehouse/oso_semantic",
124+
"warehouse/scheduler",
123125
]
124126

125127
[build-system]

uv.lock

Lines changed: 1667 additions & 1612 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

warehouse/metrics_tools/models/tools.py

Lines changed: 33 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,9 @@
1919

2020
logger = logging.getLogger(__name__)
2121

22-
CallableAliasList = t.List[t.Callable[..., t.Any] | t.Tuple[t.Callable[..., t.Any], t.List[str]]]
22+
CallableAliasList = t.List[
23+
t.Callable[..., t.Any] | t.Tuple[t.Callable[..., t.Any], t.List[str]]
24+
]
2325

2426

2527
class MacroOverridingModel(model):
@@ -41,12 +43,12 @@ def model(self, *args, **kwargs):
4143
if len(self._additional_macros) > 0:
4244
macros = MacroRegistry(f"macros_for_{self.name}")
4345
system_macros = kwargs.get("macros", macros)
44-
macros.update(system_macros)
46+
macros.update(system_macros)
4547
macros.update(macro.get_registry())
4648

4749
additional_macros = create_macro_registry_from_list(self._additional_macros)
4850
macros.update(additional_macros)
49-
if not macros.get('datetrunc'):
51+
if not macros.get("datetrunc"):
5052
raise ValueError("FUCL")
5153
kwargs["macros"] = macros
5254

@@ -68,12 +70,14 @@ def escape_triple_quotes(input_string: str) -> str:
6870

6971

7072
def create_unregistered_macro_registry(
71-
macros: t.List[t.Callable[..., t.Any] | t.Tuple[t.Callable[..., t.Any], t.List[str]]]
73+
macros: t.List[
74+
t.Callable[..., t.Any] | t.Tuple[t.Callable[..., t.Any], t.List[str]]
75+
],
7276
):
7377
registry = MacroRegistry(f"macro_registry_{uuid.uuid4().hex}")
7478
for additional_macro in macros:
7579
if isinstance(additional_macro, tuple):
76-
registry.update(create_unregistered_wrapped_macro(*additional_macro)) # type: ignore
80+
registry.update(create_unregistered_wrapped_macro(*additional_macro)) # type: ignore
7781
else:
7882
registry.update(create_unregistered_wrapped_macro(additional_macro))
7983
return registry
@@ -218,11 +222,31 @@ def create_basic_python_env(
218222
return serialized
219223

220224

225+
def _dict_sort(obj: t.Any) -> str:
226+
"""Copied from sqlmesh for now"""
227+
try:
228+
if isinstance(obj, dict):
229+
obj = dict(sorted(obj.items(), key=lambda x: str(x[0])))
230+
except Exception:
231+
logger.warning("Failed to sort non-recursive dict", exc_info=True)
232+
return repr(obj)
233+
234+
221235
class PrettyExecutable(Executable):
222236
@classmethod
223-
def value(cls, v: t.Any, is_metadata: t.Optional[bool] = None) -> Executable:
224-
pretty_v = json.dumps(v, indent=1)
225-
return cls(payload=pretty_v, kind=ExecutableKind.VALUE)
237+
def value(
238+
cls,
239+
v: t.Any,
240+
is_metadata: t.Optional[bool] = None,
241+
sort_root_dict: bool = False,
242+
) -> Executable:
243+
payload = _dict_sort(v) if sort_root_dict else repr(v)
244+
pretty_v = json.dumps(payload, indent=1)
245+
return Executable(
246+
payload=pretty_v,
247+
kind=ExecutableKind.VALUE,
248+
is_metadata=is_metadata or None,
249+
)
226250

227251

228252
def create_import_call_env(
@@ -291,7 +315,7 @@ def create_macro_registry_from_list(macro_list: CallableAliasList):
291315
registry = MacroRegistry("macros")
292316
for additional_macro in macro_list:
293317
if isinstance(additional_macro, tuple):
294-
registry.update(create_unregistered_wrapped_macro(*additional_macro)) # type: ignore
318+
registry.update(create_unregistered_wrapped_macro(*additional_macro)) # type: ignore
295319
else:
296320
registry.update(create_unregistered_wrapped_macro(additional_macro))
297321
return registry
Lines changed: 40 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,40 @@
1+
import functools
2+
3+
import dagster as dg
4+
from oso_dagster.factories.common import (
5+
AssetFactoryResponse,
6+
early_resources_asset_factory,
7+
)
8+
from oso_dagster.resources.udm_engine_adapter import (
9+
UserDefinedModelEngineAdapterResource,
10+
)
11+
from oso_dagster.resources.udm_state import UserDefinedModelStateResource
12+
from scheduler.evaluator import UserDefinedModelEvaluator
13+
14+
15+
@early_resources_asset_factory()
16+
def udm_models() -> AssetFactoryResponse:
17+
@dg.asset
18+
async def udm_models(
19+
context: dg.AssetExecutionContext,
20+
udm_engine_adapter: dg.ResourceParam[UserDefinedModelEngineAdapterResource],
21+
udm_state: dg.ResourceParam[UserDefinedModelStateResource],
22+
) -> None:
23+
async with udm_state.get_client() as udm_client:
24+
evaluator = UserDefinedModelEvaluator.prepare(udm_client)
25+
await evaluator.evaluate(
26+
functools.partial(
27+
udm_engine_adapter.get_adapter, log_override=context.log
28+
)
29+
)
30+
31+
job = dg.define_asset_job(
32+
name="udm_models_job", selection=dg.AssetSelection.assets(udm_models)
33+
)
34+
35+
return AssetFactoryResponse(
36+
assets=[udm_models],
37+
jobs=[job],
38+
# Run this job every 2 hours to evaluate UDM models
39+
schedules=[dg.ScheduleDefinition(job=job, cron_schedule="0 */2 * * *")],
40+
)

warehouse/oso_dagster/assets/sqlmesh/sqlmesh.py

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,6 @@
1313
IntermediateAssetDep,
1414
IntermediateAssetOut,
1515
PlanOptions,
16-
SQLMeshContextConfig,
17-
SQLMeshDagsterTranslator,
1816
SQLMeshMultiAssetOptions,
1917
SQLMeshResource,
2018
sqlmesh_asset_from_multi_asset_options,
@@ -24,6 +22,7 @@
2422
from oso_dagster.config import DagsterConfig
2523
from oso_dagster.factories import AssetFactoryResponse, cacheable_asset_factory
2624
from oso_dagster.factories.common import CacheableDagsterContext
25+
from oso_dagster.resources.sqlmesh import PrefixedSQLMeshContextConfig
2726
from oso_dagster.resources.trino import TrinoResource
2827
from oso_dagster.utils.asynctools import multiple_async_contexts
2928
from pydantic import BaseModel
@@ -133,16 +132,15 @@ def sqlmesh_factory(
133132
def cacheable_sqlmesh_multi_asset_options(
134133
*,
135134
sqlmesh_infra_config: dict,
136-
sqlmesh_context_config: SQLMeshContextConfig,
137-
sqlmesh_translator: SQLMeshDagsterTranslator,
135+
sqlmesh_context_config: ResourceParam[PrefixedSQLMeshContextConfig],
138136
) -> CacheableSQLMeshMultiAssetOptions:
139137
environment = sqlmesh_infra_config["environment"]
140138

141139
return CacheableSQLMeshMultiAssetOptions.from_sqlmesh_multi_asset_options(
142140
sqlmesh_to_multi_asset_options(
143141
config=sqlmesh_context_config,
144142
environment=environment,
145-
dagster_sqlmesh_translator=sqlmesh_translator,
143+
# dagster_sqlmesh_translator=sqlmesh_translator,
146144
)
147145
)
148146

@@ -163,6 +161,7 @@ async def sqlmesh_project(
163161
context: AssetExecutionContext,
164162
global_config: ResourceParam[DagsterConfig],
165163
sqlmesh: SQLMeshResource,
164+
sqlmesh_context_config: ResourceParam[PrefixedSQLMeshContextConfig],
166165
trino: TrinoResource,
167166
config: SQLMeshRunConfig,
168167
):
@@ -210,6 +209,7 @@ def run_sqlmesh(
210209
all(
211210
sqlmesh.run(
212211
context,
212+
config=sqlmesh_context_config,
213213
environment=dev_environment,
214214
plan_options=copy.deepcopy(plan_options),
215215
start=config.start,
@@ -223,6 +223,7 @@ def run_sqlmesh(
223223
context.log.info("Starting to process prod environment")
224224
for result in sqlmesh.run(
225225
context,
226+
config=sqlmesh_context_config,
226227
environment=environment,
227228
plan_options=copy.deepcopy(plan_options),
228229
start=config.start,

warehouse/oso_dagster/assets/sqlmesh/sqlmesh_export.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@
88
from oso_dagster.config import DagsterConfig
99
from oso_dagster.factories import AssetFactoryResponse, cacheable_asset_factory
1010
from oso_dagster.factories.common import CacheableDagsterContext
11-
from oso_dagster.resources import PrefixedSQLMeshTranslator, SQLMeshExporter
11+
from oso_dagster.resources import SQLMeshExporter
1212
from oso_dagster.resources.sqlmesh import SQLMeshExportedAssetDefinition
1313
from pydantic import BaseModel
1414
from sqlmesh.core.model import Model
@@ -38,8 +38,7 @@ def sqlmesh_export_factory(
3838
)
3939
def cacheable_exported_assets_defs(
4040
sqlmesh_infra_config: dict,
41-
sqlmesh_context_config: SQLMeshContextConfig,
42-
sqlmesh_translator: PrefixedSQLMeshTranslator,
41+
sqlmesh_context_config: ResourceParam[SQLMeshContextConfig],
4342
sqlmesh_exporters: ResourceParam[t.List[SQLMeshExporter]],
4443
) -> SQLMeshExportedAssetsCollection:
4544
environment = sqlmesh_infra_config["environment"]
@@ -49,6 +48,7 @@ def cacheable_exported_assets_defs(
4948
context_factory=DEFAULT_CONTEXT_FACTORY,
5049
)
5150
assets_map: dict[str, SQLMeshExportedAssetDefinition] = {}
51+
sqlmesh_translator = sqlmesh_context_config.get_translator()
5252

5353
with controller.instance(environment) as mesh:
5454
models = mesh.models()

warehouse/oso_dagster/assets/sqlmesh/sqlmesh_render.py

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
from dagster_sqlmesh import SQLMeshResource
77
from dagster_sqlmesh.controller.base import DEFAULT_CONTEXT_FACTORY
88
from oso_dagster.factories import dlt_factory
9+
from oso_dagster.resources.sqlmesh import PrefixedSQLMeshContextConfig
910
from sqlglot import exp
1011

1112

@@ -17,6 +18,7 @@
1718
def get_rendered_models(
1819
context: AssetExecutionContext,
1920
sqlmesh: SQLMeshResource,
21+
sqlmesh_context_config: PrefixedSQLMeshContextConfig,
2022
environment: str,
2123
):
2224
"""
@@ -31,7 +33,9 @@ def get_rendered_models(
3133
Dict: Information about each rendered model
3234
"""
3335
controller = sqlmesh.get_controller(
34-
context_factory=DEFAULT_CONTEXT_FACTORY, log_override=context.log
36+
config=sqlmesh_context_config,
37+
context_factory=DEFAULT_CONTEXT_FACTORY,
38+
log_override=context.log,
3539
)
3640

3741
with controller.instance(environment, "model_renderer") as mesh:
@@ -71,6 +75,7 @@ def get_rendered_models(
7175
def sqlmesh_render_models(
7276
context: AssetExecutionContext,
7377
sqlmesh: SQLMeshResource,
78+
sqlmesh_context_config: PrefixedSQLMeshContextConfig,
7479
sqlmesh_infra_config: dict,
7580
):
7681
"""
@@ -90,4 +95,5 @@ def sqlmesh_render_models(
9095
context=context,
9196
sqlmesh=sqlmesh,
9297
environment=environment,
98+
sqlmesh_context_config=sqlmesh_context_config,
9399
)

0 commit comments

Comments
 (0)