diff --git a/cbc.yaml b/cbc.yaml index 69df890..d2422f2 100644 --- a/cbc.yaml +++ b/cbc.yaml @@ -34,6 +34,21 @@ entrypoints: csv_output_S3_SECRET_KEY: null description: CSV of the queries result type: file + query_information: + config: + query_information_DB_DSN: null + query_information_DB_SCHEMA: null + query_information_DB_SOURCE_DESCRIPTION: null + query_information_DB_TABLE: null + description: A table containing information about the executed query + type: database_table + query_result_as_table: + config: + query_result_as_table_DB_DSN: null + query_result_as_table_DB_SCHEMA: null + query_result_as_table_DB_TABLE: null + description: A table containing the queries result + type: database_table run_query_from_string: description: Run the query directly by passing an SQL Query envs: @@ -58,3 +73,18 @@ entrypoints: csv_output_S3_SECRET_KEY: null description: CSV of the queries result type: file + query_information: + config: + query_information_DB_DSN: null + query_information_DB_SCHEMA: null + query_information_DB_SOURCE_DESCRIPTION: null + query_information_DB_TABLE: null + description: A table containing information about the executed query + type: database_table + query_result_as_table: + config: + query_result_as_table_DB_DSN: null + query_result_as_table_DB_SCHEMA: null + query_result_as_table_DB_TABLE: null + description: A table containing the queries result + type: database_table diff --git a/interactions/query.py b/interactions/query.py index ae1564d..2d720ae 100644 --- a/interactions/query.py +++ b/interactions/query.py @@ -1,6 +1,8 @@ import logging import sys +import pandas as pd + from scystream.sdk.database_handling.database_manager import ( PandasDatabaseOperations, ) @@ -8,11 +10,12 @@ def execute_query_to_csv( query: str, dsn: str, output_file: str, schema: str | None -) -> None: +) -> pd.DataFrame: try: db = PandasDatabaseOperations(dsn, schema) df = db.read(query=query) df.to_csv(output_file, index=False) + return df except Exception as e: logging.error(f"Database query failed: {e}") sys.exit(1) diff --git a/main.py b/main.py index d8a1f09..45c1a65 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,7 @@ +from datetime import datetime import logging import sys +import pandas as pd from scystream.sdk.core import entrypoint from scystream.sdk.env.settings import ( @@ -7,6 +9,10 @@ InputSettings, OutputSettings, FileSettings, + DatabaseSettings, +) +from scystream.sdk.database_handling.database_manager import ( + PandasDatabaseOperations, ) from scystream.sdk.file_handling.s3_manager import S3Operations from interactions.query import execute_query_to_csv @@ -52,12 +58,25 @@ class CSVOutput(FileSettings, OutputSettings): FILE_EXT: str = "csv" +class QueryInformationOutput(DatabaseSettings, OutputSettings): + __identifier__ = "query_information" + + DB_SOURCE_DESCRIPTION: str | None + + +class QueryResultAsTable(DatabaseSettings, OutputSettings): + __identifier__ = "query_result_as_table" + + class QueryDatabaseFromFileEntrypointSettings(EnvSettings): DB_DSN: str DB_SCHEMA: str | None = None query_file: QueryFileInput + csv_output: CSVOutput + query_information: QueryInformationOutput + query_result_as_table: QueryResultAsTable class QueryDatabaseEntrypointSettings(EnvSettings): @@ -65,19 +84,44 @@ class QueryDatabaseEntrypointSettings(EnvSettings): DB_SCHEMA: str | None = None query_str: QueryStrInput + csv_output: CSVOutput + query_information: QueryInformationOutput + query_result_as_table: QueryResultAsTable + + +def write_query_info(query: str, source: str, settings: DatabaseSettings): + db = PandasDatabaseOperations(settings.DB_DSN, settings.DB_SCHEMA) + + df = pd.DataFrame( + [{"query": query, "source": source, "created_at": datetime.now()}] + ) + + db.write(table=settings.DB_TABLE, data=df, mode="overwrite") + + +def write_df_to_table(df: pd.DataFrame, settings: DatabaseSettings) -> None: + db = PandasDatabaseOperations(settings.DB_DSN, settings.DB_SCHEMA) + + db.write(table=settings.DB_TABLE, data=df, mode="overwrite") @entrypoint(QueryDatabaseEntrypointSettings) def run_query_from_string(settings): target_csv = "output.csv" - execute_query_to_csv( + df = execute_query_to_csv( query=settings.query_str.QUERY, dsn=settings.DB_DSN, output_file=target_csv, schema=settings.DB_SCHEMA, ) + write_df_to_table(df, settings.query_result_as_table) upload_to_s3(target_csv, settings.csv_output) + write_query_info( + query=settings.query_str.QUERY, + source=settings.query_information.DB_SOURCE_DESCRIPTION, + settings=settings.query_information, + ) @entrypoint(QueryDatabaseFromFileEntrypointSettings) @@ -93,31 +137,16 @@ def run_query_from_file(settings): query = read_query_file(local_file) target_csv = "output.csv" - execute_query_to_csv( + df = execute_query_to_csv( query=query, dsn=settings.DB_DSN, output_file=target_csv, schema=settings.DB_SCHEMA, ) + write_df_to_table(df, settings.query_result_as_table) upload_to_s3(target_csv, settings.csv_output) - - -""" -if __name__ == "__main__": - test = QueryDatabaseEntrypointSettings( - DB_DSN="postgresql+psycopg2://postgres:postgres@localhost:5432/postgres", - query_str=QueryStrInput(QUERY="SELECT * FROM test_table;"), - csv_output=CSVOutput( - S3_HOST="http://localhost", - S3_PORT="9000", - S3_ACCESS_KEY="minioadmin", - S3_SECRET_KEY="minioadmin", - BUCKET_NAME="output-bucket", - FILE_PATH="output_file_path", - FILE_NAME="csv_file", - FILE_EXT="csv", - ), + write_query_info( + query=query, + source=settings.query_information.DB_SOURCE_DESCRIPTION, + settings=settings.query_information, ) - - run_query_from_string(test) -""" diff --git a/requirements.txt b/requirements.txt index 23133f8..91327f9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ duckdb==1.4.1 pyodbc==5.2.0 snowflake-sqlalchemy==1.7.7 oracledb==3.4.2 +pandas==3.0.3