Skip to content

Commit 9687025

Browse files
committed
add sqlalchemy and update writer classes
1 parent 8fe0deb commit 9687025

File tree

7 files changed

+87
-131
lines changed

7 files changed

+87
-131
lines changed

configs/writer/postgres.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66
"db_name": "metis_db",
77
"db_port": 5432,
88
"db_host": "localhost"
9-
}
9+
}

configs/writer/sqlite.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
"writer_name": "sqlite",
33
"table_name": "dqresults",
44
"db_name": "dq_repository/dq_repository.db"
5-
}
5+
}

metis/database_models.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from datetime import datetime
2+
from typing import List
3+
4+
from sqlalchemy import JSON, Double, func
5+
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
6+
7+
8+
class Base(DeclarativeBase):
    """Declarative base class that the ORM models declared in this module derive from."""
10+
11+
def register_models(results_table_name: str):
    """Return the ORM model class that maps DQ results to ``results_table_name``.

    The model is declared the first time a table name is requested and the
    same class is handed back on every later call with that name. Declaring
    a second class for a table that already lives in ``Base.metadata`` would
    (because of ``extend_existing``) redefine the table's columns underneath
    the class returned earlier, so repeated calls must not re-declare it.

    Args:
        results_table_name: Name of the database table the model is bound to.

    Returns:
        The ``DQResultModel`` declarative class for that table.
    """
    # The per-table cache is stored on the function object itself so that no
    # additional module-level state is required.
    models = register_models.__dict__.setdefault("_models", {})
    cached = models.get(results_table_name)
    if cached is not None:
        return cached

    class DQResultModel(Base):
        __tablename__ = results_table_name
        # Tolerate a table of this name already being present in the metadata.
        __table_args__ = {"extend_existing": True}

        id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
        # Falls back to the database's current time when no value is supplied.
        mes_time: Mapped[datetime] = mapped_column(server_default=func.now())
        dq_value: Mapped[float] = mapped_column(Double)
        dq_dimension: Mapped[str]
        dq_metric: Mapped[str]
        # Stored as a JSON document; nullable.
        column_name: Mapped[List[str] | None] = mapped_column(JSON)
        row_index: Mapped[int | None]
        # Stored as a JSON document; nullable.
        dq_annotations: Mapped[dict | None] = mapped_column(JSON)
        dataset: Mapped[str | None]
        table_name: Mapped[str | None]

    models[results_table_name] = DQResultModel
    return DQResultModel

