Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions cbc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ entrypoints:
csv_output_S3_SECRET_KEY: null
description: CSV of the queries result
type: file
query_information:
config:
query_information_DB_DSN: null
query_information_DB_SCHEMA: null
query_information_DB_SOURCE_DESCRIPTION: null
query_information_DB_TABLE: null
description: A table containing information about the executed query
type: database_table
query_result_as_table:
config:
query_result_as_table_DB_DSN: null
query_result_as_table_DB_SCHEMA: null
query_result_as_table_DB_TABLE: null
description: A table containing the queries result
type: database_table
run_query_from_string:
description: Run the query directly by passing an SQL Query
envs:
Expand All @@ -58,3 +73,18 @@ entrypoints:
csv_output_S3_SECRET_KEY: null
description: CSV of the queries result
type: file
query_information:
config:
query_information_DB_DSN: null
query_information_DB_SCHEMA: null
query_information_DB_SOURCE_DESCRIPTION: null
query_information_DB_TABLE: null
description: A table containing information about the executed query
type: database_table
query_result_as_table:
config:
query_result_as_table_DB_DSN: null
query_result_as_table_DB_SCHEMA: null
query_result_as_table_DB_TABLE: null
description: A table containing the queries result
type: database_table
5 changes: 4 additions & 1 deletion interactions/query.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import logging
import sys

import pandas as pd

from scystream.sdk.database_handling.database_manager import (
PandasDatabaseOperations,
)


def execute_query_to_csv(
query: str, dsn: str, output_file: str, schema: str | None
) -> None:
) -> pd.DataFrame:
try:
db = PandasDatabaseOperations(dsn, schema)
df = db.read(query=query)
df.to_csv(output_file, index=False)
return df
except Exception as e:
logging.error(f"Database query failed: {e}")
sys.exit(1)
73 changes: 51 additions & 22 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from datetime import datetime
import logging
import sys
import pandas as pd

from scystream.sdk.core import entrypoint
from scystream.sdk.env.settings import (
EnvSettings,
InputSettings,
OutputSettings,
FileSettings,
DatabaseSettings,
)
from scystream.sdk.database_handling.database_manager import (
PandasDatabaseOperations,
)
from scystream.sdk.file_handling.s3_manager import S3Operations
from interactions.query import execute_query_to_csv
Expand Down Expand Up @@ -52,32 +58,70 @@ class CSVOutput(FileSettings, OutputSettings):
FILE_EXT: str = "csv"


class QueryInformationOutput(DatabaseSettings, OutputSettings):
__identifier__ = "query_information"

DB_SOURCE_DESCRIPTION: str | None


class QueryResultAsTable(DatabaseSettings, OutputSettings):
__identifier__ = "query_result_as_table"


class QueryDatabaseFromFileEntrypointSettings(EnvSettings):
DB_DSN: str
DB_SCHEMA: str | None = None

query_file: QueryFileInput

csv_output: CSVOutput
query_information: QueryInformationOutput
query_result_as_table: QueryResultAsTable


class QueryDatabaseEntrypointSettings(EnvSettings):
DB_DSN: str
DB_SCHEMA: str | None = None

query_str: QueryStrInput

csv_output: CSVOutput
query_information: QueryInformationOutput
query_result_as_table: QueryResultAsTable


def write_query_info(query: str, source: str, settings: DatabaseSettings):
db = PandasDatabaseOperations(settings.DB_DSN, settings.DB_SCHEMA)

df = pd.DataFrame(
[{"query": query, "source": source, "created_at": datetime.now()}]
)

db.write(table=settings.DB_TABLE, data=df, mode="overwrite")


def write_df_to_table(df: pd.DataFrame, settings: DatabaseSettings) -> None:
db = PandasDatabaseOperations(settings.DB_DSN, settings.DB_SCHEMA)

db.write(table=settings.DB_TABLE, data=df, mode="overwrite")


@entrypoint(QueryDatabaseEntrypointSettings)
def run_query_from_string(settings):
target_csv = "output.csv"
execute_query_to_csv(
df = execute_query_to_csv(
query=settings.query_str.QUERY,
dsn=settings.DB_DSN,
output_file=target_csv,
schema=settings.DB_SCHEMA,
)
write_df_to_table(df, settings.query_result_as_table)
upload_to_s3(target_csv, settings.csv_output)
write_query_info(
query=settings.query_str.QUERY,
source=settings.query_information.DB_SOURCE_DESCRIPTION,
settings=settings.query_information,
)


@entrypoint(QueryDatabaseFromFileEntrypointSettings)
Expand All @@ -93,31 +137,16 @@ def run_query_from_file(settings):
query = read_query_file(local_file)
target_csv = "output.csv"

execute_query_to_csv(
df = execute_query_to_csv(
query=query,
dsn=settings.DB_DSN,
output_file=target_csv,
schema=settings.DB_SCHEMA,
)
write_df_to_table(df, settings.query_result_as_table)
upload_to_s3(target_csv, settings.csv_output)


"""
if __name__ == "__main__":
test = QueryDatabaseEntrypointSettings(
DB_DSN="postgresql+psycopg2://postgres:postgres@localhost:5432/postgres",
query_str=QueryStrInput(QUERY="SELECT * FROM test_table;"),
csv_output=CSVOutput(
S3_HOST="http://localhost",
S3_PORT="9000",
S3_ACCESS_KEY="minioadmin",
S3_SECRET_KEY="minioadmin",
BUCKET_NAME="output-bucket",
FILE_PATH="output_file_path",
FILE_NAME="csv_file",
FILE_EXT="csv",
),
write_query_info(
query=query,
source=settings.query_information.DB_SOURCE_DESCRIPTION,
settings=settings.query_information,
)

run_query_from_string(test)
"""
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ duckdb==1.4.1
pyodbc==5.2.0
snowflake-sqlalchemy==1.7.7
oracledb==3.4.2
pandas==3.0.3
Loading