Skip to content
Open
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,14 @@ environment variable is set either in the terminal context or in the `.env` file

### Source Authentication and Authorization

<!--
Developer TODO: If your tap requires special access on the source system, or any special authentication requirements, provide those here.
-->
Credential resolution order (Application Default Credentials–style):

1. **Config** `google_application_credentials` – JSON string, JSON object, or path to a service account key file.
2. **Env** `GOOGLE_APPLICATION_CREDENTIALS_STRING` – JSON string of the service account key (e.g. a secret injected by Airflow).
3. **Env** `GOOGLE_APPLICATION_CREDENTIALS` – Path to a service account key file.
4. **Application Default Credentials (ADC)** – Used when neither the config value nor either environment variable is set: workload identity (GKE/Cloud Run/Airflow pod), `gcloud auth application-default login`, or the metadata server.

For Airflow/GKE with an attached service account, omit `google_application_credentials` and leave both environment variables unset; the tap then falls back to ADC.

## Usage

Expand Down
63 changes: 50 additions & 13 deletions tap_bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@

import json
import math
import os
import tempfile
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING

from gcsfs import GCSFileSystem
from google.auth import default as default_credentials
from google.cloud import bigquery
from singer_sdk import SQLStream
from singer_sdk.helpers._batch import JSONLinesEncoding
Expand All @@ -22,28 +24,63 @@
if TYPE_CHECKING:
from singer_sdk.helpers import types

# Name of the environment variable that carries the service account key as a
# JSON string (second step of the credential resolution order).
CREDENTIALS_STRING_ENV = "GOOGLE_APPLICATION_CREDENTIALS_STRING"
# Name of the environment variable that carries the path to a service account
# key file (third step of the credential resolution order).
CREDENTIALS_PATH_ENV = "GOOGLE_APPLICATION_CREDENTIALS"

class BigQueryStream(SQLStream):
"""Stream class for BigQuery streams."""

connector_class = BigQueryConnector
def _get_bigquery_client(config: dict, project_id: str | None = None) -> bigquery.Client:
"""Build a BigQuery client. Credential resolution order:

@cached_property
def client(self):
credentials: str | dict = self.config["google_application_credentials"]
1. config['google_application_credentials'] (JSON or path)
2. GOOGLE_APPLICATION_CREDENTIALS_STRING env (JSON)
3. GOOGLE_APPLICATION_CREDENTIALS env (path)
4. Application Default Credentials (ADC) – workload identity, gcloud, etc.
"""
import logging

logger = logging.getLogger(__name__)
credentials = config.get("google_application_credentials")
if not credentials and os.environ.get(CREDENTIALS_STRING_ENV):
credentials = os.environ[CREDENTIALS_STRING_ENV]
if not credentials and os.environ.get(CREDENTIALS_PATH_ENV):
return bigquery.Client.from_service_account_json(
os.environ[CREDENTIALS_PATH_ENV],
project=project_id,
)

