diff --git a/.gitignore b/.gitignore index 1ea9651f..690cce51 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ dist/ test_data/ poetry.lock +client_secrets.json diff --git a/.vscode/settings.json b/.vscode/settings.json index b4767b67..b2f56af5 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,7 +10,7 @@ ], "[python]": { "editor.defaultFormatter": "ms-python.black-formatter", - "editor.formatOnSave": true, + "editor.formatOnSave": false, "editor.codeActionsOnSave": { "source.organizeImports": "explicit" }, diff --git a/datapipe/store/bigquery.py b/datapipe/store/bigquery.py new file mode 100644 index 00000000..bbaa5335 --- /dev/null +++ b/datapipe/store/bigquery.py @@ -0,0 +1,182 @@ +from typing import Iterator, List, Optional, Union + +from datapipe.store.table_store import TableStore +from datapipe.run_config import RunConfig +from datapipe.types import DataDF, DataSchema, IndexDF, MetaSchema, data_to_index + +from google.cloud import bigquery +from google.oauth2 import service_account +from google.cloud.exceptions import NotFound + +from sqlalchemy import Column + + +# sqlalchemy types to GoogleSQL data types +SCHEMA_MAPPING = { + "ARRAY": "ARRAY", + "BIGINT": "INT64", + "BINARY": "BYTES", + "BLOB": "STRING", + "BOOLEAN": "BOOL", + "CHAR": "STRING", + "CLOB": "STRING", + "DATE": "DATE", + "DATETIME": "DATETIME", + "DECIMAL": "FLOAT64", + "DOUBLE": "FLOAT64", + "DOUBLE_PRECISION": "FLOAT64", + "FLOAT": "FLOAT64", + "INT": "INT64", + "JSON": "JSON", + "INTEGER": "INT64", + "NCHAR": "STRING", + "NVARCHAR": "STRING", + "NUMERIC": "NUMERIC", + "REAL": "FLOAT64", + "SMALLINT": "INT64", + "TEXT": "STRING", + "TIME": "TIME", + "TIMESTAMP": "TIMESTAMP", + "UUID": "STRING", + "VARBINARY": "BYTES", + "VARCHAR": "STRING", +} + + +def is_table_exists(client: bigquery.Client, table) -> bool: + try: + client.get_table(table) + return True + except NotFound: + return False + + +def is_dataset_exists( + client: bigquery.Client, dataset: bigquery.Client.dataset +) -> bool: + try: + client.get_dataset(dataset) + return True + except NotFound: + return False + + +class BQClient: + def __init__(self, service_account_file): + self.bq_client = bigquery.Client( + credentials=service_account.Credentials.from_service_account_file( + service_account_file + ) + ) + + def __call__(self, *args, **kwds): + return self.bq_client(*args, **kwds) + + +class TableStoreBQ(TableStore): + def __init__( + self, + bq_client: bigquery.Client, + name: str, + data_sql_schema: List[Column], + dataset_id: str, + table_id: str, + ) -> None: + self.bq_client = bq_client + self.name = name + self.data_sql_schema = data_sql_schema + + # Труба оперирует партициями BQ - так наиболее эффективно и экономно. + # Для трубы это индекс, для BQ это партиция. + # IndexDF - pandas dataframe, где указаны индексы всех строк, в которых были изменения и которые нужно пересчитать. + + # При инициализации TableStoreBQ можно указывать ключи кластеризации для ещё большей оптимизации. + + dataset_ref = self.bq_client.dataset(dataset_id) + table_ref = dataset_ref.table(str(table_id)) + + if not is_dataset_exists(self.bq_client, dataset_ref): + self.bq_client.create_dataset(dataset_ref) + + prim_keys = [ + column for column in self.data_sql_schema if column.primary_key + ] + value_cols = [ + column for column in self.data_sql_schema if not column.primary_key + ] + + schema_prim_keys = [bigquery.SchemaField(column.name, SCHEMA_MAPPING.get(f"{column.type}", "STRING"), mode="REQUIRED") for column in prim_keys] + schema_value_cols = [bigquery.SchemaField(column.name, SCHEMA_MAPPING.get(f"{column.type}", "STRING"), mode="NULLABLE") for column in value_cols] + + self.table = bigquery.Table( + table_ref=table_ref, + schema=schema_prim_keys+schema_value_cols + ) + + self.table.clustering_fields = [column.name for column in self.data_sql_schema if column.primary_key][0:4] + self.table = self.bq_client.create_table(self.table, exists_ok=True) + + self.job_config = bigquery.LoadJobConfig() + self.job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND + + + def get_primary_schema(self) -> DataSchema: + # Нужно реализовать + raise NotImplementedError + + + def get_meta_schema(self) -> MetaSchema: + raise NotImplementedError + + + def get_schema(self) -> DataSchema: + # Нужно реализовать + raise NotImplementedError + + + @property + def primary_keys(self) -> List[str]: + return [i.name for i in self.get_primary_schema()] + + + def delete_rows(self, idx: IndexDF) -> None: + dml = f"DELETE FROM `{self.table.project}`.`{self.table.dataset_id}`.`{self.table.table_id}` WHERE TRUE;" + self.bq_client.query(dml) + + + def insert_rows(self, df: DataDF) -> None: + self.bq_client.load_table_from_dataframe( + df, + self.table, + job_config=self.job_config, + ).result() + + + def update_rows(self, df: DataDF) -> None: + if df.empty: + return + + self.delete_rows(data_to_index(df, self.primary_keys)) + self.insert_rows(df) + + + def read_rows(self, idx: Optional[IndexDF] = None) -> DataDF: + sql = f"SELECT * FROM `{self.table.project}`.`{self.table.dataset_id}`.`{self.table.table_id}`;" + result = self.bq_client.query_and_wait(sql) + df = result.to_dataframe() + + return df + + + def read_rows_meta_pseudo_df( + self, chunksize: int = 1000, run_config: Optional[RunConfig] = None + ) -> Iterator[DataDF]: + # FIXME сделать честную чанкированную реализацию во всех сторах + + # Актуализация метаданных о таблице, которая может быть обновлена не только трубой - для BQ возможно пригодится. + # Подсчёт чек-суммы и сравнить: изменились ли данные. + # То есть, если таблица-источник изменилась в части некоторых партиций, то эти партиции нужно пересчитать. + + # Понять, как дёшего получать информацию об изменениях в партициях. + + yield self.read_rows() diff --git a/pyproject.toml b/pyproject.toml index 4557b41a..49893121 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,11 @@ click = ">=7.1.2" rich = "^13.3.2" ray = { version = "^2.5.0", optional = true, extras = ["default"] } +google = "^3.0.0" +google-auth = "^2.35.0" +google-cloud-bigquery = "^3.26.0" +pyarrow = "^17.0.0" +db-dtypes = "^1.3.0" [tool.poetry.extras] diff --git a/test.py b/test.py new file mode 100644 index 00000000..966f203b --- /dev/null +++ b/test.py @@ -0,0 +1,47 @@ +from datapipe.store.bigquery import BQClient +from datapipe.store.bigquery import TableStoreBQ + +import pandas as pd + +from sqlalchemy import Column +from sqlalchemy import types + + +BQ_CREDENTIALS = r"./client_secrets.json" + +STORE_NAME = r"test_transformation" +STORE_DATA_SQL_SCHEMA = [ + Column("col_1", types.BIGINT, primary_key=True), + Column("col_2", types.CHAR), + Column("col_3", types.BOOLEAN), +] +STORE_DATASET_ID = r"datapipe_test" +STORE_TABLE_ID = r"test_2" + + +bq_client = BQClient(service_account_file=BQ_CREDENTIALS) + +table_store_bq = TableStoreBQ( + bq_client=bq_client.bq_client, + name=STORE_NAME, + data_sql_schema=STORE_DATA_SQL_SCHEMA, + dataset_id=STORE_DATASET_ID, + table_id=STORE_TABLE_ID, +) + + +df = pd.DataFrame( + data={ + "col_1": [1, 2, 3], + "col_2": ["a", "b", "cc"], + "col_3": [False, True, None], + } +) + +print(df) + + +table_store_bq.insert_rows(df) + +# df = table_store_bq.read_rows() +# print(df)