diff --git a/core-api/src/envconfig.rs b/core-api/src/envconfig.rs index 90797dbeb..a43d89727 100644 --- a/core-api/src/envconfig.rs +++ b/core-api/src/envconfig.rs @@ -1129,7 +1129,7 @@ namespace = "my-namespace" ..Default::default() }; - let profile = load_client_config_profile(options, None).unwrap(); + let profile = load_client_config_profile(options, Some(&HashMap::new())).unwrap(); assert_eq!(profile.address.as_ref().unwrap(), "my-address"); assert_eq!(profile.namespace.as_ref().unwrap(), "my-namespace"); } @@ -1519,7 +1519,7 @@ address = "localhost:7233" }; // Should not error, just returns an empty profile - let profile = load_client_config_profile(options, None).unwrap(); + let profile = load_client_config_profile(options, Some(&HashMap::new())).unwrap(); assert_eq!(profile, ClientConfigProfile::default()); } @@ -1595,7 +1595,7 @@ api_key = "my-api-key" ..Default::default() }; - let profile = load_client_config_profile(options, None).unwrap(); + let profile = load_client_config_profile(options, Some(&HashMap::new())).unwrap(); // TLS should be enabled due to API key presence assert!(profile.tls.is_some()); @@ -1616,7 +1616,7 @@ address = "some-address" ..Default::default() }; - let profile = load_client_config_profile(options, None).unwrap(); + let profile = load_client_config_profile(options, Some(&HashMap::new())).unwrap(); // TLS should not be enabled assert!(profile.tls.is_none()); @@ -1624,6 +1624,9 @@ address = "some-address" #[test] fn test_load_client_config_profile_from_system_env() { + // WARNING: This test modifies system environment variables which can cause + // test pollution if tests run in parallel. + // Set up system env vars. These tests can't be run in parallel. unsafe { std::env::set_var("TEMPORAL_ADDRESS", "system-address"); diff --git a/core-c-bridge/Cargo.toml b/core-c-bridge/Cargo.toml index a52c5bf3c..0e8764c66 100644 --- a/core-c-bridge/Cargo.toml +++ b/core-c-bridge/Cargo.toml @@ -19,6 +19,7 @@ prost = { workspace = true } # cause non-determinism. rand = "0.8.5" rand_pcg = "0.3.1" +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = "1.26" tokio-stream = "0.1" @@ -36,6 +37,7 @@ features = ["ephemeral-server"] [dependencies.temporal-sdk-core-api] path = "../core-api" +features = ["envconfig"] [dependencies.temporal-sdk-core-protos] path = "../sdk-core-protos" diff --git a/core-c-bridge/include/temporal-sdk-core-c-bridge.h b/core-c-bridge/include/temporal-sdk-core-c-bridge.h index 19ce209ce..d16878234 100644 --- a/core-c-bridge/include/temporal-sdk-core-c-bridge.h +++ b/core-c-bridge/include/temporal-sdk-core-c-bridge.h @@ -240,6 +240,14 @@ typedef void (*TemporalCoreClientRpcCallCallback)(void *user_data, const struct TemporalCoreByteArray *failure_message, const struct TemporalCoreByteArray *failure_details); +/** + * Callback for client config load operations. + * If success or fail are not null, they must be manually freed when done. + */ +typedef void (*TemporalCoreClientConfigCallback)(void *user_data, + const struct TemporalCoreByteArray *success, + const struct TemporalCoreByteArray *fail); + typedef union TemporalCoreMetricAttributeValue { struct TemporalCoreByteArrayRef string_value; int64_t int_value; @@ -769,6 +777,30 @@ void temporal_core_client_rpc_call(struct TemporalCoreClient *client, void *user_data, TemporalCoreClientRpcCallCallback callback); +/** + * Load all client profiles from given sources + */ +void temporal_core_client_config_load(const char *path, + struct TemporalCoreByteArrayRef data, + bool disable_file, + bool config_file_strict, + struct TemporalCoreByteArrayRef env_vars, + void *user_data, + TemporalCoreClientConfigCallback callback); + +/** + * Load a single client profile from given sources with env overrides + */ +void temporal_core_client_config_profile_load(const char *profile, + const char *path, + struct TemporalCoreByteArrayRef data, + bool disable_file, + bool disable_env, + bool config_file_strict, + struct TemporalCoreByteArrayRef env_vars, + void *user_data, + TemporalCoreClientConfigCallback callback); + struct TemporalCoreMetricMeter *temporal_core_metric_meter_new(struct TemporalCoreRuntime *runtime); void temporal_core_metric_meter_free(struct TemporalCoreMetricMeter *meter); diff --git a/core-c-bridge/src/envconfig.rs b/core-c-bridge/src/envconfig.rs new file mode 100644 index 000000000..f686c6b17 --- /dev/null +++ b/core-c-bridge/src/envconfig.rs @@ -0,0 +1,251 @@ +use crate::{ByteArray, ByteArrayRef}; +use serde::Serialize; +use std::collections::HashMap; +use std::ffi::CStr; +use temporal_sdk_core_api::envconfig::{ + self, ClientConfig as CoreClientConfig, ClientConfigCodec as CoreClientConfigCodec, + ClientConfigProfile as CoreClientConfigProfile, ClientConfigTLS as CoreClientConfigTLS, + DataSource as CoreDataSource, LoadClientConfigOptions, LoadClientConfigProfileOptions, +}; + +/// Callback for client config load operations. +/// If success or fail are not null, they must be manually freed when done. +pub type ClientConfigCallback = unsafe extern "C" fn( + user_data: *mut libc::c_void, + success: *const ByteArray, + fail: *const ByteArray, +); + +// Wrapper types for JSON serialization +#[derive(Serialize)] +struct ClientConfig { + profiles: HashMap, +} + +impl From for ClientConfig { + fn from(c: CoreClientConfig) -> Self { + Self { + profiles: c.profiles.into_iter().map(|(k, v)| (k, v.into())).collect(), + } + } +} + +#[derive(Serialize)] +struct ClientConfigProfile { + #[serde(skip_serializing_if = "Option::is_none")] + address: Option, + #[serde(skip_serializing_if = "Option::is_none")] + namespace: Option, + #[serde(skip_serializing_if = "Option::is_none")] + api_key: Option, + #[serde(skip_serializing_if = "Option::is_none")] + tls: Option, + #[serde(skip_serializing_if = "Option::is_none")] + codec: Option, + #[serde(skip_serializing_if = "HashMap::is_empty")] + grpc_meta: HashMap, +} + +impl From for ClientConfigProfile { + fn from(c: CoreClientConfigProfile) -> Self { + Self { + address: c.address, + namespace: c.namespace, + api_key: c.api_key, + tls: c.tls.map(Into::into), + codec: c.codec.map(Into::into), + grpc_meta: c.grpc_meta, + } + } +} + +#[derive(Serialize)] +struct ClientConfigTLS { + #[serde(skip_serializing_if = "std::ops::Not::not")] + disabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + server_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + server_ca_cert: Option, + #[serde(skip_serializing_if = "Option::is_none")] + client_cert: Option, + #[serde(skip_serializing_if = "Option::is_none")] + client_key: Option, +} + +impl From for ClientConfigTLS { + fn from(c: CoreClientConfigTLS) -> Self { + Self { + disabled: c.disabled, + server_name: c.server_name, + server_ca_cert: c.server_ca_cert.map(Into::into), + client_cert: c.client_cert.map(Into::into), + client_key: c.client_key.map(Into::into), + } + } +} + +#[derive(Serialize)] +struct ClientConfigCodec { + #[serde(skip_serializing_if = "Option::is_none")] + endpoint: Option, + #[serde(skip_serializing_if = "Option::is_none")] + auth: Option, +} + +impl From for ClientConfigCodec { + fn from(c: CoreClientConfigCodec) -> Self { + Self { + endpoint: c.endpoint, + auth: c.auth, + } + } +} + +#[derive(Serialize)] +struct DataSource { + #[serde(skip_serializing_if = "Option::is_none")] + path: Option, + #[serde(skip_serializing_if = "Option::is_none")] + data: Option, +} + +impl From for DataSource { + fn from(c: CoreDataSource) -> Self { + match c { + CoreDataSource::Path(p) => Self { + path: Some(p), + data: None, + }, + CoreDataSource::Data(d) => Self { + path: None, + data: Some(String::from_utf8_lossy(&d).to_string()), + }, + } + } +} + +// Helper functions +fn parse_config_source( + path: *const libc::c_char, + data: ByteArrayRef, +) -> Result, String> { + if !path.is_null() { + match unsafe { CStr::from_ptr(path) }.to_str() { + Ok(path_str) => Ok(Some(CoreDataSource::Path(path_str.to_string()))), + Err(e) => Err(format!("Invalid path UTF-8: {e}")), + } + } else if !data.data.is_null() && data.size > 0 { + Ok(Some(CoreDataSource::Data(data.to_vec()))) + } else { + Ok(None) + } +} + +fn parse_env_vars(env_vars: ByteArrayRef) -> Result>, String> { + if env_vars.data.is_null() || env_vars.size == 0 { + return Ok(None); + } + + let env_json = std::str::from_utf8(env_vars.to_slice()) + .map_err(|e| format!("Invalid env vars UTF-8: {e}"))?; + + serde_json::from_str(env_json) + .map(Some) + .map_err(|e| format!("Invalid env vars JSON: {e}")) +} + +fn send_result( + result: Result, + user_data: *mut libc::c_void, + callback: ClientConfigCallback, +) { + match result { + Ok(data) => match serde_json::to_vec(&data) { + Ok(json_bytes) => { + let result = ByteArray::from_vec(json_bytes); + unsafe { callback(user_data, result.into_raw(), std::ptr::null()) }; + } + Err(e) => { + let err = ByteArray::from_utf8(format!("Failed to serialize: {e}")); + unsafe { callback(user_data, std::ptr::null(), err.into_raw()) }; + } + }, + Err(e) => { + let err = ByteArray::from_utf8(e); + unsafe { callback(user_data, std::ptr::null(), err.into_raw()) }; + } + } +} + +/// Load all client profiles from given sources +#[unsafe(no_mangle)] +pub extern "C" fn temporal_core_client_config_load( + path: *const libc::c_char, + data: ByteArrayRef, + disable_file: bool, + config_file_strict: bool, + env_vars: ByteArrayRef, + user_data: *mut libc::c_void, + callback: ClientConfigCallback, +) { + let result = || -> Result { + let config_source = parse_config_source(path, data)?; + let env_vars_map = parse_env_vars(env_vars)?; + + let options = LoadClientConfigOptions { + config_source: if disable_file { None } else { config_source }, + config_file_strict, + }; + + let core_config = envconfig::load_client_config(options, env_vars_map.as_ref()) + .map_err(|e| e.to_string())?; + + Ok(core_config.into()) + }; + + send_result(result(), user_data, callback); +} + +/// Load a single client profile from given sources with env overrides +#[unsafe(no_mangle)] +pub extern "C" fn temporal_core_client_config_profile_load( + profile: *const libc::c_char, + path: *const libc::c_char, + data: ByteArrayRef, + disable_file: bool, + disable_env: bool, + config_file_strict: bool, + env_vars: ByteArrayRef, + user_data: *mut libc::c_void, + callback: ClientConfigCallback, +) { + let result = || -> Result { + let profile_name = if !profile.is_null() { + match unsafe { CStr::from_ptr(profile) }.to_str() { + Ok(s) => Some(s.to_string()), + Err(e) => return Err(format!("Invalid profile UTF-8: {e}")), + } + } else { + None + }; + + let config_source = parse_config_source(path, data)?; + let env_vars_map = parse_env_vars(env_vars)?; + + let options = LoadClientConfigProfileOptions { + config_source, + config_file_profile: profile_name, + config_file_strict, + disable_file, + disable_env, + }; + + let profile = envconfig::load_client_config_profile(options, env_vars_map.as_ref()) + .map_err(|e| e.to_string())?; + + Ok(profile.into()) + }; + + send_result(result(), user_data, callback); +} diff --git a/core-c-bridge/src/lib.rs b/core-c-bridge/src/lib.rs index a4427f3b0..643310b37 100644 --- a/core-c-bridge/src/lib.rs +++ b/core-c-bridge/src/lib.rs @@ -9,6 +9,7 @@ )] pub mod client; +pub mod envconfig; pub mod metric; pub mod random; pub mod runtime;