diff --git a/docs/dqx/docs/guide/index.mdx b/docs/dqx/docs/guide/index.mdx
index 0d736108a..3d551e53d 100644
--- a/docs/dqx/docs/guide/index.mdx
+++ b/docs/dqx/docs/guide/index.mdx
@@ -45,4 +45,15 @@ Quality rules can be defined in the following ways:
Additionally, quality rule candidates can be auto-generated using the DQX profiler.
-For more details, see the [Quality Checks Definition Guide](/docs/guide/quality_checks_definition).
\ No newline at end of file
+For more details, see the [Quality Checks Definition Guide](/docs/guide/quality_checks_definition).
+
+## Summary metrics and monitoring
+
+DQX can capture and store summary metrics about your data quality across multiple tables and runs. Metrics are computed lazily and become accessible once the checked datasets are counted, displayed, or written to a table or files. Users can:
+
+- Capture quality metrics for each checked dataset
+- Track both default (e.g. input/error/warning/valid counts) and custom quality metrics
+- Store quality metrics in Delta tables for historical analysis and alerting
+- Centralize quality metrics across datasets, jobs, or job runs in a unified data quality history table
+
+For more details, see the [Summary Metrics Guide](/docs/guide/summary_metrics).
\ No newline at end of file
diff --git a/docs/dqx/docs/guide/quality_checks_apply.mdx b/docs/dqx/docs/guide/quality_checks_apply.mdx
index 198a39f56..b20617c3c 100644
--- a/docs/dqx/docs/guide/quality_checks_apply.mdx
+++ b/docs/dqx/docs/guide/quality_checks_apply.mdx
@@ -717,6 +717,7 @@ The following fields from the [configuration file](/docs/installation/#configura
- `output_config`: configuration for the output data. 'location' is autogenerated when the workflow is executed for patterns.
- `quarantine_config`: (optional) configuration for the quarantine data. 'location' is autogenerated when the workflow is executed for patterns.
- `checks_location`: location of the quality checks in storage. Autogenerated when the workflow is executed for patterns.
+- `metrics_config`: (optional) configuration for storing summary metrics.
- `serverless_clusters`: whether to use serverless clusters for running the workflow (default: `true`). Using serverless clusters is recommended as it allows for automated cluster management and scaling.
- `e2e_spark_conf`: (optional) spark configuration to use for the e2e workflow, only applicable if `serverless_clusters` is set to `false`.
- `e2e_override_clusters`: (optional) cluster configuration to use for the e2e workflow, only applicable if `serverless_clusters` is set to `false`.
@@ -733,6 +734,7 @@ The following fields from the [configuration file](/docs/installation/#configura
- `limit`: maximum number of records to analyze.
- `filter`: filter for the input data as a string SQL expression (default: None).
- `extra_params`: (optional) extra parameters to pass to the jobs such as result column names and user_metadata
+- `custom_metrics`: (optional) list of Spark SQL expressions for capturing custom summary metrics. By default, the number of input, warning, and error rows will be tracked. When custom metrics are defined, they will be tracked in addition to the default metrics.
- `custom_check_functions`: (optional) custom check functions defined in Python files that can be used in the quality checks.
- `reference_tables`: (optional) reference tables that can be used in the quality checks.
@@ -762,6 +764,10 @@ Example of the configuration file (relevant fields only):
#checkpointLocation: /Volumes/catalog/schema/volume/checkpoint # only applicable if input_config.is_streaming is enabled
#trigger: # streaming trigger, only applicable if input_config.is_streaming is enabled
# availableNow: true
+ metrics_config: # optional - summary metrics storage
+ format: delta
+ location: main.nytaxi.dq_metrics
+ mode: append
profiler_config:
limit: 1000
sample_fraction: 0.3
@@ -775,8 +781,67 @@ Example of the configuration file (relevant fields only):
input_config:
format: delta
location: main.nytaxi.ref
+ # Global custom metrics for summary statistics (optional)
+ custom_metrics:
+ - "sum(array_size(_warnings)) as total_warnings"
+ - "sum(array_size(_errors)) as total_errors"
```
+
+## Summary Metrics
+
+DQX can automatically capture and store summary metrics about your data quality checking processes. When enabled, the system collects both default metrics (input count, error count, warning count, valid count) and any custom metrics you define. Metrics can be configured programmatically or via a configuration file when installing DQX as a tool in the workspace.
+
+### Enabling summary metrics programmatically
+
+To enable summary metrics programmatically, create and pass a `DQMetricsObserver` when initializing the `DQEngine`:
+
+
+
+  ```python
+  from databricks.labs.dqx.config import InputConfig, OutputConfig
+  from databricks.labs.dqx.engine import DQEngine
+  from databricks.labs.dqx.metrics_observer import DQMetricsObserver
+  from databricks.sdk import WorkspaceClient
+
+  # set up a DQMetricsObserver with default name and metrics
+  dq_observer = DQMetricsObserver()
+  dq_engine = DQEngine(WorkspaceClient(), observer=dq_observer)
+
+  # `input_df` is the DataFrame to check and `checks` is the list of quality checks (defined elsewhere)
+  # Option 1: apply quality checks, return a single result DataFrame and a metrics observation
+  valid_and_invalid_df, metrics_observation = dq_engine.apply_checks(input_df, checks)
+  valid_and_invalid_df.count()  # metrics are only populated after an action on the DataFrame
+  print(metrics_observation.get)
+
+  # Option 2: apply quality checks, return valid and invalid (quarantined) DataFrames and a metrics observation
+  valid_df, invalid_df, metrics_observation = dq_engine.apply_checks_and_split(input_df, checks)
+  valid_df.count()  # metrics are only populated after an action on the DataFrames
+  print(metrics_observation.get)
+
+  # Option 3 (end-to-end): apply quality checks to a table, save results to valid and invalid (quarantined) tables, and save metrics to a metrics table
+  dq_engine.apply_checks_and_save_in_table(
+    checks=checks,
+    input_config=InputConfig(location="catalog.schema.input"),
+    output_config=OutputConfig(location="catalog.schema.valid"),
+    quarantine_config=OutputConfig(location="catalog.schema.quarantine"),
+    metrics_config=OutputConfig(location="catalog.schema.metrics"),
+  )
+
+  # Option 4 (end-to-end): apply quality checks to a table, save results to an output table, and save metrics to a metrics table
+  dq_engine.apply_checks_and_save_in_table(
+    checks=checks,
+    input_config=InputConfig(location="catalog.schema.input"),
+    output_config=OutputConfig(location="catalog.schema.output"),
+    metrics_config=OutputConfig(location="catalog.schema.metrics"),
+  )
+  ```
+
+
+
+### Enabling summary metrics in DQX workflows
+
+Summary metrics can also be enabled in DQX workflows. Metrics can be configured in two ways:
+
+1. **During installation (default run config)**: When prompted, choose to store summary metrics and configure the default metrics table location
+2. **Configuration file**: Add `custom_metrics` and `metrics_config` to the [configuration file](/docs/installation/#configuration-file), as shown in the sketch below
+
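+A minimal sketch of the relevant fields (the table name is a placeholder; see the full configuration file example above for where these fields sit):
+
+```yaml
+# in a run config (per-table settings)
+metrics_config:
+  location: catalog.schema.dq_metrics
+  format: delta
+  mode: append
+
+# global, applied to all run configs
+custom_metrics:
+  - "sum(array_size(_errors)) as total_errors"
+```
+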
+For detailed information about summary metrics, including examples and best practices, see the [Summary Metrics Guide](/docs/guide/summary_metrics).
+
## Quality checking results
Quality check results are added as additional columns to the output or quarantine (if defined) DataFrame or tables (if saved). These columns capture the outcomes of the checks performed on the input data.
diff --git a/docs/dqx/docs/guide/summary_metrics.mdx b/docs/dqx/docs/guide/summary_metrics.mdx
new file mode 100644
index 000000000..b84a3d669
--- /dev/null
+++ b/docs/dqx/docs/guide/summary_metrics.mdx
@@ -0,0 +1,434 @@
+---
+sidebar_position: 8
+---
+
+import Admonition from '@theme/Admonition';
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+# Summary Metrics
+
+DQX provides comprehensive functionality to capture and store aggregate statistics about your data quality. This allows you to track data quality trends over time, monitor the health of your data pipelines, and gain insights into the overall quality of your datasets.
+
+## Overview
+
+Summary metrics in DQX capture both **built-in metrics** (automatically calculated) and **custom metrics** (user-defined SQL expressions) during data quality checking. These metrics are collected using Spark's built-in [Observation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Observation.html) functionality and can be persisted to tables for historical analysis.
+
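+For background, Spark's Observation API can be used directly in plain PySpark; the minimal sketch below is independent of DQX and uses an illustrative DataFrame and metric names:
+
+```python
+import pyspark.sql.functions as F
+from pyspark.sql import Observation, SparkSession
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.range(100)
+
+# Attach named aggregate expressions to the DataFrame via an Observation
+obs = Observation("example_metrics")
+observed_df = df.observe(obs, F.count(F.lit(1)).alias("row_count"), F.sum("id").alias("id_sum"))
+
+# Metrics are populated only after an action runs on the observed DataFrame
+observed_df.write.format("noop").mode("overwrite").save()
+print(obs.get)  # {'row_count': 100, 'id_sum': 4950}
+```
+
+DQX builds the equivalent metric expressions for you (the built-in metrics plus any custom metrics) and returns the corresponding Observation or writes the results to a configured metrics table.
+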
+### Built-in Metrics
+
+DQX automatically captures the following built-in metrics for every data quality check execution:
+
+| Metric Name | Data Type | Description |
+|--------------------------|-------------------------------------------------|----------------------------------------------------|
+| `input_row_count` | `int` | Total number of input rows processed |
+| `error_row_count` | `int` | Number of rows that failed error-level checks |
+| `warning_row_count` | `int` | Number of rows that triggered warning-level checks |
+| `valid_row_count` | `int` | Number of rows that passed all checks |
+
+### Custom Metrics
+
+Users can define custom metrics with Spark SQL expressions. These metrics will be collected in addition to DQX's built-in metrics.
+
+
+Summary metrics are calculated on all records processed by DQX. Complex aggregations can degrade performance when processing large datasets. Be cautious with operations like `DISTINCT` on high-cardinality columns.
+
+
+Example of custom data quality summary metrics:
+```sql
+sum(array_size(_errors)) as total_errors
+avg(array_size(_errors)) as errors_avg
+count(case when array_size(_errors) > 1 then 1 end) as count_multiple_errors
+```
+
+## Usage Examples
+
+### Accessing Metrics when Applying Checks
+
+Engine methods (e.g. `apply_checks`, `apply_checks_by_metadata`) return a Spark Observation along with one or more output DataFrames. Data quality metrics can be accessed from the Spark Observation after an action is performed on the output DataFrames.
+
+
+Metrics are not directly accessible from the returned Spark Observation when data is processed with streaming. Use DQX's built-in methods to persist streaming metrics to an output table. See [Writing Metrics to a Table with Streaming](#writing-metrics-to-a-table-with-streaming) for more details.
+
+
+
+
+ ```python
+  from databricks.labs.dqx.engine import DQEngine
+  from databricks.labs.dqx.metrics_observer import DQMetricsObserver
+  from databricks.sdk import WorkspaceClient
+
+ # Create observer
+ observer = DQMetricsObserver(name="dq_metrics")
+
+ # Create the engine with the optional observer
+ engine = DQEngine(WorkspaceClient(), observer=observer)
+
+ # Apply checks and get metrics
+ checked_df, observation = engine.apply_checks_by_metadata(df, checks)
+
+ # Trigger an action to populate metrics (e.g. count, save to a table)
+ # Metrics are only populated if an action is triggered on the DataFrame such as count or saving to a table.
+ row_count = checked_df.count()
+
+ # Access metrics
+ metrics = observation.get
+ print(f"Input row count: {metrics['input_row_count']}")
+ print(f"Error row count: {metrics['error_row_count']}")
+ print(f"Warning row count: {metrics['warning_row_count']}")
+ print(f"Valid row count: {metrics['valid_row_count']}")
+ ```
+
+
+
+### Writing Metrics to a Table
+
+Engine methods (e.g. `apply_checks_and_save_in_table`, `apply_checks_by_metadata_and_save_in_table`, and `save_results_in_table`) can write summary metrics to a table, in either batch or streaming mode. You can write metrics for different datasets or workloads into a common metrics table to track data quality over time.
+
+#### Writing Metrics to a Table in Batch
+
+Summary metrics can be written to a table when calling `DQEngine` methods to apply checks and write output data. When the input data is read as a batch source, metrics will be collected and written in batch.
+
+
+
+ ```python
+ from databricks.labs.dqx import check_funcs
+ from databricks.labs.dqx.engine import DQEngine
+ from databricks.labs.dqx.metrics_observer import DQMetricsObserver
+ from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule
+ from databricks.labs.dqx.config import InputConfig, OutputConfig
+ from databricks.sdk import WorkspaceClient
+
+ # Define the checks
+ checks = [
+ DQRowRule(
+ criticality="warn",
+ check_func=check_funcs.is_not_null,
+ column="col3",
+ ),
+ DQDatasetRule(
+ criticality="error",
+ check_func=check_funcs.is_unique,
+ columns=["col1", "col2"],
+ ),
+ DQRowRule(
+ name="email_invalid_format",
+ criticality="error",
+ check_func=check_funcs.regex_match,
+ column="email",
+ check_func_kwargs={"regex": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
+ ),
+ ]
+
+ # Create the observer
+ observer = DQMetricsObserver(name="dq_metrics")
+
+ # Create the engine with the metrics observer
+ engine = DQEngine(WorkspaceClient(), observer=observer)
+
+ # Create the input config for a batch data source
+ input_config = InputConfig("main.demo.input_data")
+
+ # Create the output, quarantine, and metrics configs
+ output_config = OutputConfig("main.demo.valid_data")
+ quarantine_config = OutputConfig("main.demo.quarantine_data")
+ metrics_config = OutputConfig("main.demo.metrics_data")
+
+ # Read the data, apply the checks, write data to valid and quarantine tables, and write metrics to the metrics table
+ engine.apply_checks_and_save_in_table(
+ checks,
+ input_config,
+ output_config,
+ quarantine_config,
+ metrics_config
+ )
+ ```
+
+
+
+#### Writing Metrics to a Table with Streaming
+
+Summary metrics can also be written in streaming mode. When the input data is read as a streaming source, metrics will be written for each streaming micro-batch:
+
+
+
+ ```python
+ from databricks.labs.dqx import check_funcs
+ from databricks.labs.dqx.engine import DQEngine
+ from databricks.labs.dqx.metrics_observer import DQMetricsObserver
+ from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule
+ from databricks.labs.dqx.config import InputConfig, OutputConfig
+ from databricks.sdk import WorkspaceClient
+
+ # Define the checks
+ checks = [
+ DQRowRule(
+ criticality="warn",
+ check_func=check_funcs.is_not_null,
+ column="col3",
+ ),
+ DQDatasetRule(
+ criticality="error",
+ check_func=check_funcs.is_unique,
+ columns=["col1", "col2"],
+ ),
+ DQRowRule(
+ name="email_invalid_format",
+ criticality="error",
+ check_func=check_funcs.regex_match,
+ column="email",
+ check_func_kwargs={"regex": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
+ ),
+ ]
+
+ # Create the observer
+ observer = DQMetricsObserver(name="dq_metrics")
+
+ # Create the engine with the metrics observer
+ engine = DQEngine(WorkspaceClient(), observer=observer)
+
+ # Create the input config for a streaming data source
+ input_config = InputConfig("main.demo.input_data", is_streaming=True)
+
+ # Create the output, quarantine, and metrics configs
+ output_config = OutputConfig("main.demo.valid_data")
+ quarantine_config = OutputConfig("main.demo.quarantine_data")
+ metrics_config = OutputConfig("main.demo.metrics_data")
+
+ # Read the data, apply the checks, write data to valid and quarantine tables, and write metrics to the metrics table
+ # Output and quarantine data will be written in streaming and summary metrics will be written for each micro-batch
+ engine.apply_checks_and_save_in_table(
+ checks,
+ input_config,
+ output_config,
+ quarantine_config,
+ metrics_config
+ )
+ ```
+
+
+
+#### Saving Metrics to a Table
+
+Summary metrics can also be written to a table by calling `save_results_in_table` directly. After applying checks, pass the output DataFrame(s) and the Spark Observation along with the appropriate output configurations.
+
+
+
+ ```python
+ from databricks.labs.dqx import check_funcs
+ from databricks.labs.dqx.engine import DQEngine
+ from databricks.labs.dqx.metrics_observer import DQMetricsObserver
+ from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule
+ from databricks.labs.dqx.config import OutputConfig
+ from databricks.sdk import WorkspaceClient
+
+ # Define the checks
+ checks = [
+ DQRowRule(
+ criticality="warn",
+ check_func=check_funcs.is_not_null,
+ column="col3",
+ ),
+ DQDatasetRule(
+ criticality="error",
+ check_func=check_funcs.is_unique,
+ columns=["col1", "col2"],
+ ),
+ DQRowRule(
+ name="email_invalid_format",
+ criticality="error",
+ check_func=check_funcs.regex_match,
+ column="email",
+ check_func_kwargs={"regex": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
+ ),
+ ]
+
+ # Create the observer
+ observer = DQMetricsObserver(name="dq_metrics")
+
+ # Create the engine with the metrics observer
+ engine = DQEngine(WorkspaceClient(), observer=observer)
+
+ # Apply checks and get metrics
+  valid_df, quarantine_df, observation = engine.apply_checks_and_split(df, checks)
+
+ # Create the output, quarantine, and metrics configs
+ output_config = OutputConfig("main.demo.valid_data")
+ quarantine_config = OutputConfig("main.demo.quarantine_data")
+ metrics_config = OutputConfig("main.demo.metrics_data")
+
+ # Write the data to valid and quarantine tables, and write metrics to the metrics table
+  engine.save_results_in_table(
+      output_df=valid_df,
+      quarantine_df=quarantine_df,
+      observation=observation,
+      output_config=output_config,
+      quarantine_config=quarantine_config,
+      metrics_config=metrics_config,
+  )
+ ```
+
+
+
+### Configuring Custom Metrics
+
+Pass custom metrics as Spark SQL expressions when creating the `DQMetricsObserver`. Each expression should include a column alias; the resulting metric is accessible by that alias.
+
+
+
+ ```python
+ from databricks.labs.dqx.engine import DQEngine
+ from databricks.labs.dqx.metrics_observer import DQMetricsObserver
+  from databricks.sdk import WorkspaceClient
+
+ # Define custom metrics
+ custom_metrics = [
+ "sum(array_size(_errors)) as total_check_errors",
+ "sum(array_size(_warnings)) as total_check_warnings",
+ ]
+
+ # Create the observer with custom metrics
+ observer = DQMetricsObserver(
+ name="business_metrics",
+ custom_metrics=custom_metrics
+ )
+
+ # Create the engine with the optional observer
+ engine = DQEngine(WorkspaceClient(), observer=observer)
+
+ # Apply checks and get metrics
+ checked_df, observation = engine.apply_checks_by_metadata(df, checks)
+
+ # Trigger an action to populate metrics (e.g. count, save to a table)
+ # Metrics are only populated if an action is triggered on the DataFrame such as count or saving to a table.
+ row_count = checked_df.count()
+
+ # Access metrics
+ metrics = observation.get
+ print(f"Input row count: {metrics['input_row_count']}")
+ print(f"Error row count: {metrics['error_row_count']}")
+ print(f"Warning row count: {metrics['warning_row_count']}")
+ print(f"Valid row count: {metrics['valid_row_count']}")
+ print(f"Total check errors: {metrics['total_check_errors']}")
+ print(f"Total check warnings: {metrics['total_check_warnings']}")
+ ```
+
+
+
+## Workflow Integration
+
+### No-Code Approach (Workflows)
+
+When using DQX workflows, summary metrics are automatically configured based on your installation [configuration file](/docs/installation/#configuration-file):
+
+1. **Installation Configuration**: During installation, specify metrics table and custom metrics
+2. **Automatic Observer Creation**: Workflows automatically create `DQMetricsObserver` when metrics are configured
+3. **Metrics Persistence**: Metrics are automatically saved to the configured table after each workflow run
+
+```bash
+# Run quality checker workflow with metrics enabled
+databricks labs dqx apply-checks --run-config "production"
+
+# Run end-to-end workflow with metrics enabled
+databricks labs dqx e2e --run-config "production"
+```
+
+#### Configuration File Example
+
+Metrics storage is defined in the `metrics_config` section of each run config, and custom metrics are defined globally in `custom_metrics`:
+
+```yaml
+run_configs:
+- name: production
+ input_config:
+ location: main.raw.sales_data
+ format: delta
+ output_config:
+ location: main.clean.sales_data
+ format: delta
+ mode: append
+ quarantine_config:
+ location: main.quarantine.sales_data
+ format: delta
+ mode: append
+ metrics_config: # Summary metrics configuration
+ location: main.analytics.dq_metrics
+ format: delta
+ mode: append
+ checks_location: main.config.quality_checks
+
+# Global custom metrics (applied to all run configs)
+custom_metrics:
+ - "avg(amount) as average_transaction_amount"
+ - "sum(case when region = 'US' then amount else 0 end) as us_revenue"
+ - "count(distinct customer_id) as unique_customers"
+```
+
+## Metrics Table Schema
+
+Metrics from different datasets, jobs, and runs can be centralized by writing them to a common metrics table. The metrics table contains the following fields:
+
+| Column Name | Column Type | Description |
+|-----------------------|-----------------------|--------------------------------------------------------------|
+| `run_name` | `STRING` | Name of the metrics observer |
+| `input_location` | `STRING` | Location of the input dataset (table name or file path) |
+| `output_location` | `STRING` | Location of the output dataset (table name or file path) |
+| `quarantine_location` | `STRING` | Location of the quarantine dataset (table name or file path) |
+| `checks_location` | `STRING` | Location where checks are stored (table name or file path) |
+| `metric_name` | `STRING` | Name of the metric (e.g., 'input_row_count') |
+| `metric_value` | `STRING` | Value of the metric (stored as string) |
+| `run_ts` | `TIMESTAMP` | Run timestamp when the summary metrics were calculated |
+| `error_column_name` | `STRING` | Name of the error column (default: '_errors') |
+| `warning_column_name` | `STRING` | Name of the warning column (default: '_warnings') |
+| `user_metadata` | `MAP[STRING, STRING]` | User-defined, run-level metadata |
+
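+For example, the most recent metrics recorded for a given input table can be retrieved with a simple lookup (assuming the table names from the configuration example above):
+
+```sql
+SELECT metric_name, metric_value, run_ts
+FROM main.analytics.dq_metrics
+WHERE input_location = 'main.raw.sales_data'
+ORDER BY run_ts DESC
+LIMIT 20
+```
+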
+## Best Practices
+
+### Performance Considerations
+
+1. **Batch Metrics Collection**: Collect metrics as part of regular data processing; they are computed when the checked data is counted or written, so no separate pass over the data is needed
+2. **Monitor Metrics Overhead**: Complex custom metrics may impact processing performance
+
+### Monitoring and Alerting
+
+1. **Track Trends**: Monitor metrics over time to identify data quality degradation
+2. **Set Thresholds**: Establish acceptable ranges for error rates and warning counts
+3. **Alert on Anomalies**: Set up alerts when metrics deviate significantly from historical patterns, e.g. by using Databricks SQL Alerts
+
+The example query below shows how you can analyze metrics persisted to a table.
+```sql
+/* EXAMPLE: Identify quality degradation */
+WITH daily_metrics AS (
+ SELECT
+ date_trunc('day', run_ts) as run_date,
+ input_location,
+ metric_name,
+ CAST(metric_value AS DOUBLE) as metric_value
+ FROM
+ main.analytics.dq_metrics
+ WHERE
+ run_ts >= current_date - INTERVAL 30 DAYS
+ AND metric_name IN ('input_row_count', 'error_row_count', 'warning_row_count')
+),
+pivoted_metrics AS (
+ SELECT
+ run_date,
+ input_location,
+ MAX(CASE WHEN metric_name = 'input_row_count' THEN metric_value END) as input_count,
+ MAX(CASE WHEN metric_name = 'error_row_count' THEN metric_value END) as error_count,
+ MAX(CASE WHEN metric_name = 'warning_row_count' THEN metric_value END) as warning_count
+ FROM daily_metrics
+ GROUP BY run_date, input_location
+)
+SELECT
+ run_date,
+ input_location,
+ avg(error_count * 100.0 / NULLIF(input_count, 0)) as avg_error_rate,
+ avg(warning_count * 100.0 / NULLIF(input_count, 0)) as avg_warning_rate
+FROM
+ pivoted_metrics
+WHERE
+ input_count > 0
+GROUP BY
+ run_date, input_location
+ORDER BY
+ run_date DESC, input_location
+```
diff --git a/docs/dqx/docs/reference/engine.mdx b/docs/dqx/docs/reference/engine.mdx
index 6cbfd5e72..48a0b2d42 100644
--- a/docs/dqx/docs/reference/engine.mdx
+++ b/docs/dqx/docs/reference/engine.mdx
@@ -49,22 +49,22 @@ The following table outlines the available methods of the `DQEngine` and their f
**Available DQX engine methods**
-| Method | Description | Arguments | Supports local execution |
-| ----------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------- |
-| `apply_checks` | Applies quality checks to the DataFrame and returns a DataFrame with result columns. | `df`: DataFrame to check; `checks`: List of checks defined using DQX classes, each check is an instance of the DQRule class; `ref_dfs`: (optional) Reference dataframes to use in the checks, if applicable. | Yes |
-| `apply_checks_and_split` | Applies quality checks to the DataFrame and returns valid and invalid (quarantine) DataFrames with result columns. | `df`: DataFrame to check; `checks`: List of checks defined using DQX classes, each check is an instance of the DQRule class; `ref_dfs`: (optional) Reference dataframes to use in the checks, if applicable. | Yes |
-| `apply_checks_and_save_in_table` | Applies quality checks using DQRule objects and writes results to valid and invalid Delta table(s) with result columns. | `input_config`: `InputConfig` object with the table name and options for reading the input data; `checks`: List of checks defined using DQX classes, each check is an instance of the DQRule class; `output_config`: `OutputConfig` object with the table name, output mode, and options for the output data; `quarantine_config`: `OutputConfig` object with the table name, output mode, and options for the quarantine data - if provided, data will be split; `ref_dfs`: (optional) Reference dataframes to use in the checks, if applicable. | No |
-| `apply_checks_by_metadata` | Applies quality checks defined as a dictionary to the DataFrame and returns a DataFrame with result columns. | `df`: DataFrame to check; `checks`: List of checks defined as dictionary; `custom_check_functions`: (optional) Dictionary with custom check functions (e.g., globals() of the calling module); `ref_dfs`: (optional) Reference dataframes to use in the checks, if applicable. | Yes |
-| `apply_checks_by_metadata_and_split` | Applies quality checks defined as a dictionary and returns valid and invalid (quarantine) DataFrames. | `df`: DataFrame to check; `checks`: List of checks defined as dictionary; `custom_check_functions`: (optional) Dictionary with custom check functions (e.g., globals() of the calling module); `ref_dfs`: (optional) Reference dataframes to use in the checks, if applicable. | Yes |
-| `apply_checks_by_metadata_and_save_in_table` | Applies quality checks defined as a dictionary and writes results to valid and invalid Delta table(s) with result columns. | `input_config`: `InputConfig` object with the table name and options for reading the input data; `checks`: List of checks defined as dictionary; `output_config`: `OutputConfig` object with the table name, output mode, and options for the output data; `quarantine_config`: `OutputConfig` object with the table name, output mode, and options for the quarantine data - if provided, data will be split; `custom_check_functions`: (optional) Dictionary with custom check functions; `ref_dfs`: (optional) Reference dataframes to use in the checks, if applicable. | No |
-| `apply_checks_and_save_in_tables` | Applies quality checks persisted in a storage to multiple tables and writes results to valid and invalid Delta table(s) with result columns. | `run_configs`: list of run config objects (`RunConfig`) containing input config (`InputConfig`), output config (`OutputConfig`), quarantine config (`OutputConfig`, if provided data will be split), 'checks_location', and if provided 'reference_tables' and 'custom_check_functions'; `max_parallelism`: (optional) Maximum number of tables to check in parallel (defaults to the number of CPU cores). | No |
-| `apply_checks_and_save_in_tables_for_patterns` | Applies quality checks persisted in a storage to multiple tables matching provided wildcard patterns and writes results to valid and invalid Delta table(s) with result columns. Skip output and quarantine tables based on specified suffixes. | `patterns`: List of table names or filesystem-style wildcards (e.g. 'schema.*') to include (if None, all tables are included); ; `exclude_patterns`: (optional) List of table names or filesystem-style wildcards (e.g. '*_dq_output') to exclude, useful if wanting to exclude existing output or quarantine tables; `checks_location`: Location of the checks files (e.g. absolute workspace or volume directory or delta table), for file based locations, checks are expected to be found under 'checks_location/<input_table>.yml'; `exclude_matched`:(optional) Whether to exclude matched tables (default False); `run_config_template`: (optional) Run configuration template to use for all tables (skip location in the 'input_config', 'output_config', and 'quarantine_config' fields as it is derived from patterns, skip 'checks_location' of the run config as it is derived separately, autogenerate 'input_config' and 'output_config' if not provided, use 'reference_tables' and 'custom_check_functions' if provided; `max_parallelism`: (optional) Maximum number of tables to check in parallel (defaults to the number of CPU cores); `output_table_suffix`: (optional) Suffix to append to the output table name (default "_dq_output"); `quarantine_table_suffix`: (optional) Suffix to append to the quarantine table name (default "_dq_quarantine"). | No |
-| `validate_checks` | Validates the provided quality checks to ensure they conform to the expected structure and types. | `checks`: List of checks to validate; `custom_check_functions`: (optional) Dictionary of custom check functions that can be used; `validate_custom_check_functions`: (optional) If set to True, validates custom check functions (defaults to True). | Yes |
-| `get_invalid` | Retrieves records from the DataFrame that violate data quality checks (records with warnings and errors). | `df`: Input DataFrame. | Yes |
-| `get_valid` | Retrieves records from the DataFrame that pass all data quality checks. | `df`: Input DataFrame. | Yes |
-| `load_checks` | Loads quality rules (checks) from storage backend. Multiple storage backends are supported including tables, files or workspace files, installation-managed sources where the location is inferred automatically from run config. | `config`: Configuration for loading checks from a storage backend, i.e. `FileChecksStorageConfig`: file in a local filesystem (YAML or JSON), or workspace files if invoked from Databricks notebook or job; `WorkspaceFileChecksStorageConfig`: file in a workspace (YAML or JSON) using absolute paths; `VolumeFileChecksStorageConfig`: file in a Unity Catalog Volume (YAML or JSON); `TableChecksStorageConfig`: a table; `InstallationChecksStorageConfig`: installation-managed storage backend, using the `checks_location` field from the run configuration. See more details below. | Yes (only with `FileChecksStorageConfig`) |
-| `save_checks` | Saves quality rules (checks) to storage backend. Multiple storage backends are supported including tables, files or workspace files, installation-managed targets where the location is inferred automatically from run config. | `checks`: List of checks defined as dictionary; `config`: Configuration for saving checks in a storage backend, i.e. `FileChecksStorageConfig`: file in a local filesystem (YAML or JSON), or workspace files if invoked from Databricks notebook or job; `WorkspaceFileChecksStorageConfig`: file in a workspace (YAML or JSON); `VolumeFileChecksStorageConfig`: file in a Unity Catalog Volume (YAML or JSON); `TableChecksStorageConfig`: a table; `InstallationChecksStorageConfig`: storage defined in the installation context, using the `checks_location` field from the run configuration. See more details below. | Yes (only with `FileChecksStorageConfig`) |
-| `save_results_in_table` | Save quality checking results in delta table(s). | `output_df`: (optional) Dataframe containing the output data; `quarantine_df`: (optional) Dataframe containing the output data; `output_config`: `OutputConfig` object with the table name, output mode, and options for the output data; `quarantine_config`: `OutputConfig` object with the table name, output mode, and options for the quarantine data - if provided, data will be split; `run_config_name`: Name of the run config to use; `install_folder`: (optional) installation folder where DQX is installed, only required when custom installation folder is used; `assume_user`: (optional) If True, assume user installation, otherwise global installation (skipped if `install_folder` is provided). | No |
+| Method | Description | Arguments | Supports local execution |
+| ---------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------------------------- |
+| `apply_checks` | Applies quality checks to the DataFrame; Returns a DataFrame with result columns and an optional Spark Observation with summary metrics. | `df`: DataFrame to check; `checks`: List of checks defined using DQX classes, each check is an instance of the DQRule class; `ref_dfs`: (optional) Reference DataFrames to use in the checks, if applicable. | Yes |
+| `apply_checks_and_split` | Applies quality checks to the DataFrame; Returns valid and invalid (quarantine) DataFrames with result columns and an optional Spark Observation with summary metrics. | `df`: DataFrame to check; `checks`: List of checks defined using DQX classes, each check is an instance of the DQRule class; `ref_dfs`: (optional) Reference DataFrames to use in the checks, if applicable. | Yes |
+| `apply_checks_and_save_in_table` | Applies quality checks using DQRule objects, writes results to valid and invalid Delta table(s) with result columns, and optionally writes summary metrics to a Delta table. | `input_config`: `InputConfig` object with the table name and options for reading the input data; `checks`: List of checks defined using DQX classes, each check is an instance of the DQRule class; `output_config`: `OutputConfig` object with the table name, output mode, and options for the output data; `quarantine_config`: (optional) `OutputConfig` object with the table name, output mode, and options for the quarantine data - if provided, data will be split; `metrics_config`: (optional) `OutputConfig` object with the table name, output mode, and options for the summary metrics; `ref_dfs`: (optional) Reference DataFrames to use in the checks, if applicable. | No |
+| `apply_checks_by_metadata` | Applies quality checks defined as a dictionary to the DataFrame; Returns a DataFrame with result columns and an optional Spark Observation with summary metrics. | `df`: DataFrame to check; `checks`: List of checks defined as dictionary; `custom_check_functions`: (optional) Dictionary with custom check functions (e.g., globals() of the calling module); `ref_dfs`: (optional) Reference DataFrames to use in the checks, if applicable. | Yes |
+| `apply_checks_by_metadata_and_split` | Applies quality checks defined as a dictionary; Returns valid and invalid (quarantine) DataFrames with result columns and an optional Spark Observation with summary metrics. | `df`: DataFrame to check; `checks`: List of checks defined as dictionary; `custom_check_functions`: (optional) Dictionary with custom check functions (e.g., globals() of the calling module); `ref_dfs`: (optional) Reference DataFrames to use in the checks, if applicable. | Yes |
+| `apply_checks_by_metadata_and_save_in_table` | Applies quality checks defined as a dictionary, writes results to valid and invalid Delta table(s) with result columns, and optionally writes summary metrics to a Delta table. | `input_config`: `InputConfig` object with the table name and options for reading the input data; `checks`: List of checks defined as dictionary; `output_config`: `OutputConfig` object with the table name, output mode, and options for the output data; `quarantine_config`: (optional) `OutputConfig` object with the table name, output mode, and options for the quarantine data - if provided, data will be split; `metrics_config`: (optional) `OutputConfig` object with the table name, output mode, and options for the summary metrics; `custom_check_functions`: (optional) Dictionary with custom check functions; `ref_dfs`: (optional) Reference DataFrames to use in the checks, if applicable. | No |
+| `apply_checks_and_save_in_tables` | Applies quality checks persisted in storage to multiple tables and writes results to valid and invalid Delta table(s) with result columns. | `run_configs`: list of run config objects (`RunConfig`) containing input config (`InputConfig`), output config (`OutputConfig`), quarantine config (`OutputConfig`, if provided data will be split), 'checks_location', and if provided 'reference_tables' and 'custom_check_functions'; `max_parallelism`: (optional) Maximum number of tables to check in parallel (defaults to the number of CPU cores). | No |
+| `apply_checks_and_save_in_tables_for_patterns` | Applies quality checks persisted in storage to multiple tables matching provided wildcard patterns and writes results to valid and invalid Delta table(s) with result columns. Skips output and quarantine tables based on specified suffixes. | `patterns`: List of table names or filesystem-style wildcards (e.g. 'schema.*') to include (if None, all tables are included); `exclude_patterns`: (optional) List of table names or filesystem-style wildcards (e.g. '*_dq_output') to exclude, useful for excluding existing output or quarantine tables; `checks_location`: Location of the checks files (e.g. absolute workspace or volume directory or delta table), for file based locations, checks are expected to be found under 'checks_location/<input_table>.yml'; `exclude_matched`: (optional) Whether to exclude matched tables (default False); `run_config_template`: (optional) Run configuration template to use for all tables (skip location in the 'input_config', 'output_config', and 'quarantine_config' fields as it is derived from patterns, skip 'checks_location' of the run config as it is derived separately, autogenerate 'input_config' and 'output_config' if not provided, use 'reference_tables' and 'custom_check_functions' if provided); `max_parallelism`: (optional) Maximum number of tables to check in parallel (defaults to the number of CPU cores); `output_table_suffix`: (optional) Suffix to append to the output table name (default "_dq_output"); `quarantine_table_suffix`: (optional) Suffix to append to the quarantine table name (default "_dq_quarantine"). | No |
+| `validate_checks` | Validates the provided quality checks to ensure they conform to the expected structure and types. | `checks`: List of checks to validate; `custom_check_functions`: (optional) Dictionary of custom check functions that can be used; `validate_custom_check_functions`: (optional) If True, validates custom check functions (defaults to True). | Yes |
+| `get_invalid` | Retrieves records from the DataFrame that violate data quality checks (records with warnings and errors). | `df`: Input DataFrame. | Yes |
+| `get_valid` | Retrieves records from the DataFrame that pass all data quality checks. | `df`: Input DataFrame. | Yes |
+| `load_checks` | Loads quality rules (checks) from storage backend. Multiple storage backends are supported including tables, files, workspace files, or installation-managed sources inferred from run config. | `config`: Configuration for loading checks from a storage backend, e.g., `FileChecksStorageConfig` (local YAML/JSON file or workspace file), `WorkspaceFileChecksStorageConfig` (workspace file with absolute path), `VolumeFileChecksStorageConfig` (Unity Catalog Volume YAML/JSON), `TableChecksStorageConfig` (table), `InstallationChecksStorageConfig` (installation-managed backend using `checks_location` in run config). | Yes (only with `FileChecksStorageConfig`) |
+| `save_checks` | Saves quality rules (checks) to a storage backend. Multiple storage backends are supported including tables, files, workspace files, or installation-managed targets inferred from run config. | `checks`: List of checks defined as dictionary; `config`: Configuration for saving checks in a storage backend, e.g., `FileChecksStorageConfig` (local YAML/JSON file or workspace file), `WorkspaceFileChecksStorageConfig` (workspace file with absolute path), `VolumeFileChecksStorageConfig` (Unity Catalog Volume YAML/JSON), `TableChecksStorageConfig` (table), `InstallationChecksStorageConfig` (installation-managed backend using `checks_location` in run config). | Yes (only with `FileChecksStorageConfig`) |
+| `save_results_in_table` | Saves quality checking results and (optionally) summary metrics to Delta table(s). | `output_df`: (optional) DataFrame containing the output data; `quarantine_df`: (optional) DataFrame containing invalid data; `observation`: (optional) Spark Observation tracking summary metrics; `output_config`: `OutputConfig` object with the table name, output mode, and options for the output data; `quarantine_config`: (optional) `OutputConfig` object with the table name, output mode, and options for the quarantine data; `metrics_config`: (optional) `OutputConfig` object with the table name, output mode, and options for the summary metrics data; `run_config_name`: Name of the run config to use; `install_folder`: (optional) Installation folder where DQX is installed (only required for custom folder); `assume_user`: (optional) If True, assume user installation, otherwise global. | No |
The 'Supports local execution' in the above table indicates which methods can be used for local testing without a Databricks workspace (see the usage in [local testing section](/docs/reference/testing/#local-execution-and-testing-with-dqengine)).
diff --git a/src/databricks/labs/dqx/base.py b/src/databricks/labs/dqx/base.py
index 3da955be2..df63d32f5 100644
--- a/src/databricks/labs/dqx/base.py
+++ b/src/databricks/labs/dqx/base.py
@@ -2,8 +2,7 @@
from collections.abc import Callable
from functools import cached_property
from typing import final
-from pyspark.sql import DataFrame
-
+from pyspark.sql import DataFrame, Observation
from databricks.labs.dqx.checks_validator import ChecksValidationStatus
from databricks.labs.dqx.rule import DQRule
from databricks.sdk import WorkspaceClient
@@ -44,7 +43,7 @@ class DQEngineCoreBase(DQEngineBase):
@abc.abstractmethod
def apply_checks(
self, df: DataFrame, checks: list[DQRule], ref_dfs: dict[str, DataFrame] | None = None
- ) -> DataFrame:
+ ) -> DataFrame | tuple[DataFrame, Observation]:
"""Apply data quality checks to the given DataFrame.
Args:
@@ -53,13 +52,14 @@ def apply_checks(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- DataFrame that includes errors and warnings result columns.
+ A DataFrame with errors and warnings result columns and an optional Observation which tracks data quality
+ summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
"""
@abc.abstractmethod
def apply_checks_and_split(
self, df: DataFrame, checks: list[DQRule], ref_dfs: dict[str, DataFrame] | None = None
- ) -> tuple[DataFrame, DataFrame]:
+ ) -> tuple[DataFrame, DataFrame] | tuple[DataFrame, DataFrame, Observation]:
"""Apply data quality checks to the given DataFrame and split the results into two DataFrames
("good" and "bad").
@@ -69,8 +69,9 @@ def apply_checks_and_split(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and
- "bad" (rows with errors or warnings and the corresponding result columns).
+ A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and "bad" (rows
+ with errors or warnings and the corresponding result columns) and an optional Observation which tracks data
+ quality summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
"""
@abc.abstractmethod
@@ -80,7 +81,7 @@ def apply_checks_by_metadata(
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None,
- ) -> DataFrame:
+ ) -> DataFrame | tuple[DataFrame, Observation]:
"""
Apply data quality checks defined as metadata to the given DataFrame.
@@ -95,7 +96,8 @@ def apply_checks_by_metadata(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- DataFrame that includes errors and warnings result columns.
+ A DataFrame with errors and warnings result columns and an optional Observation which tracks data quality
+ summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
"""
@abc.abstractmethod
@@ -105,7 +107,7 @@ def apply_checks_by_metadata_and_split(
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None,
- ) -> tuple[DataFrame, DataFrame]:
+ ) -> tuple[DataFrame, DataFrame] | tuple[DataFrame, DataFrame, Observation]:
"""Apply data quality checks defined as metadata to the given DataFrame and split the results into
two DataFrames ("good" and "bad").
@@ -120,8 +122,9 @@ def apply_checks_by_metadata_and_split(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and
- "bad" (rows with errors or warnings and the corresponding result columns).
+ A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and "bad" (rows
+ with errors or warnings and the corresponding result columns) and an optional Observation which tracks data
+ quality summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
"""
@staticmethod
diff --git a/src/databricks/labs/dqx/config.py b/src/databricks/labs/dqx/config.py
index d1fe902ca..d59db36a9 100644
--- a/src/databricks/labs/dqx/config.py
+++ b/src/databricks/labs/dqx/config.py
@@ -60,6 +60,7 @@ class RunConfig:
input_config: InputConfig | None = None
output_config: OutputConfig | None = None
quarantine_config: OutputConfig | None = None # quarantined data table
+ metrics_config: OutputConfig | None = None # summary metrics table
checks_location: str = (
"checks.yml" # absolute or relative workspace file path or table containing quality rules / checks
)
@@ -110,6 +111,7 @@ class WorkspaceConfig:
profiler_max_parallelism: int = 4 # max parallelism for profiling multiple tables
quality_checker_max_parallelism: int = 4 # max parallelism for quality checking multiple tables
+ custom_metrics: list[str] | None = None # custom summary metrics tracked by the observer when applying checks
def get_run_config(self, run_config_name: str | None = "default") -> RunConfig:
"""Get the run configuration for a given run name, or the default configuration if no run name is provided.
@@ -139,7 +141,13 @@ def get_run_config(self, run_config_name: str | None = "default") -> RunConfig:
@dataclass
class BaseChecksStorageConfig(abc.ABC):
- """Marker base class for storage configuration."""
+ """Marker base class for storage configuration.
+
+ Args:
+ location: The file path or table name where checks are stored.
+ """
+
+ location: str
@dataclass
diff --git a/src/databricks/labs/dqx/contexts/workflow_context.py b/src/databricks/labs/dqx/contexts/workflow_context.py
index e5e413483..e83eb3df8 100644
--- a/src/databricks/labs/dqx/contexts/workflow_context.py
+++ b/src/databricks/labs/dqx/contexts/workflow_context.py
@@ -11,6 +11,7 @@
from databricks.labs.dqx.config import WorkspaceConfig, RunConfig
from databricks.labs.dqx.__about__ import __version__
from databricks.labs.dqx.engine import DQEngine
+from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.profiler_runner import ProfilerRunner
@@ -151,8 +152,16 @@ def profiler(self) -> ProfilerRunner:
@cached_property
def quality_checker(self) -> QualityCheckerRunner:
"""Returns the QualityCheckerRunner instance."""
+ # Create observer if metrics are configured
+ observer = None
+ if self.run_config.metrics_config:
+ observer = DQMetricsObserver(custom_metrics=self.config.custom_metrics)
+
dq_engine = DQEngine(
- workspace_client=self.workspace_client, spark=self.spark, extra_params=self.config.extra_params
+ workspace_client=self.workspace_client,
+ spark=self.spark,
+ extra_params=self.config.extra_params,
+ observer=observer,
)
log_telemetry(self.workspace_client, "workflow", "quality_checker")
return QualityCheckerRunner(self.spark, dq_engine)
diff --git a/src/databricks/labs/dqx/engine.py b/src/databricks/labs/dqx/engine.py
index 03b8cc99d..f6a724758 100644
--- a/src/databricks/labs/dqx/engine.py
+++ b/src/databricks/labs/dqx/engine.py
@@ -4,9 +4,10 @@
from concurrent import futures
from collections.abc import Callable
from datetime import datetime
+from functools import cached_property
import pyspark.sql.functions as F
-from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import DataFrame, Observation, SparkSession
from databricks.labs.dqx.base import DQEngineBase, DQEngineCoreBase
from databricks.labs.dqx.checks_resolver import resolve_custom_check_functions_from_path
@@ -35,6 +36,8 @@
)
from databricks.labs.dqx.checks_validator import ChecksValidator, ChecksValidationStatus
from databricks.labs.dqx.schema import dq_result_schema
+from databricks.labs.dqx.metrics_observer import DQMetricsObservation, DQMetricsObserver
+from databricks.labs.dqx.metrics_listener import StreamingMetricsListener
from databricks.labs.dqx.io import read_input_data, save_dataframe_as_table, get_reference_dataframes
from databricks.labs.dqx.telemetry import telemetry_logger, log_telemetry
from databricks.sdk import WorkspaceClient
@@ -51,6 +54,7 @@ class DQEngineCore(DQEngineCoreBase):
workspace_client: WorkspaceClient instance used to access the workspace.
spark: Optional SparkSession to use. If not provided, the active session is used.
extra_params: Optional extra parameters for the engine, such as result column names and run metadata.
+ observer: Optional DQMetricsObserver for tracking data quality summary metrics.
"""
def __init__(
@@ -58,6 +62,7 @@ def __init__(
workspace_client: WorkspaceClient,
spark: SparkSession | None = None,
extra_params: ExtraParams | None = None,
+ observer: DQMetricsObserver | None = None,
):
super().__init__(workspace_client)
@@ -75,10 +80,20 @@ def __init__(
self.spark = SparkSession.builder.getOrCreate() if spark is None else spark
self.run_time = datetime.fromisoformat(extra_params.run_time)
self.engine_user_metadata = extra_params.user_metadata
+ self.observer = observer
+ if self.observer:
+ self.observer.set_column_names(
+ error_column_name=self._result_column_names[ColumnArguments.ERRORS],
+ warning_column_name=self._result_column_names[ColumnArguments.WARNINGS],
+ )
+
+ @cached_property
+ def result_column_names(self) -> dict[ColumnArguments, str]:
+ return self._result_column_names
def apply_checks(
self, df: DataFrame, checks: list[DQRule], ref_dfs: dict[str, DataFrame] | None = None
- ) -> DataFrame:
+ ) -> DataFrame | tuple[DataFrame, Observation]:
"""Apply data quality checks to the given DataFrame.
Args:
@@ -87,7 +102,8 @@ def apply_checks(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- DataFrame with errors and warnings result columns.
+ A DataFrame with errors and warnings result columns and an optional Observation which tracks data quality
+ summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
Raises:
InvalidCheckError: If any of the checks are invalid.
@@ -109,12 +125,17 @@ def apply_checks(
result_df = self._create_results_array(
result_df, warning_checks, self._result_column_names[ColumnArguments.WARNINGS], ref_dfs
)
+ observed_result = self._observe_metrics(result_df)
+
+ if isinstance(observed_result, tuple):
+ observed_df, observation = observed_result
+ return observed_df, observation
- return result_df
+ return observed_result
def apply_checks_and_split(
self, df: DataFrame, checks: list[DQRule], ref_dfs: dict[str, DataFrame] | None = None
- ) -> tuple[DataFrame, DataFrame]:
+ ) -> tuple[DataFrame, DataFrame] | tuple[DataFrame, DataFrame, Observation]:
"""Apply data quality checks to the given DataFrame and split the results into two DataFrames
("good" and "bad").
@@ -124,8 +145,9 @@ def apply_checks_and_split(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and
- "bad" (rows with errors or warnings and the corresponding result columns).
+ A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and "bad" (rows
+ with errors or warnings and the corresponding result columns) and an optional Observation which tracks data
+ quality summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
Raises:
InvalidCheckError: If any of the checks are invalid.
@@ -138,11 +160,16 @@ def apply_checks_and_split(
"All elements in the 'checks' list must be instances of DQRule. Use 'apply_checks_by_metadata_and_split' to pass checks as list of dicts instead."
)
- checked_df = self.apply_checks(df, checks, ref_dfs)
+ observed_result = self.apply_checks(df, checks, ref_dfs)
- good_df = self.get_valid(checked_df)
- bad_df = self.get_invalid(checked_df)
+ if isinstance(observed_result, tuple):
+ checked_df, observation = observed_result
+ good_df = self.get_valid(checked_df)
+ bad_df = self.get_invalid(checked_df)
+ return good_df, bad_df, observation
+ good_df = self.get_valid(observed_result)
+ bad_df = self.get_invalid(observed_result)
return good_df, bad_df
def apply_checks_by_metadata(
@@ -151,7 +178,7 @@ def apply_checks_by_metadata(
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None,
- ) -> DataFrame:
+ ) -> DataFrame | tuple[DataFrame, Observation]:
"""Apply data quality checks defined as metadata to the given DataFrame.
Args:
@@ -165,7 +192,8 @@ def apply_checks_by_metadata(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- DataFrame with errors and warnings result columns.
+ A DataFrame with errors and warnings result columns and an optional Observation which tracks data quality
+ summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
"""
dq_rule_checks = deserialize_checks(checks, custom_check_functions)
@@ -177,7 +205,7 @@ def apply_checks_by_metadata_and_split(
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None,
- ) -> tuple[DataFrame, DataFrame]:
+ ) -> tuple[DataFrame, DataFrame] | tuple[DataFrame, DataFrame, Observation]:
"""Apply data quality checks defined as metadata to the given DataFrame and split the results into
two DataFrames ("good" and "bad").
@@ -192,14 +220,20 @@ def apply_checks_by_metadata_and_split(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- DataFrame that includes errors and warnings result columns.
+            A tuple with a "good" DataFrame (may include rows with warnings but no result columns), a "bad" DataFrame
+            (rows with errors or warnings and the corresponding result columns), and an optional Observation which tracks
+            data quality summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
Raises:
InvalidCheckError: If any of the checks are invalid.
"""
dq_rule_checks = deserialize_checks(checks, custom_check_functions)
- good_df, bad_df = self.apply_checks_and_split(df, dq_rule_checks, ref_dfs)
+ good_df, bad_df, *observations = self.apply_checks_and_split(df, dq_rule_checks, ref_dfs)
+
+ if self.observer:
+ return good_df, bad_df, observations[0]
+
return good_df, bad_df
@staticmethod
@@ -370,6 +404,29 @@ def _create_results_array(
# Ensure the result DataFrame has the same columns as the input DataFrame + the new result column
return result_df.select(*df.columns, dest_col)
+ def _observe_metrics(self, df: DataFrame) -> DataFrame | tuple[DataFrame, Observation]:
+ """
+ Adds Spark observable metrics to the input DataFrame.
+
+ Args:
+ df: Input DataFrame
+
+ Returns:
+            The DataFrame with observed metrics attached and the corresponding Spark Observation, or the unmodified
+            DataFrame if the engine has no observer or no metrics are defined
+ """
+ if not self.observer:
+ return df
+
+ metric_exprs = [F.expr(metric_statement) for metric_statement in self.observer.metrics]
+ if not metric_exprs:
+ return df
+
+ observation = self.observer.observation
+ if df.isStreaming:
+ return df.observe(self.observer.name, *metric_exprs), observation
+
+ return df.observe(observation, *metric_exprs), observation
+
class DQEngine(DQEngineBase):
"""High-level engine to apply data quality checks and manage IO.
@@ -382,15 +439,16 @@ def __init__(
self,
workspace_client: WorkspaceClient,
spark: SparkSession | None = None,
- engine: DQEngineCoreBase | None = None,
+ engine: DQEngineCore | None = None,
extra_params: ExtraParams | None = None,
checks_handler_factory: BaseChecksStorageHandlerFactory | None = None,
run_config_loader: RunConfigLoader | None = None,
+ observer: DQMetricsObserver | None = None,
):
super().__init__(workspace_client)
self.spark = SparkSession.builder.getOrCreate() if spark is None else spark
- self._engine = engine or DQEngineCore(workspace_client, spark, extra_params)
+ self._engine = engine or DQEngineCore(workspace_client, spark, extra_params, observer)
self._run_config_loader = run_config_loader or RunConfigLoader(workspace_client)
self._checks_handler_factory: BaseChecksStorageHandlerFactory = (
checks_handler_factory or ChecksStorageHandlerFactory(self.ws, self.spark)
@@ -399,7 +457,7 @@ def __init__(
@telemetry_logger("engine", "apply_checks")
def apply_checks(
self, df: DataFrame, checks: list[DQRule], ref_dfs: dict[str, DataFrame] | None = None
- ) -> DataFrame:
+ ) -> DataFrame | tuple[DataFrame, Observation]:
"""Apply data quality checks to the given DataFrame.
Args:
@@ -408,14 +466,15 @@ def apply_checks(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- DataFrame with errors and warnings result columns.
+ A DataFrame with errors and warnings result columns and an optional Observation which tracks data quality
+ summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
"""
return self._engine.apply_checks(df, checks, ref_dfs)
@telemetry_logger("engine", "apply_checks_and_split")
def apply_checks_and_split(
self, df: DataFrame, checks: list[DQRule], ref_dfs: dict[str, DataFrame] | None = None
- ) -> tuple[DataFrame, DataFrame]:
+ ) -> tuple[DataFrame, DataFrame] | tuple[DataFrame, DataFrame, Observation]:
"""Apply data quality checks to the given DataFrame and split the results into two DataFrames
("good" and "bad").
@@ -425,8 +484,9 @@ def apply_checks_and_split(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and
- "bad" (rows with errors or warnings and the corresponding result columns).
+            A tuple with a "good" DataFrame (may include rows with warnings but no result columns), a "bad" DataFrame
+            (rows with errors or warnings and the corresponding result columns), and an optional Observation which tracks
+            data quality summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
Raises:
InvalidCheckError: If any of the checks are invalid.
@@ -440,7 +500,7 @@ def apply_checks_by_metadata(
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None,
- ) -> DataFrame:
+ ) -> DataFrame | tuple[DataFrame, Observation]:
"""Apply data quality checks defined as metadata to the given DataFrame.
Args:
@@ -454,7 +514,8 @@ def apply_checks_by_metadata(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- DataFrame with errors and warnings result columns.
+ A DataFrame with errors and warnings result columns and an optional Observation which tracks data quality
+ summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
"""
return self._engine.apply_checks_by_metadata(df, checks, custom_check_functions, ref_dfs)
@@ -465,7 +526,7 @@ def apply_checks_by_metadata_and_split(
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None,
- ) -> tuple[DataFrame, DataFrame]:
+ ) -> tuple[DataFrame, DataFrame] | tuple[DataFrame, DataFrame, Observation]:
"""Apply data quality checks defined as metadata to the given DataFrame and split the results into
two DataFrames ("good" and "bad").
@@ -480,7 +541,9 @@ def apply_checks_by_metadata_and_split(
ref_dfs: Optional reference DataFrames to use in the checks.
Returns:
- DataFrame that includes errors and warnings result columns.
+            A tuple with a "good" DataFrame (may include rows with warnings but no result columns), a "bad" DataFrame
+            (rows with errors or warnings and the corresponding result columns), and an optional Observation which tracks
+            data quality summary metrics. Summary metrics are returned by any `DQEngine` with an `observer` specified.
"""
return self._engine.apply_checks_by_metadata_and_split(df, checks, custom_check_functions, ref_dfs)
@@ -491,6 +554,7 @@ def apply_checks_and_save_in_table(
input_config: InputConfig,
output_config: OutputConfig,
quarantine_config: OutputConfig | None = None,
+ metrics_config: OutputConfig | None = None,
ref_dfs: dict[str, DataFrame] | None = None,
) -> None:
"""
@@ -500,13 +564,17 @@ def apply_checks_and_save_in_table(
- valid records are written using *output_config*.
- invalid records are written using *quarantine_config*.
- If *quarantine_config* is not provided, write all rows (including result columns) using *output_config*.
+        If *quarantine_config* is not provided, write all rows (including result columns) using *output_config*.
+
+ If *metrics_config* is provided and the `DQEngine` has a valid `observer`, data quality summary metrics will be
+ tracked and written using *metrics_config*.
Args:
checks: List of *DQRule* checks to apply.
input_config: Input configuration (e.g., table/view or file location and read options).
output_config: Output configuration (e.g., table name, mode, and write options).
quarantine_config: Optional configuration for writing invalid records.
+ metrics_config: Optional configuration for writing summary metrics.
ref_dfs: Optional reference DataFrames used by checks.
"""
logger.info(f"Applying checks to {input_config.location}")
@@ -514,16 +582,46 @@ def apply_checks_and_save_in_table(
# Read data from the specified table
df = read_input_data(self.spark, input_config)
+ if self._engine.observer and metrics_config and df.isStreaming:
+ listener = self._get_streaming_metrics_listener(
+ input_config=input_config,
+ output_config=output_config,
+ quarantine_config=quarantine_config,
+ metrics_config=metrics_config,
+ )
+ self.spark.streams.addListener(listener)
+
+ observation = None
if quarantine_config:
- # Split data into good and bad records
- good_df, bad_df = self.apply_checks_and_split(df, checks, ref_dfs)
+ split_result = self.apply_checks_and_split(df, checks, ref_dfs)
+ if self._engine.observer:
+ good_df, bad_df, observation = split_result
+ else:
+ good_df, bad_df = split_result
save_dataframe_as_table(good_df, output_config)
save_dataframe_as_table(bad_df, quarantine_config)
else:
- # Apply checks and write all data to single table
- checked_df = self.apply_checks(df, checks, ref_dfs)
+ check_result = self.apply_checks(df, checks, ref_dfs)
+ if self._engine.observer:
+ checked_df, observation = check_result
+ else:
+ checked_df = check_result
save_dataframe_as_table(checked_df, output_config)
+ if self._engine.observer and metrics_config and not df.isStreaming and observation is not None:
+ metrics_observation = DQMetricsObservation(
+ observer_name=self._engine.observer.name,
+ observed_metrics=observation.get,
+ error_column_name=self._engine.result_column_names[ColumnArguments.ERRORS],
+ warning_column_name=self._engine.result_column_names[ColumnArguments.WARNINGS],
+ input_location=input_config.location if input_config else None,
+ output_location=output_config.location if output_config else None,
+ quarantine_location=quarantine_config.location if quarantine_config else None,
+ user_metadata=self._engine.engine_user_metadata,
+ )
+ metrics_df = self._engine.observer.build_metrics_df(self.spark, metrics_observation)
+ save_dataframe_as_table(metrics_df, metrics_config)
+
@telemetry_logger("engine", "apply_checks_by_metadata_and_save_in_table")
def apply_checks_by_metadata_and_save_in_table(
self,
@@ -531,6 +629,7 @@ def apply_checks_by_metadata_and_save_in_table(
input_config: InputConfig,
output_config: OutputConfig,
quarantine_config: OutputConfig | None = None,
+ metrics_config: OutputConfig | None = None,
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None,
) -> None:
@@ -552,6 +651,7 @@ def apply_checks_by_metadata_and_save_in_table(
input_config: Input configuration (e.g., table/view or file location and read options).
output_config: Output configuration (e.g., table name, mode, and write options).
quarantine_config: Optional configuration for writing invalid records.
+ metrics_config: Optional configuration for writing summary metrics.
custom_check_functions: Optional mapping of custom check function names
to callables/modules (e.g., globals()).
ref_dfs: Optional reference DataFrames used by checks.
@@ -561,16 +661,46 @@ def apply_checks_by_metadata_and_save_in_table(
# Read data from the specified table
df = read_input_data(self.spark, input_config)
+ if self._engine.observer and metrics_config and df.isStreaming:
+ listener = self._get_streaming_metrics_listener(
+ input_config=input_config,
+ output_config=output_config,
+ quarantine_config=quarantine_config,
+ metrics_config=metrics_config,
+ )
+ self.spark.streams.addListener(listener)
+
+ observation = None
if quarantine_config:
- # Split data into good and bad records
- good_df, bad_df = self.apply_checks_by_metadata_and_split(df, checks, custom_check_functions, ref_dfs)
+ split_result = self.apply_checks_by_metadata_and_split(df, checks, custom_check_functions, ref_dfs)
+ if self._engine.observer:
+ good_df, bad_df, observation = split_result
+ else:
+ good_df, bad_df = split_result
save_dataframe_as_table(good_df, output_config)
save_dataframe_as_table(bad_df, quarantine_config)
else:
- # Apply checks and write all data to single table
- checked_df = self.apply_checks_by_metadata(df, checks, custom_check_functions, ref_dfs)
+ check_result = self.apply_checks_by_metadata(df, checks, custom_check_functions, ref_dfs)
+ if self._engine.observer:
+ checked_df, observation = check_result
+ else:
+ checked_df = check_result
save_dataframe_as_table(checked_df, output_config)
+ if self._engine.observer and metrics_config and not df.isStreaming and observation is not None:
+ metrics_observation = DQMetricsObservation(
+ observer_name=self._engine.observer.name,
+ observed_metrics=observation.get,
+ error_column_name=self._engine.result_column_names[ColumnArguments.ERRORS],
+ warning_column_name=self._engine.result_column_names[ColumnArguments.WARNINGS],
+ input_location=input_config.location if input_config else None,
+ output_location=output_config.location if output_config else None,
+ quarantine_location=quarantine_config.location if quarantine_config else None,
+ user_metadata=self._engine.engine_user_metadata,
+ )
+ metrics_df = self._engine.observer.build_metrics_df(self.spark, metrics_observation)
+ save_dataframe_as_table(metrics_df, metrics_config)
+
@telemetry_logger("engine", "apply_checks_and_save_in_tables")
def apply_checks_and_save_in_tables(
self,
@@ -740,8 +870,10 @@ def save_results_in_table(
self,
output_df: DataFrame | None = None,
quarantine_df: DataFrame | None = None,
+ observation: Observation | None = None,
output_config: OutputConfig | None = None,
quarantine_config: OutputConfig | None = None,
+ metrics_config: OutputConfig | None = None,
run_config_name: str | None = "default",
product_name: str = "dqx",
assume_user: bool = True,
@@ -752,13 +884,16 @@ def save_results_in_table(
Behavior:
- If *output_df* is provided and *output_config* is None, load the run config and use its *output_config*.
- If *quarantine_df* is provided and *quarantine_config* is None, load the run config and use its *quarantine_config*.
+ - If *observation* is provided and *metrics_config* is None, load the run config and use its *metrics_config*
- A write occurs only when both a DataFrame and its corresponding config are available.
Args:
output_df: DataFrame with valid rows to be saved (optional).
quarantine_df: DataFrame with invalid rows to be saved (optional).
+ observation: Spark Observation with data quality summary metrics (optional).
output_config: Configuration describing where/how to write the valid rows. If omitted, falls back to the run config.
quarantine_config: Configuration describing where/how to write the invalid rows (optional). If omitted, falls back to the run config.
+ metrics_config: Configuration describing where/how to write the summary metrics (optional). If omitted, falls back to the run config.
run_config_name: Name of the run configuration to load when a config parameter is omitted.
            product_name: Product/installation identifier used to resolve installation paths for config loading if install_folder is not provided ("dqx" as default).
assume_user: Whether to assume a per-user installation when loading the run configuration (True as default, skipped if install_folder is provided).
@@ -785,12 +920,46 @@ def save_results_in_table(
)
quarantine_config = run_config.quarantine_config
+ if observation is not None and metrics_config is None:
+ run_config = self._run_config_loader.load_run_config(
+ run_config_name=run_config_name,
+ assume_user=assume_user,
+ product_name=product_name,
+ install_folder=install_folder,
+ )
+ metrics_config = run_config.metrics_config
+
+ is_streaming_output = output_df is not None and output_df.isStreaming
+ is_streaming_quarantine = quarantine_df is not None and quarantine_df.isStreaming
+ is_streaming = is_streaming_output or is_streaming_quarantine
+
+ if self._engine.observer and observation is not None and metrics_config is not None and is_streaming:
+ listener = self._get_streaming_metrics_listener(
+ output_config=output_config,
+ quarantine_config=quarantine_config,
+ metrics_config=metrics_config,
+ )
+ self.spark.streams.addListener(listener)
+
if output_df is not None and output_config is not None:
save_dataframe_as_table(output_df, output_config)
if quarantine_df is not None and quarantine_config is not None:
save_dataframe_as_table(quarantine_df, quarantine_config)
+ if self._engine.observer and observation is not None and metrics_config is not None and not is_streaming:
+ metrics_observation = DQMetricsObservation(
+ observer_name=self._engine.observer.name,
+ observed_metrics=observation.get,
+ error_column_name=self._engine.result_column_names[ColumnArguments.ERRORS],
+ warning_column_name=self._engine.result_column_names[ColumnArguments.WARNINGS],
+ output_location=output_config.location if output_config else None,
+ quarantine_location=quarantine_config.location if quarantine_config else None,
+ user_metadata=self._engine.engine_user_metadata,
+ )
+ metrics_df = self._engine.observer.build_metrics_df(self.spark, metrics_observation)
+ save_dataframe_as_table(metrics_df, metrics_config)
+
def load_checks(self, config: BaseChecksStorageConfig) -> list[dict]:
"""Load DQ rules (checks) from the storage backend described by *config*.
@@ -881,3 +1050,40 @@ def _apply_checks_for_run_config(self, run_config: RunConfig) -> None:
custom_check_functions=custom_check_functions,
ref_dfs=ref_dfs,
)
+
+ def _get_streaming_metrics_listener(
+ self,
+ metrics_config: OutputConfig,
+ input_config: InputConfig | None = None,
+ output_config: OutputConfig | None = None,
+ quarantine_config: OutputConfig | None = None,
+ checks_config: BaseChecksStorageConfig | None = None,
+ ) -> StreamingMetricsListener:
+ """
+ Gets a `StreamingMetricsListener` object for writing metrics to an output table.
+
+ Args:
+ input_config: Optional configuration (e.g., table/view or file location and read options) for reading data.
+ output_config: Optional configuration (e.g., table name, mode, and write options) for writing records.
+ quarantine_config: Optional configuration for writing invalid records.
+ checks_config: Optional configuration for loading quality checks.
+            metrics_config: Configuration for writing summary metrics.
+ """
+
+ if not isinstance(self._engine, DQEngineCore):
+ raise ValueError(f"Metrics cannot be collected for engine with type '{self._engine.__class__.__name__}'")
+
+ if not self._engine.observer:
+ raise ValueError("Metrics cannot be collected for engine with no observer")
+
+ metrics_observation = DQMetricsObservation(
+ observer_name=self._engine.observer.name,
+ error_column_name=self._engine.result_column_names[ColumnArguments.ERRORS],
+ warning_column_name=self._engine.result_column_names[ColumnArguments.WARNINGS],
+ input_location=input_config.location if input_config else None,
+ output_location=output_config.location if output_config else None,
+ quarantine_location=quarantine_config.location if quarantine_config else None,
+ checks_location=checks_config.location if checks_config else None,
+ user_metadata=self._engine.engine_user_metadata,
+ )
+ return StreamingMetricsListener(metrics_config, metrics_observation, self.spark)
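
Taken together, the engine changes above surface two ways to consume summary metrics: read them in memory from the returned `Observation`, or persist them via `metrics_config`. A minimal usage sketch, assuming hypothetical `main.demo.*` tables with `id` and `salary` columns and a workspace where DQX is installed:

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver

checks = [
    {"name": "id_is_not_null", "criticality": "error",
     "check": {"function": "is_not_null", "arguments": {"column": "id"}}},
]

# Track the default counts plus one custom metric.
observer = DQMetricsObserver(name="dqx", custom_metrics=["avg(salary) as avg_salary"])
engine = DQEngine(workspace_client=WorkspaceClient(), observer=observer)

# In-memory access: the Observation is only populated after an action runs on the checked DataFrame.
checked_df, observation = engine.apply_checks_by_metadata(engine.spark.table("main.demo.input"), checks)
checked_df.count()
print(observation.get)  # e.g. {'input_row_count': ..., 'error_row_count': ..., ...}

# End-to-end: write checked rows and persist the observed metrics to a metrics table.
engine.apply_checks_by_metadata_and_save_in_table(
    checks=checks,
    input_config=InputConfig(location="main.demo.input"),
    output_config=OutputConfig(location="main.demo.output"),
    metrics_config=OutputConfig(location="main.demo.dq_metrics"),  # hypothetical table name
)
```
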
diff --git a/src/databricks/labs/dqx/installer/config_provider.py b/src/databricks/labs/dqx/installer/config_provider.py
index e55405525..65858179f 100644
--- a/src/databricks/labs/dqx/installer/config_provider.py
+++ b/src/databricks/labs/dqx/installer/config_provider.py
@@ -37,6 +37,8 @@ def prompt_new_installation(self, install_folder: str | None = None) -> Workspac
output_config = self._prompt_output_config(is_streaming)
quarantine_config = self._prompt_quarantine_config(is_streaming)
+ metrics_config, custom_metrics = self._prompt_metrics()
+
checks_location = self._prompts.question(
"Provide location of the quality checks definitions, either:\n"
"- a filename for storing data quality rules (e.g. checks.yml),\n"
@@ -103,6 +105,7 @@ def prompt_new_installation(self, install_folder: str | None = None) -> Workspac
input_config=input_config,
output_config=output_config,
quarantine_config=quarantine_config,
+ metrics_config=metrics_config,
checks_location=checks_location,
warehouse_id=warehouse_id,
profiler_config=profiler_config,
@@ -117,6 +120,7 @@ def prompt_new_installation(self, install_folder: str | None = None) -> Workspac
quality_checker_override_clusters=quality_checker_override_clusters,
e2e_spark_conf=e2e_spark_conf,
e2e_override_clusters=e2e_override_clusters,
+ custom_metrics=custom_metrics,
)
def _prompt_clusters_configs(self):
@@ -327,3 +331,69 @@ def _prompt_quarantine_config(self, is_streaming: bool) -> OutputConfig | None:
trigger=quarantine_trigger_options,
)
return None
+
+ def _prompt_metrics(self) -> tuple[OutputConfig | None, list[str] | None]:
+ store_summary_metrics = self._prompts.confirm(
+ "Do you want to store summary metrics from data quality checking in a table?"
+ )
+ metrics_config = None
+ custom_metrics = None
+
+ if store_summary_metrics:
+ metrics_config = self._prompt_metrics_config()
+ custom_metrics = self._prompt_custom_metrics()
+
+ return metrics_config, custom_metrics
+
+ def _prompt_metrics_config(self) -> OutputConfig:
+ """Prompt user for metrics configuration."""
+ metrics_table = self._prompts.question(
+ "Provide table for storing summary metrics in the fully qualified format `catalog.schema.table` or `schema.table`",
+ valid_regex=r"^([\w]+(?:\.[\w]+){1,2})$",
+ )
+
+ metrics_write_mode = self._prompts.question(
+ "Provide write mode for metrics table (e.g. 'append' or 'overwrite')",
+ default="append",
+ valid_regex=r"^(append|overwrite)$",
+ )
+
+ metrics_format = self._prompts.question(
+ "Provide format for the metrics data (e.g. delta, parquet)",
+ default="delta",
+ valid_regex=r"^\w.+$",
+ )
+
+ metrics_write_options = json.loads(
+ self._prompts.question(
+ "Provide additional options for writing the metrics data (e.g. {\"mergeSchema\": \"true\"})",
+ default="{}",
+ valid_regex=r"^.*$",
+ )
+ )
+
+ return OutputConfig(
+ location=metrics_table,
+ mode=metrics_write_mode,
+ format=metrics_format,
+ options=metrics_write_options,
+ )
+
+ def _prompt_custom_metrics(self) -> list[str]:
+ """Prompt user for custom metrics as Spark SQL expressions."""
+ custom_metrics_input = self._prompts.question(
+ "Provide custom metrics as a list of Spark SQL expressions "
+            "(e.g. [\"count(case when age > 65 then 1 end) as senior_count\", \"avg(salary) as avg_salary\"]). "
+ "Leave blank to track the default data quality metrics.",
+ default="[]",
+ valid_regex=r"^.*$",
+ )
+
+ if custom_metrics_input.strip():
+ custom_metrics = json.loads(custom_metrics_input)
+ if not isinstance(custom_metrics, list):
+ raise ValueError(
+                    "Custom metrics must be provided as a list of Spark SQL expressions (e.g. [\"count(case when age > 65 then 1 end) as senior_count\"])"
+ )
+ return custom_metrics
+ return []
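
Because `_prompt_custom_metrics` parses the answer with `json.loads`, the value entered at the prompt must be a JSON array of Spark SQL expressions (double quotes inside the brackets). A small sketch of an input the parser accepts:

```python
import json

# JSON array of Spark SQL expressions; single-quoted strings inside the list would raise json.JSONDecodeError.
raw = '["count(case when age > 65 then 1 end) as senior_count", "avg(salary) as avg_salary"]'
custom_metrics = json.loads(raw)
assert isinstance(custom_metrics, list) and len(custom_metrics) == 2
```
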
diff --git a/src/databricks/labs/dqx/metrics_listener.py b/src/databricks/labs/dqx/metrics_listener.py
new file mode 100644
index 000000000..fbddc1e9b
--- /dev/null
+++ b/src/databricks/labs/dqx/metrics_listener.py
@@ -0,0 +1,92 @@
+import logging
+from datetime import datetime
+from pyspark.sql import SparkSession
+from pyspark.sql.streaming import listener
+from databricks.labs.dqx.config import OutputConfig
+from databricks.labs.dqx.metrics_observer import DQMetricsObservation, DQMetricsObserver
+from databricks.labs.dqx.io import save_dataframe_as_table
+
+
+logger = logging.getLogger(__name__)
+
+
+class StreamingMetricsListener(listener.StreamingQueryListener):
+ """
+ Implements a Spark `StreamingQueryListener` for writing data quality summary metrics to an output destination. See
+ the [Spark documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryListener.html)
+ for detailed information about `StreamingQueryListener`.
+
+ Args:
+ metrics_config: Output configuration used for writing data quality summary metrics
+ metrics_observation: `DQMetricsObservation` with data quality summary information
+ spark: `SparkSession` for writing summary metrics
+ """
+
+ metrics_config: OutputConfig
+ metrics_observation: DQMetricsObservation
+ spark: SparkSession
+
+ def __init__(
+ self, metrics_config: OutputConfig, metrics_observation: DQMetricsObservation, spark: SparkSession
+ ) -> None:
+ self.metrics_config = metrics_config
+ self.metrics_observation = metrics_observation
+ self.spark = spark
+
+ def onQueryStarted(self, event: listener.QueryStartedEvent) -> None:
+ """
+        Logs a debug message when a streaming query starts.
+
+ Args:
+ event: A `QueryStartedEvent` with details about the streaming query
+ """
+ logger.debug(f"Streaming query '{event.name}' for summary metrics started run ID '{event.runId}'")
+
+ def onQueryProgress(self, event: listener.QueryProgressEvent) -> None:
+ """
+        Writes the metrics observed by the `DQMetricsObserver` to the output destination.
+
+ Args:
+ event: A `QueryProgressEvent` with details about the last processed micro-batch
+ """
+ observed_metrics = event.progress.observedMetrics.get(self.metrics_observation.observer_name)
+ if not observed_metrics:
+ return
+
+ metrics_observation = DQMetricsObservation(
+ observer_name=self.metrics_observation.observer_name,
+ observed_metrics=observed_metrics.asDict(),
+ run_time=datetime.fromisoformat(event.progress.timestamp),
+ error_column_name=self.metrics_observation.error_column_name,
+ warning_column_name=self.metrics_observation.warning_column_name,
+ input_location=self.metrics_observation.input_location,
+ output_location=self.metrics_observation.output_location,
+ quarantine_location=self.metrics_observation.quarantine_location,
+ checks_location=self.metrics_observation.checks_location,
+ user_metadata=self.metrics_observation.user_metadata,
+ )
+ metrics_df = DQMetricsObserver.build_metrics_df(self.spark, metrics_observation)
+ save_dataframe_as_table(metrics_df, self.metrics_config)
+
+ def onQueryIdle(self, event: listener.QueryIdleEvent) -> None:
+ """
+        Logs a debug message when a streaming query is idle.
+
+ Args:
+ event: A `QueryIdleEvent` with details about the streaming query
+ """
+ logger.debug(f"Streaming query run '{event.runId}' for summary metrics was reported idle")
+
+ def onQueryTerminated(self, event: listener.QueryTerminatedEvent) -> None:
+ """
+        Logs a debug message when a streaming query stops due to cancellation or failure.
+
+ Args:
+ event: A `QueryTerminatedEvent` with details about the streaming query
+ """
+ if event.exception:
+ logger.debug(
+ f"Streaming query run '{event.runId}' for summary metrics failed with error: {event.exception}"
+ )
+
+ logger.debug(f"Streaming query run '{event.runId}' for summary metrics stopped")
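
For streaming queries the shared `Observation` is never populated, so the engine registers this listener and the metrics are written on every micro-batch progress event. A sketch of wiring the listener up manually, assuming a live `SparkSession` and hypothetical `main.demo.*` table names:

```python
from pyspark.sql import SparkSession
from databricks.labs.dqx.config import OutputConfig
from databricks.labs.dqx.metrics_listener import StreamingMetricsListener
from databricks.labs.dqx.metrics_observer import DQMetricsObservation

spark = SparkSession.builder.getOrCreate()

# Static context recorded alongside every metrics row; observed values arrive per micro-batch.
observation = DQMetricsObservation(
    observer_name="dqx",
    error_column_name="_errors",
    warning_column_name="_warnings",
    output_location="main.demo.output",          # hypothetical
    quarantine_location="main.demo.quarantine",  # hypothetical
)
listener = StreamingMetricsListener(OutputConfig(location="main.demo.dq_metrics"), observation, spark)
spark.streams.addListener(listener)  # onQueryProgress writes one row per observed metric
```
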
diff --git a/src/databricks/labs/dqx/metrics_observer.py b/src/databricks/labs/dqx/metrics_observer.py
new file mode 100644
index 000000000..a0092228e
--- /dev/null
+++ b/src/databricks/labs/dqx/metrics_observer.py
@@ -0,0 +1,155 @@
+from dataclasses import dataclass, field
+from datetime import datetime
+from functools import cached_property
+from typing import Any
+from uuid import uuid4
+
+from pyspark.sql import DataFrame, Observation, SparkSession
+
+
+OBSERVATION_TABLE_SCHEMA = (
+ "run_name string, input_location string, output_location string, quarantine_location string, "
+ "checks_location string, metric_name string, metric_value string, run_ts timestamp, error_column_name string, "
+ "warning_column_name string, user_metadata map"
+    "warning_column_name string, user_metadata map<string, string>"
+
+
+@dataclass(frozen=True)
+class DQMetricsObservation:
+ """
+ Observer metrics class used to persist summary metrics.
+
+ Args:
+        observer_name: Name of the observer that produced the metrics
+        observed_metrics: Dictionary of observed metrics
+        run_time: Run time when the data quality summary metrics were observed
+        error_column_name: Name of the error column when running quality checks
+        warning_column_name: Name of the warning column when running quality checks
+        input_location: Location the input data is read from when running quality checks (fully-qualified table
+            name or file path)
+        output_location: Location where output data is persisted when running quality checks (fully-qualified table
+            name or file path)
+        quarantine_location: Location where quarantined data is persisted when running quality checks (fully-qualified
+            table name or file path)
+        checks_location: Location the checks are loaded from when running quality checks (fully-qualified table name
+            or file path)
+        user_metadata: Optional user-defined metadata attached to the metrics records
+ """
+
+ observer_name: str
+ error_column_name: str
+ warning_column_name: str
+ run_time: datetime | None = None
+ observed_metrics: dict[str, Any] | None = None
+ input_location: str | None = None
+ output_location: str | None = None
+ quarantine_location: str | None = None
+ checks_location: str | None = None
+ user_metadata: dict[str, str] | None = None
+
+
+@dataclass
+class DQMetricsObserver:
+ """
+ Observation class used to track summary metrics about data quality when validating datasets with DQX
+
+ Args:
+        name: Name of the observation, which will be displayed in the listener metrics (default is 'dqx').
+ custom_metrics: Optional list of SQL expressions defining custom, dataset-level quality metrics
+ """
+
+ name: str = "dqx"
+ custom_metrics: list[str] | None = None
+ _id: str = field(default_factory=str)
+ _error_column_name: str = "_errors"
+ _warning_column_name: str = "_warnings"
+
+ def __post_init__(self) -> None:
+ self._id = str(uuid4())
+
+ @cached_property
+ def observation_id(self) -> str:
+ """
+ Observer ID.
+
+ Returns:
+ Unique observation ID
+ """
+ return self._id
+
+ @cached_property
+ def metrics(self) -> list[str]:
+ """
+ Gets the observer metrics as Spark SQL expressions.
+
+ Returns:
+ A list of Spark SQL expressions defining the observer metrics (both default and custom).
+ """
+ default_metrics = [
+ "count(1) as input_row_count",
+ f"count(case when {self._error_column_name} is not null then 1 end) as error_row_count",
+ f"count(case when {self._warning_column_name} is not null then 1 end) as warning_row_count",
+ f"count(case when {self._error_column_name} is null and {self._warning_column_name} is null then 1 end) as valid_row_count",
+ ]
+ if self.custom_metrics:
+ default_metrics.extend(self.custom_metrics)
+ return default_metrics
+
+ @cached_property
+ def observation(self) -> Observation:
+ """
+ Spark `Observation` which can be attached to a `DataFrame` to track summary metrics. Metrics will be collected
+        when the first action is triggered on the attached `DataFrame`. Subsequent operations on the attached `DataFrame`
+ will not update the observed metrics. See: [PySpark Observation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Observation.html)
+ for complete documentation.
+
+ Returns:
+ A Spark `Observation` instance
+ """
+ return Observation(name=self.name)
+
+ def set_column_names(self, error_column_name: str, warning_column_name: str) -> None:
+ """
+        Sets the error and warning column names used in the default summary metrics (defaults are `_errors` and `_warnings`).
+
+ Args:
+ error_column_name: Error column name
+ warning_column_name: Warning column name
+ """
+ self._error_column_name = error_column_name
+ self._warning_column_name = warning_column_name
+
+ @staticmethod
+ def build_metrics_df(spark: SparkSession, observation: DQMetricsObservation) -> DataFrame:
+ """
+ Builds a Spark `DataFrame` from a `DQMetricsObservation`.
+
+ Args:
+ spark: `SparkSession` used to create the `DataFrame`
+ observation: `DQMetricsObservation` with summary metrics
+
+ Returns:
+ A Spark `DataFrame` with summary metrics
+ """
+
+ if not observation.observed_metrics:
+ return spark.createDataFrame([], schema=OBSERVATION_TABLE_SCHEMA)
+
+ return spark.createDataFrame(
+ [
+ [
+ observation.observer_name,
+ observation.input_location,
+ observation.output_location,
+ observation.quarantine_location,
+ observation.checks_location,
+ metric_key,
+ metric_value,
+ observation.run_time,
+ observation.error_column_name,
+ observation.warning_column_name,
+ observation.user_metadata if observation.user_metadata else None,
+ ]
+ for metric_key, metric_value in observation.observed_metrics.items()
+ ],
+ schema=OBSERVATION_TABLE_SCHEMA,
+ )
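
The observer can also be exercised on its own; the sketch below prints the generated metric expressions and flattens a hand-built observation into metric rows (the observed values are illustrative, not real results):

```python
from pyspark.sql import SparkSession
from databricks.labs.dqx.metrics_observer import DQMetricsObservation, DQMetricsObserver

spark = SparkSession.builder.getOrCreate()

observer = DQMetricsObserver(name="nightly_dq", custom_metrics=["avg(salary) as avg_salary"])
print(observer.metrics)  # four default count expressions plus the custom one

observation = DQMetricsObservation(
    observer_name=observer.name,
    error_column_name="_errors",
    warning_column_name="_warnings",
    observed_metrics={"input_row_count": 4, "avg_salary": 52500.0},  # illustrative values
)
# One output row per metric, using OBSERVATION_TABLE_SCHEMA.
DQMetricsObserver.build_metrics_df(spark, observation).show(truncate=False)
```
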
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 86dfcb1e1..8fce9081c 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -92,6 +92,50 @@ def delete(resource) -> None:
@pytest.fixture
+def setup_workflows_with_metrics(ws, spark, installation_ctx, make_schema, make_table, make_random):
+ """Set up workflows with metrics configuration for testing."""
+
+ def create(_spark, **kwargs):
+ installation_ctx.installation_service.run()
+
+ run_config = _setup_workflows_deps(
+ installation_ctx,
+ make_schema,
+ make_table,
+ make_random,
+ checks_location=None,
+ quarantine=kwargs.get("quarantine", False),
+ )
+
+ if kwargs.get("metrics"):
+ catalog_name = "main"
+ schema_name = run_config.output_config.location.split('.')[1]
+ metrics_table_name = f"{catalog_name}.{schema_name}.metrics_{make_random(6).lower()}"
+ run_config.metrics_config = OutputConfig(location=metrics_table_name)
+
+ custom_metrics = kwargs.get("custom_metrics")
+ if custom_metrics:
+ config = installation_ctx.config
+ config.custom_metrics = custom_metrics
+ installation_ctx.installation.save(config)
+
+ checks_location = _setup_quality_checks(installation_ctx, _spark, ws)
+ run_config.checks_location = checks_location
+ installation_ctx.installation.save(installation_ctx.config)
+
+ return installation_ctx, run_config
+
+ def delete(resource):
+ ctx, run_config = resource
+ checks_location = f"{ctx.installation.install_folder()}/{run_config.checks_location}"
+ try:
+ ws.workspace.delete(checks_location)
+ except Exception:
+ pass
+
+ yield from factory("workflows_with_metrics", lambda **kw: create(spark, **kw), delete)
+
+
def setup_workflows_with_custom_folder(
ws, spark, installation_ctx_custom_install_folder, make_schema, make_table, make_random
):
diff --git a/tests/integration/test_metrics_workflow.py b/tests/integration/test_metrics_workflow.py
new file mode 100644
index 000000000..941c9e2dc
--- /dev/null
+++ b/tests/integration/test_metrics_workflow.py
@@ -0,0 +1,382 @@
+from databricks.labs.dqx.metrics_observer import OBSERVATION_TABLE_SCHEMA
+from chispa import assert_df_equality
+
+
+def test_quality_checker_workflow_with_metrics(spark, setup_workflows_with_metrics):
+ """Test that quality checker workflow saves metrics when configured."""
+ ctx, run_config = setup_workflows_with_metrics(metrics=True)
+
+ ctx.deployed_workflows.run_workflow("quality-checker", run_config.name)
+ output_df = spark.table(run_config.output_config.location)
+ output_count = output_df.count()
+
+ expected_metrics = [
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "input_row_count",
+ "metric_value": str(output_count),
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "error_row_count",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "warning_row_count",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "valid_row_count",
+ "metric_value": str(output_count),
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ ]
+
+ expected_metrics_df = spark.createDataFrame(expected_metrics, schema=OBSERVATION_TABLE_SCHEMA).drop("run_ts")
+ actual_metrics_df = spark.table(run_config.metrics_config.location).drop("run_ts")
+ assert_df_equality(expected_metrics_df, actual_metrics_df)
+
+
+def test_quality_checker_workflow_with_quarantine_and_metrics(spark, setup_workflows_with_metrics):
+ """Test workflow with both quarantine and metrics configurations."""
+ ctx, run_config = setup_workflows_with_metrics(
+ quarantine=True, metrics=True, custom_metrics=["count(1) as total_ids"]
+ )
+
+ ctx.deployed_workflows.run_workflow("quality-checker", run_config.name)
+ output_df = spark.table(run_config.output_config.location)
+ output_count = output_df.count()
+ quarantine_df = spark.table(run_config.quarantine_config.location)
+ quarantine_count = quarantine_df.count()
+
+ expected_metrics = [
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": run_config.quarantine_config.location,
+ "checks_location": None,
+ "metric_name": "input_row_count",
+ "metric_value": str(output_count + quarantine_count),
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": run_config.quarantine_config.location,
+ "checks_location": None,
+ "metric_name": "error_row_count",
+ "metric_value": str(quarantine_count),
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": run_config.quarantine_config.location,
+ "checks_location": None,
+ "metric_name": "warning_row_count",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": run_config.quarantine_config.location,
+ "checks_location": None,
+ "metric_name": "valid_row_count",
+ "metric_value": str(output_count),
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": run_config.quarantine_config.location,
+ "checks_location": None,
+ "metric_name": "total_ids",
+ "metric_value": "4",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ ]
+
+ expected_metrics_df = spark.createDataFrame(expected_metrics, schema=OBSERVATION_TABLE_SCHEMA).drop("run_ts")
+ actual_metrics_df = spark.table(run_config.metrics_config.location).drop("run_ts")
+ assert_df_equality(expected_metrics_df, actual_metrics_df)
+
+
+def test_e2e_workflow_with_metrics(spark, setup_workflows_with_metrics):
+ """Test that e2e workflow generates checks and applies them with metrics."""
+ ctx, run_config = setup_workflows_with_metrics(
+ metrics=True, custom_metrics=["max(id) as max_id", "min(id) as min_id"]
+ )
+
+ ctx.deployed_workflows.run_workflow("e2e", run_config.name)
+ output_df = spark.table(run_config.output_config.location)
+ output_count = output_df.count()
+
+ expected_metrics = [
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "input_row_count",
+ "metric_value": str(output_count),
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "error_row_count",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "warning_row_count",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "valid_row_count",
+ "metric_value": str(output_count),
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "max_id",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "min_id",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ ]
+
+ expected_metrics_df = spark.createDataFrame(expected_metrics, schema=OBSERVATION_TABLE_SCHEMA).drop("run_ts")
+ actual_metrics_df = spark.table(run_config.metrics_config.location).drop("run_ts")
+ assert_df_equality(expected_metrics_df, actual_metrics_df)
+
+
+def test_custom_metrics_in_workflow(spark, setup_workflows_with_metrics):
+ """Test workflow with custom metrics."""
+ custom_metrics = [
+ "avg(id) as average_id",
+ "sum(id) as total_id",
+ "count(1) as total_ids",
+ "max(id) - min(id) as id_range",
+ ]
+
+ ctx, run_config = setup_workflows_with_metrics(metrics=True, custom_metrics=custom_metrics)
+
+ expected_metrics = [
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "input_row_count",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "error_row_count",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "warning_row_count",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "valid_row_count",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "average_id",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "total_id",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "total_ids",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "dqx",
+ "input_location": run_config.input_config.location,
+ "output_location": run_config.output_config.location,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "id_range",
+ "metric_value": "0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ ]
+
+ ctx.deployed_workflows.run_workflow("quality-checker", run_config.name)
+
+ expected_metrics_df = spark.createDataFrame(expected_metrics, schema=OBSERVATION_TABLE_SCHEMA).drop("run_ts")
+ actual_metrics_df = spark.table(run_config.metrics_config.location).drop("run_ts")
+ assert_df_equality(expected_metrics_df, actual_metrics_df)
+
+
+def test_quality_checker_workflow_without_metrics_config(setup_workflows_with_metrics):
+ """Test that workflow works normally when metrics config is not provided."""
+ _, run_config = setup_workflows_with_metrics(metrics=False)
+ assert run_config.metrics_config is None
diff --git a/tests/integration/test_save_and_load_checks_from_table.py b/tests/integration/test_save_and_load_checks_from_table.py
index aeb771d1c..e8cf0e346 100644
--- a/tests/integration/test_save_and_load_checks_from_table.py
+++ b/tests/integration/test_save_and_load_checks_from_table.py
@@ -430,6 +430,8 @@ def test_save_load_checks_from_table_in_user_installation(ws, installation_ctx,
class ChecksDummyStorageConfig(BaseChecksStorageConfig):
"""Dummy storage config for testing unsupported storage type."""
+ location: str = "test_location"
+
def test_load_checks_invalid_storage_config(ws, spark):
engine = DQEngine(ws, spark)
diff --git a/tests/integration/test_summary_metrics.py b/tests/integration/test_summary_metrics.py
new file mode 100644
index 000000000..7f70693da
--- /dev/null
+++ b/tests/integration/test_summary_metrics.py
@@ -0,0 +1,841 @@
+from datetime import datetime, timezone
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType
+
+from databricks.labs.dqx.config import InputConfig, OutputConfig, ExtraParams
+from databricks.labs.dqx.engine import DQEngine
+from databricks.labs.dqx.metrics_observer import DQMetricsObserver, OBSERVATION_TABLE_SCHEMA
+from chispa import assert_df_equality
+
+
+RUN_TIME = datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc)
+EXTRA_PARAMS = ExtraParams(run_time=RUN_TIME.isoformat())
+TEST_SCHEMA = StructType(
+ [
+ StructField("id", IntegerType(), True),
+ StructField("name", StringType(), True),
+ StructField("age", IntegerType(), True),
+ StructField("salary", IntegerType(), True),
+ ]
+)
+TEST_CHECKS = [
+ {
+ "name": "id_is_not_null",
+ "criticality": "error",
+ "check": {"function": "is_not_null", "arguments": {"column": "id"}},
+ },
+ {
+ "name": "name_is_not_null_and_not_empty",
+ "criticality": "warn",
+ "check": {"function": "is_not_null_and_not_empty", "arguments": {"column": "name"}},
+ },
+]
+
+
+def test_observer_metrics_before_action(ws, spark):
+ """Test that summary metrics are empty before running a Spark action."""
+ observer = DQMetricsObserver(name="test_observer")
+ dq_engine = DQEngine(workspace_client=ws, spark=spark, observer=observer, extra_params=EXTRA_PARAMS)
+
+ test_df = spark.createDataFrame(
+ [
+ [1, "Alice", 30, 50000],
+ [2, "Bob", 25, 45000],
+ [None, "Charlie", 35, 60000], # This will trigger an error
+ [4, None, 28, 55000], # This will trigger a warning
+ ],
+ TEST_SCHEMA,
+ )
+ _, observation = dq_engine.apply_checks_by_metadata(test_df, TEST_CHECKS)
+
+ actual_metrics = observation.get
+ assert actual_metrics == {}
+
+
+def test_observer_metrics(ws, spark):
+ """Test that summary metrics can be accessed after running a Spark action like df.count()."""
+ observer = DQMetricsObserver(name="test_observer")
+ dq_engine = DQEngine(workspace_client=ws, spark=spark, observer=observer, extra_params=EXTRA_PARAMS)
+
+ test_df = spark.createDataFrame(
+ [
+ [1, "Alice", 30, 50000],
+ [2, "Bob", 25, 45000],
+ [None, "Charlie", 35, 60000],
+ [4, None, 28, 55000],
+ ],
+ TEST_SCHEMA,
+ )
+ checked_df, observation = dq_engine.apply_checks_by_metadata(test_df, TEST_CHECKS)
+ checked_df.count() # Trigger an action to get the metrics
+
+ expected_metrics = {
+ "input_row_count": 4,
+ "error_row_count": 1,
+ "warning_row_count": 1,
+ "valid_row_count": 2,
+ }
+ actual_metrics = observation.get
+ assert actual_metrics == expected_metrics
+
+
+def test_observer_custom_metrics(ws, spark):
+ """Test that summary metrics can be accessed after running a Spark action like df.count()."""
+ custom_metrics = [
+ "avg(case when _errors is not null then age else null end) as avg_error_age",
+ "sum(case when _warnings is not null then salary else null end) as total_warning_salary",
+ ]
+ observer = DQMetricsObserver(name="test_observer", custom_metrics=custom_metrics)
+ dq_engine = DQEngine(workspace_client=ws, spark=spark, observer=observer, extra_params=EXTRA_PARAMS)
+
+ test_df = spark.createDataFrame(
+ [
+ [1, "Alice", 30, 50000],
+ [2, "Bob", 25, 45000],
+ [None, "Charlie", 35, 60000],
+ [4, None, 28, 55000],
+ ],
+ TEST_SCHEMA,
+ )
+ checked_df, observation = dq_engine.apply_checks_by_metadata(test_df, TEST_CHECKS)
+ checked_df.count() # Trigger an action to get the metrics
+
+ expected_metrics = {
+ "input_row_count": 4,
+ "error_row_count": 1,
+ "warning_row_count": 1,
+ "valid_row_count": 2,
+ "avg_error_age": 35.0,
+ "total_warning_salary": 55000,
+ }
+ actual_metrics = observation.get
+ assert actual_metrics == expected_metrics
+
+
+def test_observer_metrics_output(ws, spark, make_schema, make_random):
+ """Test that summary metrics are written to the table defined in metrics_config."""
+ catalog_name = "main"
+ schema = make_schema(catalog_name=catalog_name)
+ input_table_name = f"{catalog_name}.{schema.name}.input_{make_random(6).lower()}"
+ output_table_name = f"{catalog_name}.{schema.name}.output_{make_random(6).lower()}"
+ metrics_table_name = f"{catalog_name}.{schema.name}.metrics_{make_random(6).lower()}"
+
+ custom_metrics = [
+ "avg(case when _errors is not null then age else null end) as avg_error_age",
+ "sum(case when _warnings is not null then salary else null end) as total_warning_salary",
+ ]
+ observer = DQMetricsObserver(name="test_observer", custom_metrics=custom_metrics)
+ dq_engine = DQEngine(workspace_client=ws, spark=spark, observer=observer, extra_params=EXTRA_PARAMS)
+
+ test_df = spark.createDataFrame(
+ [
+ [1, "Alice", 30, 50000],
+ [2, "Bob", 25, 45000],
+ [None, "Charlie", 35, 60000],
+ [4, None, 28, 55000],
+ ],
+ TEST_SCHEMA,
+ )
+
+ test_df.write.saveAsTable(input_table_name)
+ input_config = InputConfig(location=input_table_name)
+ output_config = OutputConfig(location=output_table_name)
+ metrics_config = OutputConfig(location=metrics_table_name)
+
+ dq_engine.apply_checks_by_metadata_and_save_in_table(
+ checks=TEST_CHECKS, input_config=input_config, output_config=output_config, metrics_config=metrics_config
+ )
+
+ expected_metrics = [
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "input_row_count",
+ "metric_value": "4",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "error_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "warning_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "valid_row_count",
+ "metric_value": "2",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "avg_error_age",
+ "metric_value": "35.0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "total_warning_salary",
+ "metric_value": "55000",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ ]
+
+ expected_metrics_df = spark.createDataFrame(expected_metrics, schema=OBSERVATION_TABLE_SCHEMA).drop("run_ts")
+ actual_metrics_df = spark.table(metrics_table_name).drop("run_ts")
+ assert_df_equality(expected_metrics_df, actual_metrics_df)
+
+
+def test_observer_metrics_output_with_quarantine(ws, spark, make_schema, make_random):
+ """Test that metrics work correctly when using both quarantine and metrics configs."""
+ catalog_name = "main"
+ schema = make_schema(catalog_name=catalog_name)
+ input_table_name = f"{catalog_name}.{schema.name}.input_{make_random(6).lower()}"
+ output_table_name = f"{catalog_name}.{schema.name}.output_{make_random(6).lower()}"
+ quarantine_table_name = f"{catalog_name}.{schema.name}.quarantine_{make_random(6).lower()}"
+ metrics_table_name = f"{catalog_name}.{schema.name}.metrics_{make_random(6).lower()}"
+
+ custom_metrics = [
+ "avg(case when _errors is not null then age else null end) as avg_error_age",
+ "sum(case when _warnings is not null then salary else null end) as total_warning_salary",
+ ]
+ observer = DQMetricsObserver(name="test_observer", custom_metrics=custom_metrics)
+ dq_engine = DQEngine(workspace_client=ws, spark=spark, observer=observer, extra_params=EXTRA_PARAMS)
+
+ test_df = spark.createDataFrame(
+ [
+ [1, "Alice", 30, 50000],
+ [2, "Bob", 25, 45000],
+ [None, "Charlie", 35, 60000],
+ [4, None, 28, 55000],
+ ],
+ TEST_SCHEMA,
+ )
+
+ test_df.write.saveAsTable(input_table_name)
+ input_config = InputConfig(location=input_table_name)
+ output_config = OutputConfig(location=output_table_name)
+ quarantine_config = OutputConfig(location=quarantine_table_name)
+ metrics_config = OutputConfig(location=metrics_table_name)
+
+ dq_engine.apply_checks_by_metadata_and_save_in_table(
+ checks=TEST_CHECKS,
+ input_config=input_config,
+ output_config=output_config,
+ quarantine_config=quarantine_config,
+ metrics_config=metrics_config,
+ )
+
+ expected_metrics = [
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "input_row_count",
+ "metric_value": "4",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "error_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "warning_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "valid_row_count",
+ "metric_value": "2",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "avg_error_age",
+ "metric_value": "35.0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "total_warning_salary",
+ "metric_value": "55000",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ ]
+
+ expected_metrics_df = spark.createDataFrame(expected_metrics, schema=OBSERVATION_TABLE_SCHEMA).drop("run_ts")
+ actual_metrics_df = spark.table(metrics_table_name).drop("run_ts")
+ assert_df_equality(expected_metrics_df, actual_metrics_df)
+
+
+def test_engine_without_observer_no_metrics_saved(ws, spark, make_schema, make_random):
+ """Test that no metrics are saved when observer is not configured."""
+ catalog_name = "main"
+ schema = make_schema(catalog_name=catalog_name)
+ input_table_name = f"{catalog_name}.{schema.name}.input_{make_random(6).lower()}"
+ output_table_name = f"{catalog_name}.{schema.name}.output_{make_random(6).lower()}"
+ metrics_table_name = f"{catalog_name}.{schema.name}.metrics_{make_random(6).lower()}"
+
+ dq_engine = DQEngine(workspace_client=ws, spark=spark, extra_params=EXTRA_PARAMS)
+
+ test_df = spark.createDataFrame(
+ [
+ [1, "Alice", 30, 50000],
+ [2, "Bob", 25, 45000],
+ ],
+ TEST_SCHEMA,
+ )
+
+ test_df.write.saveAsTable(input_table_name)
+
+ input_config = InputConfig(location=input_table_name)
+ output_config = OutputConfig(location=output_table_name)
+ metrics_config = OutputConfig(location=metrics_table_name)
+
+ dq_engine.apply_checks_by_metadata_and_save_in_table(
+ checks=TEST_CHECKS, input_config=input_config, output_config=output_config, metrics_config=metrics_config
+ )
+
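+ # Without an observer attached to the engine, metrics_config is ignored and the metrics table is never created.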
+ assert not ws.tables.exists(metrics_table_name).table_exists
+
+
+def test_streaming_observer_metrics_output(ws, spark, make_schema, make_random, make_volume):
+ """Test that summary metrics are written to the table when using streaming with metrics_config."""
+ catalog_name = "main"
+ schema = make_schema(catalog_name=catalog_name)
+ input_table_name = f"{catalog_name}.{schema.name}.input_{make_random(6).lower()}"
+ output_table_name = f"{catalog_name}.{schema.name}.output_{make_random(6).lower()}"
+ metrics_table_name = f"{catalog_name}.{schema.name}.metrics_{make_random(6).lower()}"
+ volume = make_volume(catalog_name=catalog_name, schema_name=schema.name)
+ checkpoint_location = f"/Volumes/{volume.catalog_name}/{volume.schema_name}/{volume.name}/{make_random(8).lower()}"
+
+ custom_metrics = [
+ "avg(case when _errors is not null then age else null end) as avg_error_age",
+ "sum(case when _warnings is not null then salary else null end) as total_warning_salary",
+ ]
+ observer = DQMetricsObserver(name="test_streaming_observer", custom_metrics=custom_metrics)
+ dq_engine = DQEngine(workspace_client=ws, spark=spark, observer=observer, extra_params=EXTRA_PARAMS)
+
+ test_df = spark.createDataFrame(
+ [
+ [1, "Alice", 30, 50000],
+ [2, "Bob", 25, 45000],
+ [None, "Charlie", 35, 60000],  # This will trigger an error
+ [4, None, 28, 55000],  # This will trigger a warning
+ ],
+ TEST_SCHEMA,
+ )
+
+ test_df.write.saveAsTable(input_table_name)
+ input_config = InputConfig(location=input_table_name, is_streaming=True)
+ output_config = OutputConfig(
+ location=output_table_name, options={"checkPointLocation": checkpoint_location}, trigger={"availableNow": True}
+ )
+ metrics_config = OutputConfig(location=metrics_table_name)
+
+ dq_engine.apply_checks_by_metadata_and_save_in_table(
+ checks=TEST_CHECKS, input_config=input_config, output_config=output_config, metrics_config=metrics_config
+ )
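+ # The availableNow trigger processes all pending input, and the engine is expected to wait for the
+ # streaming query to finish before returning, so the metrics table can be read immediately afterwards.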
+
+ expected_metrics = [
+ {
+ "run_name": "test_streaming_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "input_row_count",
+ "metric_value": "4",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_streaming_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "error_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_streaming_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "warning_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_streaming_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "valid_row_count",
+ "metric_value": "2",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_streaming_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "avg_error_age",
+ "metric_value": "35.0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_streaming_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "total_warning_salary",
+ "metric_value": "55000",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ ]
+
+ expected_metrics_df = spark.createDataFrame(expected_metrics, schema=OBSERVATION_TABLE_SCHEMA).drop("run_ts")
+ actual_metrics_df = spark.table(metrics_table_name).drop("run_ts")
+ assert_df_equality(expected_metrics_df, actual_metrics_df)
+
+
+def test_engine_streaming_observer_metrics_output_with_quarantine(ws, spark, make_schema, make_random, make_volume):
+ """Test that streaming metrics work correctly when using both quarantine and metrics configs."""
+ catalog_name = "main"
+ schema = make_schema(catalog_name=catalog_name)
+ input_table_name = f"{catalog_name}.{schema.name}.input_{make_random(6).lower()}"
+ output_table_name = f"{catalog_name}.{schema.name}.output_{make_random(6).lower()}"
+ quarantine_table_name = f"{catalog_name}.{schema.name}.quarantine_{make_random(6).lower()}"
+ metrics_table_name = f"{catalog_name}.{schema.name}.metrics_{make_random(6).lower()}"
+ volume = make_volume(catalog_name=catalog_name, schema_name=schema.name)
+
+ custom_metrics = [
+ "avg(case when _errors is not null then age else null end) as avg_error_age",
+ "sum(case when _warnings is not null then salary else null end) as total_warning_salary",
+ ]
+ observer = DQMetricsObserver(name="test_streaming_quarantine_observer", custom_metrics=custom_metrics)
+ dq_engine = DQEngine(workspace_client=ws, spark=spark, observer=observer, extra_params=EXTRA_PARAMS)
+
+ test_df = spark.createDataFrame(
+ [
+ [1, "Alice", 30, 50000],
+ [2, "Bob", 25, 45000],
+ [None, "Charlie", 35, 60000],  # This will trigger an error
+ [4, None, 28, 55000],  # This will trigger a warning
+ ],
+ TEST_SCHEMA,
+ )
+
+ test_df.write.saveAsTable(input_table_name)
+ input_config = InputConfig(location=input_table_name, is_streaming=True)
+ output_config = OutputConfig(
+ location=output_table_name,
+ options={"checkPointLocation": f"/Volumes/{volume.catalog_name}/{volume.schema_name}/{volume.name}/output"},
+ trigger={"availableNow": True},
+ )
+ quarantine_config = OutputConfig(
+ location=quarantine_table_name,
+ options={"checkPointLocation": f"/Volumes/{volume.catalog_name}/{volume.schema_name}/{volume.name}/quarantine"},
+ trigger={"availableNow": True},
+ )
+ metrics_config = OutputConfig(location=metrics_table_name)
+
+ dq_engine.apply_checks_by_metadata_and_save_in_table(
+ checks=TEST_CHECKS,
+ input_config=input_config,
+ output_config=output_config,
+ quarantine_config=quarantine_config,
+ metrics_config=metrics_config,
+ )
+
+ expected_metrics = [
+ {
+ "run_name": "test_streaming_quarantine_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "input_row_count",
+ "metric_value": "4",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_streaming_quarantine_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "error_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_streaming_quarantine_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "warning_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_streaming_quarantine_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "valid_row_count",
+ "metric_value": "2",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_streaming_quarantine_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "avg_error_age",
+ "metric_value": "35.0",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_streaming_quarantine_observer",
+ "input_location": input_table_name,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "total_warning_salary",
+ "metric_value": "55000",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ ]
+
+ expected_metrics_df = spark.createDataFrame(expected_metrics, schema=OBSERVATION_TABLE_SCHEMA).drop("run_ts")
+ actual_metrics_df = spark.table(metrics_table_name).drop("run_ts")
+ assert_df_equality(expected_metrics_df, actual_metrics_df)
+
+
+def test_save_results_in_table_batch_with_metrics(ws, spark, make_schema, make_random):
+ """Test save_results_in_table method with metrics configured for batch processing."""
+ catalog_name = "main"
+ schema = make_schema(catalog_name=catalog_name)
+ output_table_name = f"{catalog_name}.{schema.name}.output_{make_random(6).lower()}"
+ quarantine_table_name = f"{catalog_name}.{schema.name}.quarantine_{make_random(6).lower()}"
+ metrics_table_name = f"{catalog_name}.{schema.name}.metrics_{make_random(6).lower()}"
+
+ observer = DQMetricsObserver(name="test_save_batch_observer")
+ dq_engine = DQEngine(workspace_client=ws, spark=spark, observer=observer, extra_params=EXTRA_PARAMS)
+
+ test_df = spark.createDataFrame(
+ [
+ [1, "Alice", 30, 50000],
+ [2, "Bob", 25, 45000],
+ [None, "Charlie", 35, 60000], # This will trigger an error
+ [4, None, 28, 55000], # This will trigger a warning
+ ],
+ TEST_SCHEMA,
+ )
+ output_df, quarantine_df, observation = dq_engine.apply_checks_by_metadata_and_split(test_df, TEST_CHECKS)
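+ # With an observer configured, the split API also returns the Observation handle; passing it to
+ # save_results_in_table below is what persists the metrics. input_location remains None in the
+ # expected metrics because the checks were applied to an in-memory DataFrame rather than an InputConfig.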
+
+ output_config = OutputConfig(location=output_table_name)
+ quarantine_config = OutputConfig(location=quarantine_table_name)
+ metrics_config = OutputConfig(location=metrics_table_name)
+
+ dq_engine.save_results_in_table(
+ output_df=output_df,
+ quarantine_df=quarantine_df,
+ observation=observation,
+ output_config=output_config,
+ quarantine_config=quarantine_config,
+ metrics_config=metrics_config,
+ )
+
+ expected_metrics = [
+ {
+ "run_name": "test_save_batch_observer",
+ "input_location": None,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "input_row_count",
+ "metric_value": "4",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_save_batch_observer",
+ "input_location": None,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "error_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_save_batch_observer",
+ "input_location": None,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "warning_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_save_batch_observer",
+ "input_location": None,
+ "output_location": output_table_name,
+ "quarantine_location": quarantine_table_name,
+ "checks_location": None,
+ "metric_name": "valid_row_count",
+ "metric_value": "2",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ ]
+
+ expected_metrics_df = spark.createDataFrame(expected_metrics, schema=OBSERVATION_TABLE_SCHEMA).drop("run_ts")
+ actual_metrics_df = spark.table(metrics_table_name).drop("run_ts")
+ assert_df_equality(expected_metrics_df, actual_metrics_df)
+
+
+def test_save_results_in_table_streaming_with_metrics(ws, spark, make_schema, make_random, make_volume):
+ """Test save_results_in_table method with metrics configured for streaming processing."""
+ catalog_name = "main"
+ schema = make_schema(catalog_name=catalog_name)
+ input_table_name = f"{catalog_name}.{schema.name}.input_{make_random(6).lower()}"
+ output_table_name = f"{catalog_name}.{schema.name}.output_{make_random(6).lower()}"
+ metrics_table_name = f"{catalog_name}.{schema.name}.metrics_{make_random(6).lower()}"
+ volume = make_volume(catalog_name=catalog_name, schema_name=schema.name)
+
+ observer = DQMetricsObserver(name="test_save_streaming_observer")
+ dq_engine = DQEngine(workspace_client=ws, spark=spark, observer=observer, extra_params=EXTRA_PARAMS)
+
+ test_df = spark.createDataFrame(
+ [
+ [1, "Alice", 30, 50000],
+ [2, "Bob", 25, 45000],
+ [None, "Charlie", 35, 60000], # This will trigger an error
+ [4, None, 28, 55000], # This will trigger a warning
+ ],
+ TEST_SCHEMA,
+ )
+ test_df.write.saveAsTable(input_table_name)
+ streaming_df = spark.readStream.table(input_table_name)
+ checked_df, observation = dq_engine.apply_checks_by_metadata(streaming_df, TEST_CHECKS)
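+ # For a streaming DataFrame the checks are applied lazily; the observed metrics are only materialized
+ # once save_results_in_table below runs the availableNow-triggered write.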
+
+ output_config = OutputConfig(
+ location=output_table_name,
+ options={"checkPointLocation": f"/Volumes/{volume.catalog_name}/{volume.schema_name}/{volume.name}/output"},
+ trigger={"availableNow": True},
+ )
+ metrics_config = OutputConfig(location=metrics_table_name)
+
+ dq_engine.save_results_in_table(
+ output_df=checked_df,
+ observation=observation,
+ output_config=output_config,
+ metrics_config=metrics_config,
+ )
+
+ expected_metrics = [
+ {
+ "run_name": "test_save_streaming_observer",
+ "input_location": None,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "input_row_count",
+ "metric_value": "4",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_save_streaming_observer",
+ "input_location": None,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "error_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_save_streaming_observer",
+ "input_location": None,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "warning_row_count",
+ "metric_value": "1",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ {
+ "run_name": "test_save_streaming_observer",
+ "input_location": None,
+ "output_location": output_table_name,
+ "quarantine_location": None,
+ "checks_location": None,
+ "metric_name": "valid_row_count",
+ "metric_value": "2",
+ "run_ts": None, # Will be set at runtime
+ "error_column_name": "_errors",
+ "warning_column_name": "_warnings",
+ "user_metadata": None,
+ },
+ ]
+
+ expected_metrics_df = spark.createDataFrame(expected_metrics, schema=OBSERVATION_TABLE_SCHEMA).drop("run_ts")
+ actual_metrics_df = spark.table(metrics_table_name).drop("run_ts")
+ assert_df_equality(expected_metrics_df, actual_metrics_df)
diff --git a/tests/unit/test_observer.py b/tests/unit/test_observer.py
new file mode 100644
index 000000000..3dfb5b4ad
--- /dev/null
+++ b/tests/unit/test_observer.py
@@ -0,0 +1,75 @@
+"""Unit tests for DQObserver class."""
+
+from pyspark.sql import Observation
+from pyspark.sql.connect.observation import Observation as SparkConnectObservation
+from databricks.labs.dqx.metrics_observer import DQMetricsObserver
+from databricks.labs.dqx.rule import DefaultColumnNames
+
+
+def test_dq_observer_default_initialization():
+ """Test DQObserver default initialization."""
+ observer = DQMetricsObserver()
+ assert observer.name == "dqx"
+ assert observer.custom_metrics is None
+
+ expected_default_metrics = [
+ "count(1) as input_row_count",
+ "count(case when _errors is not null then 1 end) as error_row_count",
+ "count(case when _warnings is not null then 1 end) as warning_row_count",
+ "count(case when _errors is null and _warnings is null then 1 end) as valid_row_count",
+ ]
+ assert observer.metrics == expected_default_metrics
+
+
+def test_dq_observer_with_custom_metrics():
+ """Test DQObserver with custom metrics."""
+ custom_metrics = ["avg(age) as avg_age", "count(case when age > 65 then 1 end) as senior_count"]
+
+ observer = DQMetricsObserver(name="custom_observer", custom_metrics=custom_metrics)
+ assert observer.name == "custom_observer"
+ assert observer.custom_metrics == custom_metrics
+
+ expected_metrics = [
+ "count(1) as input_row_count",
+ "count(case when _errors is not null then 1 end) as error_row_count",
+ "count(case when _warnings is not null then 1 end) as warning_row_count",
+ "count(case when _errors is null and _warnings is null then 1 end) as valid_row_count",
+ "avg(age) as avg_age",
+ "count(case when age > 65 then 1 end) as senior_count",
+ ]
+ assert observer.metrics == expected_metrics
+
+
+def test_dq_observer_empty_custom_metrics():
+ """Test DQObserver with empty custom metrics list."""
+ observer = DQMetricsObserver(custom_metrics=[])
+
+ expected_default_metrics = [
+ "count(1) as input_row_count",
+ "count(case when _errors is not null then 1 end) as error_row_count",
+ "count(case when _warnings is not null then 1 end) as warning_row_count",
+ "count(case when _errors is null and _warnings is null then 1 end) as valid_row_count",
+ ]
+ assert observer.metrics == expected_default_metrics
+
+
+def test_dq_observer_default_column_names():
+ """Test that DQObserver uses correct default column names."""
+ observer = DQMetricsObserver()
+ errors_column = DefaultColumnNames.ERRORS.value
+ warnings_column = DefaultColumnNames.WARNINGS.value
+
+ assert f"count(case when {errors_column} is not null then 1 end) as error_row_count" in observer.metrics
+ assert f"count(case when {warnings_column} is not null then 1 end) as warning_row_count" in observer.metrics
+ assert (
+ f"count(case when {errors_column} is null and {warnings_column} is null then 1 end) as valid_row_count"
+ in observer.metrics
+ )
+
+
+def test_dq_observer_observation_property():
+ """Test that the observation property creates a Spark Observation."""
+ observer = DQMetricsObserver(name="test_obs")
+ observation = observer.observation
+ assert observation is not None
+ assert isinstance(observation, Observation | SparkConnectObservation)  # Spark Connect sessions return their own Observation class
diff --git a/tests/unit/test_storage_factory.py b/tests/unit/test_storage_factory.py
index 2f161700a..2652f5d56 100644
--- a/tests/unit/test_storage_factory.py
+++ b/tests/unit/test_storage_factory.py
@@ -56,7 +56,7 @@ def test_create_unsupported_config():
class UnsupportedConfig(BaseChecksStorageConfig):
pass
- config = UnsupportedConfig()
+ config = UnsupportedConfig(location="INVALID")
with pytest.raises(InvalidConfigError, match="Unsupported storage config type"):
STORAGE_FACTORY.create(config)