diff --git a/README.md b/README.md index 3b074e3..232d2fb 100644 --- a/README.md +++ b/README.md @@ -70,9 +70,14 @@ environment variable is set either in the terminal context or in the `.env` file ### Source Authentication and Authorization - +Credential resolution order (Application Default Credentials–style): + +1. **Config** `google_application_credentials` – JSON string, JSON object, or path to a service account key file. +2. **Env** `GOOGLE_APPLICATION_CREDENTIALS_STRING` – JSON string of the service account key (e.g. secret injected by Airflow). +3. **Env** `GOOGLE_APPLICATION_CREDENTIALS` – Path to a service account key file. +4. **Application Default Credentials (ADC)** – No config or env set: workload identity (GKE/Cloud Run/Airflow pod), `gcloud auth application-default login`, or metadata server. + +For Airflow/GKE with an attached service account, omit `google_application_credentials` and do not set the env vars; the tap uses ADC. ## Usage diff --git a/tap_bigquery/client.py b/tap_bigquery/client.py index b3bb993..cdf1652 100644 --- a/tap_bigquery/client.py +++ b/tap_bigquery/client.py @@ -7,12 +7,14 @@ import json import math +import os import tempfile from functools import cached_property from pathlib import Path from typing import TYPE_CHECKING from gcsfs import GCSFileSystem +from google.auth import default as default_credentials from google.cloud import bigquery from singer_sdk import SQLStream from singer_sdk.helpers._batch import JSONLinesEncoding @@ -22,28 +24,63 @@ if TYPE_CHECKING: from singer_sdk.helpers import types +CREDENTIALS_STRING_ENV = "GOOGLE_APPLICATION_CREDENTIALS_STRING" +CREDENTIALS_PATH_ENV = "GOOGLE_APPLICATION_CREDENTIALS" -class BigQueryStream(SQLStream): - """Stream class for BigQuery streams.""" - connector_class = BigQueryConnector +def _get_bigquery_client(config: dict, project_id: str | None = None) -> bigquery.Client: + """Build a BigQuery client. Credential resolution order: - @cached_property - def client(self): - credentials: str | dict = self.config["google_application_credentials"] + 1. config['google_application_credentials'] (JSON or path) + 2. GOOGLE_APPLICATION_CREDENTIALS_STRING env (JSON) + 3. GOOGLE_APPLICATION_CREDENTIALS env (path) + 4. Application Default Credentials (ADC) – workload identity, gcloud, etc. + """ + import logging + logger = logging.getLogger(__name__) + credentials = config.get("google_application_credentials") + if not credentials and os.environ.get(CREDENTIALS_STRING_ENV): + credentials = os.environ[CREDENTIALS_STRING_ENV] + if not credentials and os.environ.get(CREDENTIALS_PATH_ENV): + return bigquery.Client.from_service_account_json( + os.environ[CREDENTIALS_PATH_ENV], + project=project_id, + ) + + if credentials: try: return bigquery.Client.from_service_account_info( - json.loads(credentials) - if isinstance(credentials, str) - else credentials, + json.loads(credentials) if isinstance(credentials, str) else credentials, + project=project_id, ) - except (TypeError, json.decoder.JSONDecodeError): - self.logger.debug( - "`google_application_credentials` is not valid JSON, trying as path", + except (TypeError, json.decoder.JSONDecodeError) as e: + logger.debug( + "Credentials not valid JSON (trying as file path): %s", str(e) ) + return bigquery.Client.from_service_account_json(credentials, project=project_id) + + creds, default_project = default_credentials( + scopes=["https://www.googleapis.com/auth/bigquery"] + ) + return bigquery.Client( + credentials=creds, + project=project_id or default_project, + ) + - return bigquery.Client.from_service_account_json(credentials) +class BigQueryStream(SQLStream): + """Stream class for BigQuery streams.""" + + connector_class = BigQueryConnector + selected_by_default = True + + @cached_property + def client(self): + return _get_bigquery_client( + self.config, + project_id=self.config.get("project_id"), + ) def prepare_serialisation(self, _dict, _keychain = []): """ diff --git a/tap_bigquery/connector.py b/tap_bigquery/connector.py index 5452681..08fc8e4 100644 --- a/tap_bigquery/connector.py +++ b/tap_bigquery/connector.py @@ -4,6 +4,7 @@ import fnmatch import json +import os import typing as t import sqlalchemy @@ -29,6 +30,14 @@ class BigQueryConnector(SQLConnector): """Connects to the BigQuery SQL source.""" + @staticmethod + def _normalize_table_name(schema_name: str, table_name: str) -> str: + """Strip duplicated schema prefixes from table names.""" + schema_prefix = f"{schema_name}." + if table_name.startswith(schema_prefix): + return table_name[len(schema_prefix):] + return table_name + def create_engine(self) -> Engine: """Creates and returns a new engine. Do not call outside of _engine. @@ -43,10 +52,20 @@ def create_engine(self) -> Engine: Returns: A new SQLAlchemy Engine. """ - credentials : str | dict = self.config.get("google_application_credentials") + credentials: str | dict | None = self.config.get("google_application_credentials") + if not credentials and os.environ.get("GOOGLE_APPLICATION_CREDENTIALS_STRING"): + credentials = os.environ["GOOGLE_APPLICATION_CREDENTIALS_STRING"] + if not credentials and os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"): + return sqlalchemy.create_engine( + self.sqlalchemy_url, + echo=False, + credentials_path=os.environ["GOOGLE_APPLICATION_CREDENTIALS"], + ) if credentials: try: + # Note: json_serializer/json_deserializer commented out - not needed for + # BigQuery dialect as it handles JSON types natively return sqlalchemy.create_engine( self.sqlalchemy_url, echo=False, @@ -55,27 +74,18 @@ def create_engine(self) -> Engine: if isinstance(credentials, str) else credentials ), - # json_serializer=self.serialize_json, - # json_deserializer=self.deserialize_json, ) except (TypeError, json.decoder.JSONDecodeError): - self.logger.warning( - "'google_application_credentials' not valid json trying path", + self.logger.debug( + "Credentials not valid JSON (trying as file path)", ) return sqlalchemy.create_engine( self.sqlalchemy_url, echo=False, credentials_path=credentials, - # json_serializer=self.serialize_json, - # json_deserializer=self.deserialize_json, ) - else: - return sqlalchemy.create_engine( - self.sqlalchemy_url, - echo=False, - # json_serializer=self.serialize_json, - # json_deserializer=self.deserialize_json, - ) + # No credentials: use Application Default Credentials (ADC) + return sqlalchemy.create_engine(self.sqlalchemy_url, echo=False) def to_array_type( self, @@ -180,6 +190,8 @@ def discover_catalog_entry( Returns: `CatalogEntry` object for the given table or a view """ + table_name = self._normalize_table_name(schema_name, table_name) + # Initialize unique stream name unique_stream_id = f"{schema_name}-{table_name}" @@ -247,6 +259,66 @@ def discover_catalog_entry( replication_key=None, # Must be defined by user ) + def discover_catalog_entries( + self, + *args, + **kwargs, + ) -> list[dict[str, t.Any]]: + """Discover catalog entries using per-table reflection. + + BigQuery's SQLAlchemy dialect can return table names prefixed with the + dataset, and bulk reflection may then resolve dataset names as project + IDs. Discovering tables one-by-one avoids that path and keeps the + project ID stable. + """ + del args, kwargs # Connector discovery does not require passthrough args. + engine = self._engine + inspected = sqlalchemy.inspect(engine) + + catalog_entries: list[dict[str, t.Any]] = [] + for schema_name in self.get_schema_names(engine, inspected): + for table_name, is_view in self.get_object_names( + engine, + inspected, + schema_name, + ): + normalized_table_name = self._normalize_table_name( + schema_name, + table_name, + ) + entry_dict = self.discover_catalog_entry( + engine=engine, + inspected=inspected, + schema_name=schema_name, + table_name=normalized_table_name, + is_view=is_view, + ).to_dict() + self._ensure_selected_by_default(entry_dict) + catalog_entries.append(entry_dict) + + self.logger.info( + "Discovered %d BigQuery stream(s): %s", + len(catalog_entries), + [entry.get("tap_stream_id") for entry in catalog_entries], + ) + return catalog_entries + + @staticmethod + def _ensure_selected_by_default(catalog_entry: dict[str, t.Any]) -> None: + """Force stream-level selected-by-default metadata on discovered entries.""" + metadata_entries = catalog_entry.get("metadata", []) + for metadata_entry in metadata_entries: + if metadata_entry.get("breadcrumb") == []: + metadata_entry.setdefault("metadata", {})["selected-by-default"] = True + return + + metadata_entries.append( + { + "breadcrumb": [], + "metadata": {"selected-by-default": True}, + }, + ) + catalog_entry["metadata"] = metadata_entries def get_sqlalchemy_url(self, config: dict) -> str: """Concatenate a SQLAlchemy URL for use in connecting to the source.""" return f"bigquery://{config['project_id']}" @@ -265,7 +337,7 @@ def get_object_names( # Let's strip `schema_name` prefix on the inspection objects = [ - (table_name.split(".")[-1], is_view) + (self._normalize_table_name(schema_name, table_name), is_view) for (table_name, is_view) in super().get_object_names( engine, inspected, diff --git a/tap_bigquery/tap.py b/tap_bigquery/tap.py index 8d90efe..098c045 100644 --- a/tap_bigquery/tap.py +++ b/tap_bigquery/tap.py @@ -12,6 +12,15 @@ class TapBigQuery(SQLTap): """Google BigQuery tap.""" name = "tap-bigquery" + capabilities = [ + "about", + "catalog", + "discover", + "state", + "stream-maps", + "schema-flattening", + "batch", + ] config_jsonschema = th.PropertiesList( th.Property( @@ -26,9 +35,13 @@ class TapBigQuery(SQLTap): th.StringType, th.ObjectType(), ), - required=True, + required=False, secret=True, - description="JSON content or path to service account credentials.", + description=( + "Optional. JSON content or path to service account credentials. " + "If unset, Application Default Credentials (ADC) are used: " + "GOOGLE_APPLICATION_CREDENTIALS_STRING, GOOGLE_APPLICATION_CREDENTIALS, or workload identity (e.g. GKE/Airflow)." + ), ), th.Property( "google_storage_bucket",