Add summary metrics #553
Conversation
❌ 388/404 passed, 5 flaky, 16 failed, 18 skipped, 4h47m15s total

❌ test_e2e_workflow_with_custom_install_folder: [gw5] linux -- Python 3.12.11 /home/runner/work/dqx/dqx/.venv/bin/python (726ms)
❌ test_quality_checker_workflow_with_metrics: pyspark.errors.exceptions.connect.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `main`.`dummy_scowo`.`metrics_gttua7` cannot be found. Verify the spelling and correctness of the schema and catalog. (2m40.658s)
❌ test_custom_metrics_in_workflow: pyspark.errors.exceptions.connect.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `main`.`dummy_sccc2`.`metrics_pm3txo` cannot be found. Verify the spelling and correctness of the schema and catalog. (3m1.997s)
❌ test_quality_checker_workflow_with_quarantine_and_metrics: databricks.sdk.errors.platform.Unknown: apply_checks: Run failed with error message (1m5.863s)
❌ test_profiler_workflow_with_custom_install_folder: [gw9] linux -- Python 3.12.11 /home/runner/work/dqx/dqx/.venv/bin/python (812ms)
❌ test_e2e_workflow_for_multiple_run_configs: databricks.sdk.errors.platform.Unknown: prepare: Run failed with error message (1m33.08s)
❌ test_quality_checker_workflow_for_multiple_run_configs: databricks.labs.dqx.errors.InvalidConfigError: Run config flag is required (2m47.981s)
❌ test_e2e_workflow_with_metrics: pyspark.errors.exceptions.connect.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `main`.`dummy_selzg`.`metrics_twfapr` cannot be found. Verify the spelling and correctness of the schema and catalog. (8m28.732s)
❌ test_quality_checker_workflow_for_multiple_run_configs_table_checks_storage: databricks.labs.dqx.errors.InvalidConfigError: Run config flag is required (3m11.575s)
❌ test_observer_metrics_output: chispa.dataframe_comparer.DataFramesNotEqualError: (11.486s)
❌ test_observer_metrics_output_with_quarantine: chispa.dataframe_comparer.DataFramesNotEqualError: (13.597s)
❌ test_streaming_observer_metrics_output: pyspark.errors.exceptions.connect.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `main`.`dummy_seqiy`.`metrics_p3llvn` cannot be found. Verify the spelling and correctness of the schema and catalog. (12.845s)
❌ test_save_results_in_table_batch_with_metrics: chispa.dataframe_comparer.DataFramesNotEqualError: (10.446s)
❌ test_save_results_in_table_streaming_with_metrics: pyspark.errors.exceptions.connect.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `main`.`dummy_sj69h`.`metrics_bqkexy` cannot be found. Verify the spelling and correctness of the schema and catalog. (11.513s)
❌ test_quality_checker_workflow_with_custom_install_folder: [gw6] linux -- Python 3.12.11 /home/runner/work/dqx/dqx/.venv/bin/python (630ms)
❌ test_list_tables: databricks.sdk.errors.platform.NotFound: Catalog 'ucx_9tmnasv5gbmchcct' does not exist. (18m39.399s)
Flaky tests:
Running from acceptance #2724
Pull Request Overview
This PR introduces summary metrics as outputs of quality checking methods, using Spark's Observation feature to track data quality metrics. The `DQObserver` class manages Spark observations and tracks both default metrics (input/error/warning/valid counts) and custom user-defined SQL expressions.
- Adds `DQObserver` class for managing Spark observations and tracking summary metrics
- Updates `DQEngine` methods to return tuples with both DataFrames and observations
- Integrates metrics collection with existing workflows and configuration system
Reviewed Changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/databricks/labs/dqx/observer.py | New DQObserver class for managing Spark observations and tracking metrics |
| src/databricks/labs/dqx/engine.py | Updated engine methods to support metrics collection and storage |
| src/databricks/labs/dqx/config.py | Added metrics configuration fields to RunConfig and WorkspaceConfig |
| tests/unit/test_observer.py | Unit tests for the DQObserver class functionality |
| tests/integration/test_summary_metrics.py | Integration tests for end-to-end metrics collection |
| tests/integration/test_metrics_workflow.py | Tests for workflow-based metrics collection |
src/databricks/labs/dqx/engine.py (outdated)

    save_dataframe_as_table(metrics_df, metrics_config)
agree
    assert run_config.metrics_config is None

    ctx.deployed_workflows.run_workflow("quality-checker", run_config.name)
    assert not ws.tables.exists(run_config.metrics_config.location).table_exists
Copilot AI commented on Aug 29, 2025:
This assertion will fail because run_config.metrics_config is None when metrics are disabled, causing a NoneType attribute access error. The assertion should check that run_config.metrics_config is None instead.
Suggested change:

    - assert not ws.tables.exists(run_config.metrics_config.location).table_exists
    + # Cannot check for metrics table existence as metrics_config is None.
true
Going in the right direction
- `sample_seed`: seed for reproducible sampling.
- `limit`: maximum number of records to analyze.
- `extra_params`: (optional) extra parameters to pass to the jobs such as result column names and user_metadata
- `custom_metrics`: (optional) list of Spark SQL expressions for capturing custom summary metrics.
It would be worth adding that a set of default metrics is always used regardless.
Do you think this is clear?
By default, the number of input, warning, and error rows will be tracked. When custom metrics are defined, they will be tracked in addition to the default metrics.
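For illustration, the default metrics could be expressed as Spark SQL aggregate expressions along these lines (a sketch only; the exact expressions and metric names in dqx may differ, and `_errors`/`_warnings` are the default result column names):

```python
# Hypothetical illustration of the default summary metrics; the actual
# expressions used by dqx may differ.
default_metrics = [
    "count(1) as input_row_count",
    "count(case when _errors is not null then 1 end) as error_row_count",
    "count(case when _warnings is not null then 1 end) as warning_row_count",
    "count(case when _errors is null and _warnings is null then 1 end) as valid_row_count",
]
```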
src/databricks/labs/dqx/observer.py (outdated)

        A list of Spark SQL expressions as strings
        """
        result_columns = self.result_columns or {}
        errors_column = result_columns.get(ColumnArguments.ERRORS.value, DefaultColumnNames.ERRORS.value)
It would be better to provide this directly to avoid repeating the engine implementation. I would set this in the engine. Otherwise, users have to provide extra params twice, which would be error-prone.
We need a way to update the column names in SQL expressions used for default metrics. I added a `_set_column_names` method in the `DQMetricsObserver` that is called whenever `DQEngine` is initialized with an observer.
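A minimal sketch of that wiring, assuming the method and attribute names mentioned in this thread (the real implementation likely differs in detail):

```python
class DQMetricsObserver:
    def __init__(self, custom_metrics: list[str] | None = None):
        self.custom_metrics = custom_metrics or []
        # Defaults match the engine's standard result column names.
        self._errors_column = "_errors"
        self._warnings_column = "_warnings"

    def _set_column_names(self, errors_column: str, warnings_column: str) -> None:
        # Called by DQEngine at initialization, so users don't have to pass
        # the result column names twice via extra params.
        self._errors_column = errors_column
        self._warnings_column = warnings_column
```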
src/databricks/labs/dqx/observer.py (outdated)

    """
    Spark `Observation` which can be attached to a `DataFrame` to track summary metrics. Metrics will be collected
    when the 1st action is triggered on the attached `DataFrame`. Subsequent operations on the attached `DataFrame`
    will not update the observed metrics. See: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Observation.html
Make this a proper link so that it renders nicely in the API docs.
Updated
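For reference, a minimal standalone example of the `pyspark.sql.Observation` behavior described in that docstring (assumes an existing DataFrame `df` with an `age` column and an illustrative output table name):

```python
from pyspark.sql import Observation
import pyspark.sql.functions as F

observation = Observation("summary_metrics")
observed_df = df.observe(
    observation,
    F.count(F.lit(1)).alias("input_row_count"),
    F.avg("age").alias("avg_age"),
)

# Metrics are collected when the first action runs on the observed DataFrame;
# subsequent actions do not update them.
observed_df.write.mode("overwrite").saveAsTable("main.demo.output")
print(observation.get)  # e.g. {'input_row_count': 100, 'avg_age': 34.2}
```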
    def test_engine_with_observer_before_action(ws, spark):
        """Test that summary metrics are empty before running a Spark action."""
        custom_metrics = ["avg(age) as avg_age", "sum(salary) as total_salary"]
Do we need custom metrics here? It seems we can remove them for this test.
Updated
updated docs
…trics (merge with conflicts resolved in: docs/dqx/docs/reference/benchmarks.mdx, src/databricks/labs/dqx/engine.py, tests/perf/.benchmarks/baseline.json)
Changes
This PR introduces summary metrics as outputs of quality checking methods. Summary metrics computation relies on Spark's Observation feature.
Basic usage
The `DQObserver` can be added to `DQEngine` to manage Spark observations and track summary metrics on datasets checked with DQX. Methods of `DQEngine` have been updated to optionally return the Spark observation associated with a given run, as sketched below.
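A hedged sketch of the intended usage (signatures are inferred from this PR's description and file list, and `input_df`/`checks` are assumed to exist; the final API may differ):

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.observer import DQObserver

# Custom metrics are optional; default counts are always tracked.
observer = DQObserver(custom_metrics=["avg(age) as avg_age"])
engine = DQEngine(WorkspaceClient(), observer=observer)

# Checking methods now also return the Spark observation for the run.
checked_df, observation = engine.apply_checks_by_metadata(input_df, checks)

# Metrics are populated once the first action runs on the checked DataFrame.
checked_df.write.mode("overwrite").saveAsTable("main.demo.checked_data")
print(observation.get)  # default counts plus any custom metrics
```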
Writing summary metrics with checked data

When `DQEngine` methods write results to an output sink, metrics can also be written; see the sketch below.
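A sketch of that flow, assuming the `metrics_config` parameter added in this PR and dqx's `InputConfig`/`OutputConfig` config classes (parameter names and table locations here are illustrative and may differ in the merged version):

```python
from databricks.labs.dqx.config import InputConfig, OutputConfig

# Hypothetical call: checked data goes to the output table and summary
# metrics are saved to a separate metrics table.
engine.apply_checks_by_metadata_and_save_in_table(
    checks=checks,
    input_config=InputConfig(location="main.demo.input_data"),
    output_config=OutputConfig(location="main.demo.checked_data"),
    metrics_config=OutputConfig(location="main.demo.dq_metrics"),
)
```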
Integration with installed workflows

I have also updated the quality checking and e2e workflows to allow users to specify an output table where metrics are stored.
TODO
Linked issues
Resolves #376
Tests