if credentials:
try:
return bigquery.Client.from_service_account_info(
json.loads(credentials)
if isinstance(credentials, str)
else credentials,
json.loads(credentials) if isinstance(credentials, str) else credentials,
project=project_id,
)
except (TypeError, json.decoder.JSONDecodeError):
self.logger.debug(
"`google_application_credentials` is not valid JSON, trying as path",
except (TypeError, json.decoder.JSONDecodeError) as e:
logger.debug(
"Credentials not valid JSON (trying as file path): %s", str(e)
)
return bigquery.Client.from_service_account_json(credentials, project=project_id)

creds, default_project = default_credentials(
scopes=["https://www.googleapis.com/auth/bigquery"]
)
return bigquery.Client(
credentials=creds,
project=project_id or default_project,
)


return bigquery.Client.from_service_account_json(credentials)
class BigQueryStream(SQLStream):
"""Stream class for BigQuery streams."""

connector_class = BigQueryConnector
selected_by_default = True

@cached_property
def client(self):
    """Return the BigQuery client for this stream.

    The client is built by `_get_bigquery_client` from the stream config,
    using the optional `project_id` config value as the project, and is
    cached on the instance after the first access.
    """
    stream_config = self.config
    configured_project = stream_config.get("project_id")
    return _get_bigquery_client(stream_config, project_id=configured_project)

def prepare_serialisation(self, _dict, _keychain = []):
"""
Expand Down
102 changes: 87 additions & 15 deletions tap_bigquery/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import fnmatch
import json
import os
import typing as t

import sqlalchemy
Expand All @@ -29,6 +30,14 @@
class BigQueryConnector(SQLConnector):
"""Connects to the BigQuery SQL source."""

@staticmethod
def _normalize_table_name(schema_name: str, table_name: str) -> str:
    """Return `table_name` without a leading ``"<schema_name>."`` prefix.

    Only one leading occurrence of the prefix is removed; names that do not
    start with the prefix are returned unchanged.
    """
    prefix = schema_name + "."
    return table_name[len(prefix):] if table_name.startswith(prefix) else table_name

def create_engine(self) -> Engine:
"""Creates and returns a new engine. Do not call outside of _engine.

Expand All @@ -43,10 +52,20 @@ def create_engine(self) -> Engine:
Returns:
A new SQLAlchemy Engine.
"""
credentials : str | dict = self.config.get("google_application_credentials")
credentials: str | dict | None = self.config.get("google_application_credentials")
if not credentials and os.environ.get("GOOGLE_APPLICATION_CREDENTIALS_STRING"):
credentials = os.environ["GOOGLE_APPLICATION_CREDENTIALS_STRING"]
if not credentials and os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
return sqlalchemy.create_engine(
self.sqlalchemy_url,
echo=False,
credentials_path=os.environ["GOOGLE_APPLICATION_CREDENTIALS"],
)

if credentials:
try:
# Note: json_serializer/json_deserializer commented out - not needed for
# BigQuery dialect as it handles JSON types natively
return sqlalchemy.create_engine(
self.sqlalchemy_url,
echo=False,
Expand All @@ -55,27 +74,18 @@ def create_engine(self) -> Engine:
if isinstance(credentials, str)
else credentials
),
# json_serializer=self.serialize_json,
# json_deserializer=self.deserialize_json,
)
except (TypeError, json.decoder.JSONDecodeError):
self.logger.warning(
"'google_application_credentials' not valid json trying path",
self.logger.debug(
"Credentials not valid JSON (trying as file path)",
)
return sqlalchemy.create_engine(
self.sqlalchemy_url,
echo=False,
credentials_path=credentials,
# json_serializer=self.serialize_json,
# json_deserializer=self.deserialize_json,
)
else:
return sqlalchemy.create_engine(
self.sqlalchemy_url,
echo=False,
# json_serializer=self.serialize_json,
# json_deserializer=self.deserialize_json,
)
# No credentials: use Application Default Credentials (ADC)
return sqlalchemy.create_engine(self.sqlalchemy_url, echo=False)

def to_array_type(
self,
Expand Down Expand Up @@ -180,6 +190,8 @@ def discover_catalog_entry(
Returns:
`CatalogEntry` object for the given table or a view
"""
table_name = self._normalize_table_name(schema_name, table_name)

# Initialize unique stream name
unique_stream_id = f"{schema_name}-{table_name}"

Expand Down Expand Up @@ -247,6 +259,66 @@ def discover_catalog_entry(
replication_key=None, # Must be defined by user
)

def discover_catalog_entries(
    self,
    *args,
    **kwargs,
) -> list[dict[str, t.Any]]:
    """Discover catalog entries using per-table reflection.

    BigQuery's SQLAlchemy dialect can return table names prefixed with the
    dataset, and bulk reflection may then resolve dataset names as project
    IDs. Discovering tables one-by-one avoids that path and keeps the
    project ID stable.

    Args:
        *args: Ignored; accepted only for signature compatibility.
        **kwargs: Only ``exclude_schemas`` (an iterable of schema names to
            skip) is honored. Any other keyword argument is ignored because
            per-table discovery has no use for it.

    Returns:
        One catalog-entry dict per discovered table or view, each forced to
        carry stream-level ``selected-by-default`` metadata.
    """
    del args  # Positional passthrough is not used by per-table discovery.
    # Honor the caller's schema filter instead of silently discarding it.
    excluded_schemas = set(kwargs.get("exclude_schemas") or ())

    engine = self._engine
    inspected = sqlalchemy.inspect(engine)

    catalog_entries: list[dict[str, t.Any]] = []
    for schema_name in self.get_schema_names(engine, inspected):
        if schema_name in excluded_schemas:
            continue
        for table_name, is_view in self.get_object_names(
            engine,
            inspected,
            schema_name,
        ):
            # Strip a duplicated "<schema>." prefix before reflecting the table.
            normalized_table_name = self._normalize_table_name(
                schema_name,
                table_name,
            )
            entry_dict = self.discover_catalog_entry(
                engine=engine,
                inspected=inspected,
                schema_name=schema_name,
                table_name=normalized_table_name,
                is_view=is_view,
            ).to_dict()
            self._ensure_selected_by_default(entry_dict)
            catalog_entries.append(entry_dict)

    self.logger.info(
        "Discovered %d BigQuery stream(s): %s",
        len(catalog_entries),
        [entry.get("tap_stream_id") for entry in catalog_entries],
    )
    return catalog_entries

@staticmethod
def _ensure_selected_by_default(catalog_entry: dict[str, t.Any]) -> None:
    """Force stream-level selected-by-default metadata on discovered entries.

    The stream-level metadata entry is the one whose ``breadcrumb`` is an
    empty sequence. If one exists, its ``selected-by-default`` flag is set to
    ``True`` in place; otherwise a new stream-level entry is appended.

    Args:
        catalog_entry: Catalog entry dict; mutated in place. A missing or
            ``None`` ``"metadata"`` value is treated as an empty list.
    """
    # `or []` also covers an explicit ``"metadata": None``, which `.get`
    # with a default would pass through and then fail to iterate.
    metadata_entries = catalog_entry.get("metadata") or []
    for metadata_entry in metadata_entries:
        breadcrumb = metadata_entry.get("breadcrumb")
        # Accept both list and tuple spellings of the empty breadcrumb.
        if isinstance(breadcrumb, (list, tuple)) and not breadcrumb:
            stream_metadata = metadata_entry.get("metadata")
            if stream_metadata is None:
                stream_metadata = {}
                metadata_entry["metadata"] = stream_metadata
            stream_metadata["selected-by-default"] = True
            return

    metadata_entries.append(
        {
            "breadcrumb": [],
            "metadata": {"selected-by-default": True},
        },
    )
    catalog_entry["metadata"] = metadata_entries
def get_sqlalchemy_url(self, config: dict) -> str:
    """Build the SQLAlchemy connection URL for the configured project.

    The URL has the form ``bigquery://<project_id>``, where the project is
    read from the required ``project_id`` config key.
    """
    project = config["project_id"]
    return "bigquery://{}".format(project)
Expand All @@ -265,7 +337,7 @@ def get_object_names(
# Let's strip `schema_name` prefix on the inspection

objects = [
(table_name.split(".")[-1], is_view)
(self._normalize_table_name(schema_name, table_name), is_view)
for (table_name, is_view) in super().get_object_names(
engine,
inspected,
Expand Down
17 changes: 15 additions & 2 deletions tap_bigquery/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ class TapBigQuery(SQLTap):
"""Google BigQuery tap."""

name = "tap-bigquery"
capabilities = [
"about",
"catalog",
"discover",
"state",
"stream-maps",
"schema-flattening",
"batch",
]

config_jsonschema = th.PropertiesList(
th.Property(
Expand All @@ -26,9 +35,13 @@ class TapBigQuery(SQLTap):
th.StringType,
th.ObjectType(),
),
required=True,
required=False,
secret=True,
description="JSON content or path to service account credentials.",
description=(
"Optional. JSON content or path to service account credentials. "
"If unset, Application Default Credentials (ADC) are used: "
"GOOGLE_APPLICATION_CREDENTIALS_STRING, GOOGLE_APPLICATION_CREDENTIALS, or workload identity (e.g. GKE/Airflow)."
),
),
th.Property(
"google_storage_bucket",
Expand Down