diff --git a/connectorx/src/get_arrow.rs b/connectorx/src/get_arrow.rs index c665c9c7f1..ef094b9706 100644 --- a/connectorx/src/get_arrow.rs +++ b/connectorx/src/get_arrow.rs @@ -228,7 +228,12 @@ pub fn get_arrow( #[cfg(feature = "src_bigquery")] SourceType::BigQuery => { let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime")); - let source = BigQuerySource::new(rt, &source_conn.conn[..])?; + let source = match &source_conn.bq_adc_config { + Some(cfg) => { + BigQuerySource::new_with_user_adc(rt, cfg.secret_path, cfg.project_id)? + } + None => BigQuerySource::new(rt, &source_conn.conn[..])?, + }; let dispatcher = Dispatcher::<_, _, BigQueryArrowTransport>::new( source, &mut destination, diff --git a/connectorx/src/source_router.rs b/connectorx/src/source_router.rs index ad3aec489e..1337d16e77 100644 --- a/connectorx/src/source_router.rs +++ b/connectorx/src/source_router.rs @@ -18,11 +18,18 @@ pub enum SourceType { Unknown, } +#[derive(Debug, Clone)] +pub struct BigQueryAdcConfig { + pub secret_path: String, + pub project_id: String, +} + #[derive(Debug, Clone)] pub struct SourceConn { pub ty: SourceType, pub conn: Url, pub proto: String, + pub bq_adc_config: Option, } impl TryFrom<&str> for SourceConn { @@ -67,11 +74,25 @@ impl TryFrom<&str> for SourceConn { impl SourceConn { pub fn new(ty: SourceType, conn: Url, proto: String) -> Self { - Self { ty, conn, proto } + Self { + ty, + conn, + proto, + bq_adc_config: None, + } } pub fn set_protocol(&mut self, protocol: &str) { self.proto = protocol.to_string(); } + pub fn new_bq_user_adc(ty: SourceType, proto: String, config: BigQueryAdcConfig) -> Self { + let secret_path_url = url::from_file_path(config.secret_path.clone()); + Self { + ty, + conn: secret_path_url, + proto, + bq_adc_config: Some(config), + } + } } #[throws(ConnectorXError)] diff --git a/connectorx/src/sources/bigquery/mod.rs b/connectorx/src/sources/bigquery/mod.rs index c4ccc167cd..f2a3116c27 100644 --- a/connectorx/src/sources/bigquery/mod.rs +++ b/connectorx/src/sources/bigquery/mod.rs @@ -81,6 +81,22 @@ impl BigQuerySource { schema: vec![], } } + + #[throws(BigQuerySourceError)] + pub fn new_with_user_adc(rt: Arc, secret_path: String, project_id: String) -> Self { + let client = Arc::new(rt.block_on( + gcp_bigquery_client::Client::from_authorized_user_secret(&secret_path), + )?); + Self { + rt, + client, + project_id, + origin_query: None, + queries: vec![], + names: vec![], + schema: vec![], + } + } } impl Source for BigQuerySource