metis/writer/database_writer.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from typing import Dict, List
2+
3+
from sqlalchemy import Engine
4+
from sqlalchemy.orm import Session
5+
6+
from metis.database_models import Base, register_models
7+
from metis.utils.result import DQResult
8+
from metis.writer.writer import DQResultWriter
9+
10+
11+
class DatabaseWriter(DQResultWriter):
    """Base class for writers that persist DQ results through SQLAlchemy.

    Subclasses supply the engine by implementing ``create_engine``; this
    class takes care of registering the result model, creating its table
    and inserting results.
    """

    def __init__(self, writer_config: Dict) -> None:
        """Create the engine, register the result model and create its table.

        Args:
            writer_config: Writer settings. ``table_name`` is optional and
                defaults to ``"dq_results"``; the remaining keys are
                interpreted by the subclass's ``create_engine``.
        """
        self.engine = self.create_engine(writer_config)

        self.DQResultModel = register_models(writer_config.get("table_name", "dq_results"))
        # Restrict creation to this writer's own table. ``Base.metadata`` is
        # shared, so an unrestricted ``create_all`` would also create every
        # table registered by other writers in this engine's database.
        Base.metadata.create_all(self.engine, tables=[self.DQResultModel.__table__])

    def create_engine(self, writer_config: Dict) -> Engine:
        """Build the SQLAlchemy engine for this writer.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError("Subclasses must implement the create_engine method.")

    def write(self, results: List[DQResult]) -> None:
        """Insert all ``results`` as rows in a single session and commit.

        Args:
            results: Results to persist. ``mesTime`` is converted with
                ``to_pydatetime()`` before being stored.
        """
        with Session(self.engine) as session:
            session.add_all(
                self.DQResultModel(
                    mes_time=result.mesTime.to_pydatetime(),
                    dq_value=result.DQvalue,
                    dq_dimension=result.DQdimension,
                    dq_metric=result.DQmetric,
                    column_name=result.columnNames,
                    row_index=result.rowIndex,
                    dq_annotations=result.DQannotations,
                    dataset=result.dataset,
                    table_name=result.tableName,
                )
                for result in results
            )
            session.commit()

metis/writer/postgres_writer.py

Lines changed: 11 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,80 +1,17 @@
1-
import json
2-
import psycopg2
3-
from typing import List
1+
from sqlalchemy import Engine, create_engine
42

5-
from metis.writer.writer import DQResultWriter
6-
from metis.utils.result import DQResult
3+
from metis.writer.database_writer import DatabaseWriter
74

8-
class PostgresWriter(DatabaseWriter):
    """Database writer that stores DQ results in a PostgreSQL database."""

    def create_engine(self, writer_config) -> Engine:
        """Build the PostgreSQL engine from the writer configuration.

        Args:
            writer_config: Must contain ``db_user``, ``db_pass``, ``db_name``,
                ``db_host`` and ``db_port``. An optional ``echo`` entry is
                forwarded to SQLAlchemy's ``create_engine`` and defaults to
                ``True``.

        Returns:
            The SQLAlchemy engine for the configured database.

        Raises:
            ValueError: If any of the required keys is missing.
        """
        required_keys = ("db_user", "db_pass", "db_name", "db_host", "db_port")
        if not all(k in writer_config for k in required_keys):
            raise ValueError(
                "Postgres writer config must include 'db_user', 'db_pass', 'db_name', 'db_host', and 'db_port' fields."
            )

        # Local import keeps this block self-contained.
        from sqlalchemy import URL

        # Build the URL from its components instead of formatting a string:
        # credentials containing '@', ':', '/' or '#' would otherwise yield a
        # wrong or unparsable connection string.
        url = URL.create(
            "postgresql",
            username=writer_config["db_user"],
            password=writer_config["db_pass"],
            host=writer_config["db_host"],
            port=writer_config["db_port"],
            database=writer_config["db_name"],
        )
        return create_engine(url, echo=writer_config.get("echo", True))

metis/writer/sqlite_writer.py

Lines changed: 7 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,11 @@
1-
import sqlite3
2-
import os
3-
import json
4-
from typing import List
1+
from sqlalchemy import Engine, create_engine
52

6-
from metis.writer.writer import DQResultWriter
7-
from metis.utils.result import DQResult
3+
from metis.writer.database_writer import DatabaseWriter
84

9-
class SQLiteWriter(DatabaseWriter):
    """Database writer that stores DQ results in a SQLite database file."""

    def create_engine(self, writer_config) -> Engine:
        """Build the SQLite engine from the writer configuration.

        Args:
            writer_config: Must contain ``db_name``, the path of the SQLite
                database. An optional ``echo`` entry is forwarded to
                SQLAlchemy's ``create_engine`` and defaults to ``True``.

        Returns:
            The SQLAlchemy engine for the configured database.

        Raises:
            ValueError: If ``db_name`` is missing.
        """
        if "db_name" not in writer_config:
            raise ValueError("SQLite writer config must include 'db_name' field.")

        # Local import keeps this block self-contained.
        from sqlalchemy import URL

        # Passing the path as a URL component avoids mis-parsing file names
        # that contain URL-significant characters such as '?' or '#'.
        url = URL.create("sqlite", database=writer_config["db_name"])
        return create_engine(url, echo=writer_config.get("echo", True))

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pandas
22
psycopg2-binary
3-
sqlite3 ; sys_platform == "win32" # sqlite3 is included with Python, but this line is for completeness
3+
sqlite3 ; sys_platform == "win32" # sqlite3 is included with Python, but this line is for completeness
4+
sqlalchemy==2.0.44

0 commit comments

Comments
 (0)