1- from typing import List
2- import pandas as pd
31import json
4- import os
5- import sqlite3
2+ from typing import Dict , List , Type
3+
4+ import pandas as pd
65
7- import metis .globals
6+ from metis .loader . csv_loader import CSVLoader
87from metis .metric import Metric
98from metis .utils .data_config import DataConfig
109from metis .utils .result import DQResult
11- from metis .loader .csv_loader import CSVLoader
12- from metis .writer .sqlite_writer import SQLiteWriter
13- from metis .writer .postgres_writer import PostgresWriter
1410from metis .writer .console_writer import ConsoleWriter
11+ from metis .writer .postgres_writer import PostgresWriter
12+ from metis .writer .sqlite_writer import SQLiteWriter
13+
1514
1615class DQOrchestrator :
17- def __init__ (self , writer_config = None ) -> None :
18- self .dataframes = {}
19- self .data_paths = {}
20- self .results = {} #TODO: Decide what to do with these in memory results
16+ def __init__ (self , writer_config_path : str | None = None ) -> None :
17+ self .dataframes : Dict [str , pd .DataFrame ] = {}
18+ self .reference_dataframes : Dict [str , pd .DataFrame ] = {}
19+ self .data_paths : Dict [str , str ] = {}
20+ self .results : Dict [str , DQResult ] = (
21+ {}
22+ ) # TODO: Decide what to do with these in memory results
2123
2224 self .writer = ConsoleWriter ({})
23- if writer_config :
24- with open (writer_config , 'r' ) as f :
25+ if writer_config_path :
26+ with open (writer_config_path , "r" ) as f :
2527 writer_config = json .load (f )
2628 if not "writer_name" in writer_config :
2729 raise ValueError ("Writer config must include 'writer_name' field." )
@@ -32,36 +34,46 @@ def __init__(self, writer_config=None) -> None:
3234
def load(self, data_loader_configs: List[str]) -> None:
    """Load every dataset described by the given loader config files.

    Each path is opened and parsed as JSON, then wrapped in a ``DataConfig``.
    For a config whose ``loader`` is ``"CSV"`` the data is read with
    ``CSVLoader`` and stored in ``self.dataframes`` under ``config.name``;
    the config path itself is recorded in ``self.data_paths`` under the
    same key. When the config has a truthy ``reference_file_name``, a
    second ``DataConfig`` is built from the same JSON with ``file_name``
    replaced by the reference file, and the resulting frame is stored in
    ``self.reference_dataframes`` under ``config.name``.

    Args:
        data_loader_configs: Paths of JSON data-loader config files.

    Raises:
        ValueError: If a config requests a loader other than ``"CSV"``.
        OSError: If a config file cannot be opened.
        json.JSONDecodeError: If a config file is not valid JSON.
    """
    for config_path in data_loader_configs:
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(config_path, "r", encoding="utf-8") as f:
            config_data = json.load(f)
        config = DataConfig(config_data)

        if config.loader != "CSV":
            raise ValueError(
                f"Unsupported loader type: {config_data.get('loader', None)}"
            )

        loader = CSVLoader()
        self.dataframes[config.name] = loader.load(config)
        self.data_paths[config.name] = config_path

        if config.reference_file_name:
            # Build a fresh config from the same JSON so the primary config
            # object is not mutated; only the file to read differs.
            reference_config = DataConfig(config_data)
            reference_config.file_name = config.reference_file_name
            self.reference_dataframes[config.name] = loader.load(
                reference_config
            )
56+
57+ def assess (self , metrics : List [str ], metric_configs : List [str | None ]) -> None :
4958 results = []
50-
59+
5160 for metric , metric_config in zip (metrics , metric_configs ):
52- metric_class = Metric .registry .get (metric )
61+ metric_class : Type [ Metric ] | None = Metric .registry .get (metric )
5362 if not metric_class :
5463 raise ValueError (f"Metric { metric } is not registered." )
55- metric_instance = metric_class ()
64+ metric_instance : Metric = metric_class ()
5665 for df_name , df in self .dataframes .items ():
57- incomplete_metric_results = metric_instance .assess (df , metric_config = metric_config ) #TODO: Add reference data support
66+ incomplete_metric_results = metric_instance .assess (
67+ data = df ,
68+ reference = self .reference_dataframes .get (df_name ),
69+ metric_config = metric_config ,
70+ )
5871 for result in incomplete_metric_results :
5972 result .tableName = df_name
6073 result .dataset = self .data_paths [df_name ]
6174 results .append (result )
6275
6376 self .writer .write (results )
6477
65-
def get_dq_result(self, query: str) -> List[DQResult]:
    """Return the stored results matching ``query``.

    Placeholder: ``query`` is currently ignored and an empty list is
    always returned.

    Args:
        query: Selection expression for the results (not yet interpreted).

    Returns:
        An empty list.
    """
    # TODO: implement result lookup; no results are retrieved yet.
    return []
0 commit comments