diff --git a/crates/application/src/cron_jobs/mod.rs b/crates/application/src/cron_jobs/mod.rs index af447367b..67ee32e79 100644 --- a/crates/application/src/cron_jobs/mod.rs +++ b/crates/application/src/cron_jobs/mod.rs @@ -639,6 +639,7 @@ impl CronJobContext { request_id.clone().unwrap_or_else(RequestId::new), execution_id.clone().unwrap_or_else(ExecutionId::new), caller.parent_scheduled_job(), + caller.remote_ip(), caller.is_root(), ); sentry::configure_scope(|scope| context.add_sentry_tags(scope)); diff --git a/crates/application/src/scheduled_jobs/mod.rs b/crates/application/src/scheduled_jobs/mod.rs index 27d7a7cb2..fffefa29b 100644 --- a/crates/application/src/scheduled_jobs/mod.rs +++ b/crates/application/src/scheduled_jobs/mod.rs @@ -832,6 +832,7 @@ impl ScheduledJobContext { request_id.clone().unwrap_or_else(RequestId::new), execution_id.clone().unwrap_or_else(ExecutionId::new), caller.parent_scheduled_job(), + caller.remote_ip(), caller.is_root(), ); sentry::configure_scope(|scope| context.add_sentry_tags(scope)); diff --git a/crates/application/src/tests/http_action.rs b/crates/application/src/tests/http_action.rs index 89f628e17..54eeadd1d 100644 --- a/crates/application/src/tests/http_action.rs +++ b/crates/application/src/tests/http_action.rs @@ -77,7 +77,7 @@ async fn test_http_action_basic(rt: TestRuntime) -> anyhow::Result<()> { common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, ) .await?; @@ -134,7 +134,7 @@ async fn test_http_action_error(rt: TestRuntime) -> anyhow::Result<()> { common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, ) .await?; @@ -191,7 +191,7 @@ async fn test_http_action_not_found(rt: TestRuntime) -> anyhow::Result<()> { common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, ) .await?; @@ -243,7 +243,7 @@ async fn test_http_action_disconnect_before_head( common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, )); select! { @@ -307,7 +307,7 @@ async fn test_http_action_disconnect_while_streaming( common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, ) .await @@ -382,7 +382,7 @@ async fn test_http_action_continues_after_client_disconnects( common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, ) .await @@ -414,7 +414,7 @@ async fn test_http_action_continues_after_client_disconnects( udf_path: "functions:didWrite".parse()?, }, vec![json!({})], - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), ExecuteQueryTimestamp::Latest, None, ) diff --git a/crates/application/src/tests/logging.rs b/crates/application/src/tests/logging.rs index e55654bba..e6fd65ba6 100644 --- a/crates/application/src/tests/logging.rs +++ b/crates/application/src/tests/logging.rs @@ -83,7 +83,7 @@ async fn test_udf_logs(rt: TestRuntime) -> anyhow::Result<()> { PublicFunctionPath::Component(path), vec![], Identity::system(), - FunctionCaller::SyncWorker(ClientVersion::unknown()), + FunctionCaller::SyncWorker(ClientVersion::unknown(), None), ) .await?; assert!(result.result.is_ok()); diff --git a/crates/application/src/tests/returns_validation.rs b/crates/application/src/tests/returns_validation.rs index d5264e588..9b4ea4536 100644 --- a/crates/application/src/tests/returns_validation.rs +++ b/crates/application/src/tests/returns_validation.rs @@ -40,7 +40,7 @@ async fn run_zero_arg_mutation( vec![obj], Identity::user(UserIdentity::test()), None, - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), None, ) .await @@ -60,7 +60,7 @@ async fn run_zero_arg_query( }), vec![obj], Identity::user(UserIdentity::test()), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), ) .await } @@ -79,7 +79,7 @@ async fn run_zero_arg_action( }), vec![obj], Identity::user(UserIdentity::test()), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), ) .await } diff --git a/crates/common/src/execution_context.rs b/crates/common/src/execution_context.rs index 5e3e33dae..1dfc9a1f9 100644 --- a/crates/common/src/execution_context.rs +++ b/crates/common/src/execution_context.rs @@ -3,6 +3,7 @@ use std::{ Display, Formatter, }, + net::SocketAddr, str::FromStr, }; @@ -39,6 +40,8 @@ pub struct ExecutionContext { pub execution_id: ExecutionId, /// The id of the scheduled job that triggered this UDF, if any. pub parent_scheduled_job: Option<(ComponentId, DeveloperDocumentId)>, + /// The remote IP address of the client that initiated this request + pub remote_ip: Option, /// False if this function was called as part of a request (e.g. action /// calling a mutation) TODO: This is a stop gap solution. The richer /// version of this would be something like parent_execution_id: @@ -48,10 +51,26 @@ pub struct ExecutionContext { impl ExecutionContext { pub fn new(request_id: RequestId, caller: &FunctionCaller) -> Self { + let remote_ip = caller.remote_ip(); Self { request_id, execution_id: ExecutionId::new(), parent_scheduled_job: caller.parent_scheduled_job(), + remote_ip, + is_root: caller.is_root(), + } + } + + pub fn new_with_remote_ip( + request_id: RequestId, + caller: &FunctionCaller, + remote_ip: Option, + ) -> Self { + Self { + request_id, + execution_id: ExecutionId::new(), + parent_scheduled_job: caller.parent_scheduled_job(), + remote_ip, is_root: caller.is_root(), } } @@ -60,12 +79,14 @@ impl ExecutionContext { request_id: RequestId, execution_id: ExecutionId, parent_scheduled_job: Option<(ComponentId, DeveloperDocumentId)>, + remote_ip: Option, is_root: bool, ) -> Self { Self { request_id, execution_id, parent_scheduled_job, + remote_ip, is_root, } } @@ -80,6 +101,7 @@ impl ExecutionContext { request_id: RequestId::new(), execution_id: ExecutionId::new(), parent_scheduled_job: None, + remote_ip: None, is_root: true, } } @@ -97,6 +119,9 @@ impl HeapSize for ExecutionContext { + self .parent_scheduled_job .map_or(0, |(_, document_id)| document_id.heap_size()) + + self + .remote_ip + .map_or(0, |_| std::mem::size_of::()) + self.is_root.heap_size() } } @@ -259,6 +284,7 @@ impl From for pb::common::ExecutionContext { pb::common::ExecutionContext { request_id: Some(value.request_id.into()), execution_id: Some(value.execution_id.to_string()), + remote_ip: value.remote_ip.map(|addr| addr.to_string()), parent_scheduled_job_component_id: parent_component_id .and_then(|id| id.serialize_to_string()), parent_scheduled_job: parent_document_id.map(Into::into), @@ -275,6 +301,11 @@ impl TryFrom for ExecutionContext { value.parent_scheduled_job_component_id.as_deref(), )?; let parent_document_id = value.parent_scheduled_job.map(|s| s.parse()).transpose()?; + let remote_ip = value + .remote_ip + .map(|s| s.parse()) + .transpose() + .context("Invalid remote IP address")?; Ok(Self { request_id: RequestId::from_str(&value.request_id.context("Missing request id")?)?, execution_id: match &value.execution_id { @@ -282,6 +313,7 @@ impl TryFrom for ExecutionContext { None => ExecutionId::new(), }, parent_scheduled_job: parent_document_id.map(|id| (parent_component_id, id)), + remote_ip, is_root: value.is_root.unwrap_or_default(), }) } @@ -290,10 +322,12 @@ impl TryFrom for ExecutionContext { impl From for JsonValue { fn from(value: ExecutionContext) -> Self { let (parent_component_id, parent_document_id) = value.parent_scheduled_job.unzip(); + let remote_ip_str = value.remote_ip.map(|addr| addr.to_string()); json!({ "requestId": String::from(value.request_id), "executionId": value.execution_id.to_string(), "isRoot": value.is_root, + "remoteIp": remote_ip_str, "parentScheduledJob": parent_document_id.map(|id| id.to_string()), "parentScheduledJobComponentId": parent_component_id.unwrap_or(ComponentId::Root).serialize_to_string(), }) diff --git a/crates/common/src/types/functions.rs b/crates/common/src/types/functions.rs index e839b9a86..b1c8121d9 100644 --- a/crates/common/src/types/functions.rs +++ b/crates/common/src/types/functions.rs @@ -3,6 +3,7 @@ use std::{ self, Debug, }, + net::SocketAddr, str::FromStr, }; @@ -145,13 +146,13 @@ pub enum AllowedVisibility { #[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub enum FunctionCaller { - SyncWorker(ClientVersion), - HttpApi(ClientVersion), + SyncWorker(ClientVersion, Option), + HttpApi(ClientVersion, Option), /// Used by function tester in the dashboard Tester(ClientVersion), // This is a user defined http actions called externally. If the http action // calls other functions, their caller would be `Action`. - HttpEndpoint, + HttpEndpoint(Option), Cron, Scheduler { job_id: DeveloperDocumentId, @@ -168,10 +169,10 @@ pub enum FunctionCaller { impl FunctionCaller { pub fn client_version(&self) -> Option { match self { - FunctionCaller::SyncWorker(c) => Some(c), - FunctionCaller::HttpApi(c) => Some(c), + FunctionCaller::SyncWorker(c, _) => Some(c), + FunctionCaller::HttpApi(c, _) => Some(c), FunctionCaller::Tester(c) => Some(c), - FunctionCaller::HttpEndpoint + FunctionCaller::HttpEndpoint(_) | FunctionCaller::Cron | FunctionCaller::Scheduler { .. } | FunctionCaller::Action { .. } => None, @@ -183,10 +184,10 @@ impl FunctionCaller { pub fn parent_scheduled_job(&self) -> Option<(ComponentId, DeveloperDocumentId)> { match self { - FunctionCaller::SyncWorker(_) - | FunctionCaller::HttpApi(_) + FunctionCaller::SyncWorker(_, _) + | FunctionCaller::HttpApi(_, _) | FunctionCaller::Tester(_) - | FunctionCaller::HttpEndpoint + | FunctionCaller::HttpEndpoint(_) | FunctionCaller::Cron => None, #[cfg(any(test, feature = "testing"))] FunctionCaller::Test => None, @@ -200,12 +201,27 @@ impl FunctionCaller { } } + pub fn remote_ip(&self) -> Option { + let remote_ip = match self { + FunctionCaller::SyncWorker(_, remote_ip) + | FunctionCaller::HttpApi(_, remote_ip) + | FunctionCaller::HttpEndpoint(remote_ip) => *remote_ip, + FunctionCaller::Tester(_) + | FunctionCaller::Cron + | FunctionCaller::Scheduler { .. } + | FunctionCaller::Action { .. } => None, + #[cfg(any(test, feature = "testing"))] + FunctionCaller::Test => None, + }; + remote_ip + } + pub fn is_root(&self) -> bool { match self { - FunctionCaller::SyncWorker(_) - | FunctionCaller::HttpApi(_) + FunctionCaller::SyncWorker(_, _) + | FunctionCaller::HttpApi(_, _) | FunctionCaller::Tester(_) - | FunctionCaller::HttpEndpoint + | FunctionCaller::HttpEndpoint(_) | FunctionCaller::Cron | FunctionCaller::Scheduler { .. } => true, FunctionCaller::Action { .. } => false, @@ -219,9 +235,9 @@ impl FunctionCaller { // to run it even if the client goes away. However, we preserve the right // to interrupt actions if the backend restarts. match self { - FunctionCaller::SyncWorker(_) - | FunctionCaller::HttpApi(_) - | FunctionCaller::HttpEndpoint + FunctionCaller::SyncWorker(_, _) + | FunctionCaller::HttpApi(_, _) + | FunctionCaller::HttpEndpoint(_) | FunctionCaller::Tester(_) => true, FunctionCaller::Cron | FunctionCaller::Scheduler { .. } @@ -233,13 +249,13 @@ impl FunctionCaller { pub fn allowed_visibility(&self) -> AllowedVisibility { match self { - FunctionCaller::SyncWorker(_) | FunctionCaller::HttpApi(_) => { + FunctionCaller::SyncWorker(_, _) | FunctionCaller::HttpApi(_, _) => { AllowedVisibility::PublicOnly }, // NOTE: Allowed visibility doesn't make sense in the context of an // user defined http action since all http actions are public, and // we shouldn't be checking visibility. We define this for completeness. - FunctionCaller::HttpEndpoint => AllowedVisibility::PublicOnly, + FunctionCaller::HttpEndpoint(_) => AllowedVisibility::PublicOnly, FunctionCaller::Tester(_) | FunctionCaller::Cron | FunctionCaller::Scheduler { .. } @@ -253,10 +269,10 @@ impl FunctionCaller { impl fmt::Display for FunctionCaller { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let s = match self { - FunctionCaller::SyncWorker(_) => "SyncWorker", - FunctionCaller::HttpApi(_) => "HttpApi", + FunctionCaller::SyncWorker(_, _) => "SyncWorker", + FunctionCaller::HttpApi(_, _) => "HttpApi", FunctionCaller::Tester(_) => "Tester", - FunctionCaller::HttpEndpoint => "HttpEndpoint", + FunctionCaller::HttpEndpoint(_) => "HttpEndpoint", FunctionCaller::Cron => "Cron", FunctionCaller::Scheduler { .. } => "Scheduler", FunctionCaller::Action { .. } => "Action", @@ -270,16 +286,16 @@ impl fmt::Display for FunctionCaller { impl From for pb::common::FunctionCaller { fn from(caller: FunctionCaller) -> Self { let caller = match caller { - FunctionCaller::SyncWorker(client_version) => { + FunctionCaller::SyncWorker(client_version, _) => { pb::common::function_caller::Caller::SyncWorker(client_version.into()) }, - FunctionCaller::HttpApi(client_version) => { + FunctionCaller::HttpApi(client_version, _) => { pb::common::function_caller::Caller::HttpApi(client_version.into()) }, FunctionCaller::Tester(client_version) => { pb::common::function_caller::Caller::Tester(client_version.into()) }, - FunctionCaller::HttpEndpoint => pb::common::function_caller::Caller::HttpEndpoint(()), + FunctionCaller::HttpEndpoint(_) => pb::common::function_caller::Caller::HttpEndpoint(()), FunctionCaller::Cron => pb::common::function_caller::Caller::Cron(()), FunctionCaller::Scheduler { job_id, @@ -318,16 +334,16 @@ impl TryFrom for FunctionCaller { fn try_from(msg: pb::common::FunctionCaller) -> anyhow::Result { let caller = match msg.caller { Some(pb::common::function_caller::Caller::SyncWorker(client_version)) => { - FunctionCaller::SyncWorker(client_version.try_into()?) + FunctionCaller::SyncWorker(client_version.try_into()?, None) }, Some(pb::common::function_caller::Caller::HttpApi(client_version)) => { - FunctionCaller::HttpApi(client_version.try_into()?) + FunctionCaller::HttpApi(client_version.try_into()?, None) }, Some(pb::common::function_caller::Caller::Tester(client_version)) => { FunctionCaller::Tester(client_version.try_into()?) }, Some(pb::common::function_caller::Caller::HttpEndpoint(())) => { - FunctionCaller::HttpEndpoint + FunctionCaller::HttpEndpoint(None) }, Some(pb::common::function_caller::Caller::Cron(())) => FunctionCaller::Cron, Some(pb::common::function_caller::Caller::Scheduler(caller)) => { diff --git a/crates/isolate/src/environment/udf/mod.rs b/crates/isolate/src/environment/udf/mod.rs index e339b3b49..24ccc79f1 100644 --- a/crates/isolate/src/environment/udf/mod.rs +++ b/crates/isolate/src/environment/udf/mod.rs @@ -999,4 +999,8 @@ impl DatabaseUdfEnvironment { } Ok(()) } + + pub fn execution_context(&self) -> &ExecutionContext { + &self.context + } } diff --git a/crates/isolate/src/isolate2/callback_context.rs b/crates/isolate/src/isolate2/callback_context.rs index 46e8b9ed1..741a80620 100644 --- a/crates/isolate/src/isolate2/callback_context.rs +++ b/crates/isolate/src/isolate2/callback_context.rs @@ -595,5 +595,10 @@ mod op_provider { fn remove_text_decoder(&mut self, uuid: &Uuid) -> anyhow::Result { self.context_state()?.remove_text_decoder(uuid) } + + fn get_execution_context(&mut self) -> anyhow::Result> { + let state = self.context_state()?; + state.environment.get_execution_context() + } } } diff --git a/crates/isolate/src/isolate2/context.rs b/crates/isolate/src/isolate2/context.rs index ce8f27851..953f38089 100644 --- a/crates/isolate/src/isolate2/context.rs +++ b/crates/isolate/src/isolate2/context.rs @@ -83,6 +83,7 @@ impl Context { let op_key = strings::op.create(&mut scope)?; convex_value.set(&mut scope, op_key.into(), op_value.into()); + let async_op_template = v8::FunctionTemplate::new(&mut scope, CallbackContext::start_async_op); let async_op_value = async_op_template diff --git a/crates/isolate/src/isolate2/environment.rs b/crates/isolate/src/isolate2/environment.rs index 75b09f3f0..24652ec8c 100644 --- a/crates/isolate/src/isolate2/environment.rs +++ b/crates/isolate/src/isolate2/environment.rs @@ -44,4 +44,6 @@ pub trait Environment { fn finish_execution(&mut self) -> anyhow::Result; fn get_all_table_mappings(&mut self) -> anyhow::Result; + + fn get_execution_context(&mut self) -> anyhow::Result>; } diff --git a/crates/isolate/src/isolate2/runner.rs b/crates/isolate/src/isolate2/runner.rs index 921d17a12..96e5bddf9 100644 --- a/crates/isolate/src/isolate2/runner.rs +++ b/crates/isolate/src/isolate2/runner.rs @@ -243,6 +243,8 @@ struct UdfEnvironment { #[allow(unused)] env_vars: PreloadedEnvironmentVariables, + + execution_context: ExecutionContext, } impl UdfEnvironment { @@ -254,6 +256,7 @@ impl UdfEnvironment { shared: UdfShared, env_vars: PreloadedEnvironmentVariables, log_line_sender: spsc::Sender, + execution_context: ExecutionContext, ) -> Self { let rng = ChaCha12Rng::from_seed(import_time_seed.rng_seed); Self { @@ -270,6 +273,7 @@ impl UdfEnvironment { shared, env_vars, + execution_context, } } @@ -470,6 +474,11 @@ impl Environment for UdfEnvironment { self.check_executing()?; Ok(self.shared.get_all_table_mappings()) } + + fn get_execution_context(&mut self) -> anyhow::Result> { + let context_json: serde_json::Value = self.execution_context.clone().into(); + Ok(Some(context_json)) + } } async fn run_request( @@ -1063,6 +1072,7 @@ pub async fn run_isolate_v2_udf( shared.clone(), env_vars, log_line_sender, + context.clone(), ); // The protocol is synchronous, so there should never be more than diff --git a/crates/isolate/src/ops/mod.rs b/crates/isolate/src/ops/mod.rs index 535f39448..9ac00ee7c 100644 --- a/crates/isolate/src/ops/mod.rs +++ b/crates/isolate/src/ops/mod.rs @@ -43,10 +43,14 @@ use crypto::{ op_crypto_encrypt, }; use deno_core::{ + serde_v8, v8, ModuleSpecifier, }; use rand_chacha::ChaCha12Rng; +use runtime::prod::ProdRuntime; +#[cfg(any(test, feature = "testing"))] +use runtime::testing::TestRuntime; use sourcemap::SourceMap; use structured_clone::op_structured_clone; use uuid::Uuid; @@ -201,6 +205,8 @@ pub trait OpProvider<'b> { -> anyhow::Result>; fn get_all_table_mappings(&mut self) -> anyhow::Result; + + fn get_execution_context(&mut self) -> anyhow::Result>; } impl<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment> OpProvider<'b> @@ -350,6 +356,45 @@ impl<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment> OpProvider<'b> let state = self.state_mut()?; state.environment.get_all_table_mappings() } + + fn get_execution_context(&mut self) -> anyhow::Result> { + use std::any::{ + Any, + TypeId, + }; + + use crate::environment::udf::DatabaseUdfEnvironment; + + let state = self.state_mut()?; + + // Check if this is a DatabaseUdfEnvironment by checking type ID + let environment_any = &state.environment as &dyn Any; + let type_id = environment_any.type_id(); + + // Try common runtime types + if type_id == TypeId::of::>() { + if let Some(udf_env) = + environment_any.downcast_ref::>() + { + let context_json: serde_json::Value = udf_env.execution_context().clone().into(); + return Ok(Some(context_json)); + } + } + + // Add other runtime types as needed + #[cfg(any(test, feature = "testing"))] + if type_id == TypeId::of::>() { + if let Some(udf_env) = + environment_any.downcast_ref::>() + { + let context_json: serde_json::Value = udf_env.execution_context().clone().into(); + return Ok(Some(context_json)); + } + } + + // Not in UDF environment - return None + Ok(None) + } } pub fn run_op<'b, P: OpProvider<'b>>( @@ -428,6 +473,7 @@ pub fn run_op<'b, P: OpProvider<'b>>( "crypto/exportPkcs8X25519" => op_crypto_export_pkcs8_x25519(provider, args, rv)?, "crypto/generateKeyPair" => op_crypto_generate_keypair(provider, args, rv)?, "crypto/generateKeyBytes" => op_crypto_generate_key_bytes(provider, args, rv)?, + "getRemoteIp" => op_get_remote_ip(provider, args, rv)?, _ => { anyhow::bail!(ErrorMetadata::bad_request( "UnknownOperation", @@ -473,3 +519,29 @@ pub fn start_async_op<'b, P: OpProvider<'b>>( rv.set(promise.into()); Ok(()) } + +// Get remote IP address for the current request +pub fn op_get_remote_ip<'b, P: OpProvider<'b>>( + provider: &mut P, + _args: v8::FunctionCallbackArguments, + mut rv: v8::ReturnValue, +) -> anyhow::Result<()> { + // Get the execution context and extract remote IP + let context_json = provider.get_execution_context()?; + + let result = if let Some(ctx) = context_json { + if let Some(ip) = ctx.get("remoteIp").and_then(|ip| ip.as_str()) { + serde_json::Value::String(ip.to_string()) + } else { + serde_json::Value::Null + } + } else { + serde_json::Value::Null + }; + + // Convert to V8 value and return + let v8_value = serde_v8::to_v8(provider.scope(), result)?; + rv.set(v8_value); + + Ok(()) +} diff --git a/crates/local_backend/src/http_actions.rs b/crates/local_backend/src/http_actions.rs index 3bf5c2112..fae5140ff 100644 --- a/crates/local_backend/src/http_actions.rs +++ b/crates/local_backend/src/http_actions.rs @@ -155,6 +155,7 @@ impl FromRequest for ExtractHttpRequestMetadata { #[debug_handler] pub async fn http_any_method( State(st): State, + remote_addr: axum::extract::ConnectInfo, TryExtractIdentity(identity_result): TryExtractIdentity, ExtractRequestId(request_id): ExtractRequestId, ExtractResolvedHostname(host): ExtractResolvedHostname, @@ -172,6 +173,7 @@ pub async fn http_any_method( http_request_metadata, identity, st.api.clone(), + remote_addr.0, ); let head = http_response_stream.try_next().await?; let Some(HttpActionResponsePart::Head(response_head)) = head else { @@ -212,6 +214,7 @@ async fn stream_http_response( http_request_metadata: HttpActionRequest, identity: Identity, application: Arc, + remote_addr: std::net::SocketAddr, ) { let (http_response_sender, http_response_receiver) = mpsc::unbounded_channel(); @@ -222,7 +225,7 @@ async fn stream_http_response( request_id, http_request_metadata, identity, - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(Some(remote_addr)), HttpActionResponseStreamer::new(http_response_sender), ) .fuse(); diff --git a/crates/local_backend/src/node_action_callbacks.rs b/crates/local_backend/src/node_action_callbacks.rs index 9726b2d07..ceb58aeb3 100644 --- a/crates/local_backend/src/node_action_callbacks.rs +++ b/crates/local_backend/src/node_action_callbacks.rs @@ -716,6 +716,7 @@ impl FromRequestParts for ExtractExecutionContext { request_id, execution_id, parent_job_id.map(|id| (parent_component_id, id)), + None, // remote_ip not available in node action callbacks is_root, ))) } diff --git a/crates/local_backend/src/public_api.rs b/crates/local_backend/src/public_api.rs index 72a70d78d..28abc2ebc 100644 --- a/crates/local_backend/src/public_api.rs +++ b/crates/local_backend/src/public_api.rs @@ -183,6 +183,7 @@ impl UdfResponse { /// Executes an arbitrary query/mutation/action from its name. pub async fn public_function_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -212,7 +213,7 @@ pub async fn public_function_post( identity, component_function_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ) .await?; let value_format = req.format.as_ref().map(|f| f.parse()).transpose()?; @@ -242,6 +243,7 @@ pub struct UdfPostRequestArgsOnly { /// request and doesn't require admin auth. pub async fn public_function_post_with_path( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, Path(path): Path, ExtractRequestId(request_id): ExtractRequestId, @@ -289,7 +291,7 @@ pub async fn public_function_post_with_path( udf_path, }, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ) .await?; // Default to ConvexCleanJSON if no format is provided. @@ -328,6 +330,7 @@ pub fn export_value( #[fastrace::trace(properties = { "udf_type": "query"})] pub async fn public_query_get( State(st): State, + remote_addr: axum::extract::ConnectInfo, Query(req): Query, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, @@ -353,7 +356,7 @@ pub async fn public_query_get( identity, export_path, args, - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ExecuteQueryTimestamp::Latest, journal, ) @@ -373,6 +376,7 @@ pub async fn public_query_get( #[fastrace::trace(properties = { "udf_type": "query"})] pub async fn public_query_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -397,7 +401,7 @@ pub async fn public_query_post( identity, udf_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ExecuteQueryTimestamp::Latest, journal, ) @@ -416,9 +420,9 @@ pub async fn public_query_post( } pub async fn public_get_query_ts( + State(st): State, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, - State(st): State, ) -> Result { let ts = *(st.api.latest_timestamp(&host, request_id).await?); Ok(Json(Ts { ts: ts.into() })) @@ -427,6 +431,7 @@ pub async fn public_get_query_ts( #[fastrace::trace(properties = { "udf_type": "query"})] pub async fn public_query_at_ts_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -452,7 +457,7 @@ pub async fn public_query_at_ts_post( identity, export_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ExecuteQueryTimestamp::At(ts), journal, ) @@ -482,6 +487,7 @@ pub struct QueryBatchResponse { pub async fn public_query_batch_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -506,7 +512,7 @@ pub async fn public_query_batch_post( identity.clone(), export_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ExecuteQueryTimestamp::At(*ts), None, ) @@ -531,6 +537,7 @@ pub async fn public_query_batch_post( #[fastrace::trace(properties = { "udf_type": "mutation"})] pub async fn public_mutation_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -554,7 +561,7 @@ pub async fn public_mutation_post( identity, export_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), None, None, ) @@ -578,6 +585,7 @@ pub async fn public_mutation_post( #[fastrace::trace(properties = { "udf_type": "action"})] pub async fn public_action_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -602,7 +610,7 @@ pub async fn public_action_post( identity, export_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ) .await?; let value_format = req.format.as_ref().map(|f| f.parse()).transpose()?; diff --git a/crates/local_backend/src/subs/mod.rs b/crates/local_backend/src/subs/mod.rs index 96dc9ab57..63d1a3201 100644 --- a/crates/local_backend/src/subs/mod.rs +++ b/crates/local_backend/src/subs/mod.rs @@ -130,8 +130,11 @@ impl Drop for DebugSyncSocketDropToken { // on a close frame and close the WebSocket. They can also signal clean shutdown // by returning `Ok(())`, and once all of them have cleanly exited, we'll // gracefully close the socket. -async fn run_sync_socket( + + +async fn run_sync_socket_with_remote_ip( st: RouterState, + remote_ip: Option, host: ResolvedHostname, config: SyncWorkerConfig, socket: WebSocket, @@ -232,10 +235,11 @@ async fn run_sync_socket( let mut identity_version: Option = None; let sync_worker_go = async { let _sync_worker_drop_token = DebugSyncSocketDropToken::new("sync_worker"); - let mut sync_worker = SyncWorker::new( + let mut sync_worker = SyncWorker::new_with_remote_ip( st.api.clone(), st.runtime.clone(), host, + remote_ip, config.clone(), client_rx, server_tx, @@ -348,6 +352,17 @@ pub async fn sync_handler( client_version: ClientVersion, ws: WebSocketUpgrade, on_connect: Box, +) -> Result { + sync_handler_with_remote_ip(st, None, host, client_version, ws, on_connect).await +} + +pub async fn sync_handler_with_remote_ip( + st: RouterState, + remote_ip: Option, + host: ResolvedHostname, + client_version: ClientVersion, + ws: WebSocketUpgrade, + on_connect: Box, ) -> Result { let config = new_sync_worker_config(client_version)?; // Make a copy of the Sentry scope, which contains the request metadata. @@ -359,18 +374,19 @@ pub async fn sync_handler( upgrade_timer.finish(); let monitor = ProdRuntime::task_monitor("sync_socket"); monitor.instrument( - run_sync_socket(st, host, config, ws, sentry_scope, on_connect).bind_hub(hub), + run_sync_socket_with_remote_ip(st, remote_ip, host, config, ws, sentry_scope, on_connect).bind_hub(hub), ) })) } pub async fn sync( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractClientVersion(client_version): ExtractClientVersion, ws: WebSocketUpgrade, ) -> Result { - sync_handler(st, host, client_version, ws, Box::new(|_session_id| ())).await + sync_handler_with_remote_ip(st, Some(remote_addr.0), host, client_version, ws, Box::new(|_session_id| ())).await } #[cfg(test)] diff --git a/crates/pb/protos/common.proto b/crates/pb/protos/common.proto index 515ccb2fa..bf6c395ad 100644 --- a/crates/pb/protos/common.proto +++ b/crates/pb/protos/common.proto @@ -98,6 +98,7 @@ message ExecutionContext { optional string request_id = 2; optional string execution_id = 3; optional bool is_root = 4; + optional string remote_ip = 6; } enum UdfType { diff --git a/crates/sync/src/tests.rs b/crates/sync/src/tests.rs index f955cb649..f28e8d802 100644 --- a/crates/sync/src/tests.rs +++ b/crates/sync/src/tests.rs @@ -873,7 +873,7 @@ async fn test_udf_cache_out_of_order(rt: TestRuntime) -> anyhow::Result<()> { Identity::Unknown(None), ts2, None, - FunctionCaller::SyncWorker(ClientVersion::unknown()), + FunctionCaller::SyncWorker(ClientVersion::unknown(), None), ) .await?; assert_eq!(result1.result?.unpack(), ConvexValue::from(5.0)); @@ -887,7 +887,7 @@ async fn test_udf_cache_out_of_order(rt: TestRuntime) -> anyhow::Result<()> { Identity::Unknown(None), ts1, None, - FunctionCaller::SyncWorker(ClientVersion::unknown()), + FunctionCaller::SyncWorker(ClientVersion::unknown(), None), ) .await?; assert_eq!(result2.result?.unpack(), ConvexValue::from(0.0)); diff --git a/crates/sync/src/worker.rs b/crates/sync/src/worker.rs index 64cb00ac3..b6b1d360a 100644 --- a/crates/sync/src/worker.rs +++ b/crates/sync/src/worker.rs @@ -216,6 +216,7 @@ pub struct SyncWorker { rt: RT, state: SyncState, host: ResolvedHostname, + remote_ip: Option, rx: mpsc::UnboundedReceiver<(ClientMessage, tokio::time::Instant)>, tx: SingleFlightSender, @@ -265,6 +266,19 @@ impl SyncWorker { rx: mpsc::UnboundedReceiver<(ClientMessage, tokio::time::Instant)>, tx: SingleFlightSender, on_connect: Box, + ) -> Self { + Self::new_with_remote_ip(api, rt, host, None, config, rx, tx, on_connect) + } + + pub fn new_with_remote_ip( + api: Arc, + rt: RT, + host: ResolvedHostname, + remote_ip: Option, + config: SyncWorkerConfig, + rx: mpsc::UnboundedReceiver<(ClientMessage, tokio::time::Instant)>, + tx: SingleFlightSender, + on_connect: Box, ) -> Self { let (mutation_sender, receiver) = mpsc::channel(OPERATION_QUEUE_BUFFER_SIZE); let mutation_futures = ReceiverStream::new(receiver).buffered(1); // Execute at most one operation at a time. @@ -274,6 +288,7 @@ impl SyncWorker { rt, state: SyncState::new(), host, + remote_ip, rx, tx, mutation_futures, @@ -499,7 +514,8 @@ impl SyncWorker { let timer = mutation_queue_timer(); let api = self.api.clone(); let host = self.host.clone(); - let caller = FunctionCaller::SyncWorker(client_version); + let remote_ip = self.remote_ip; + let caller = FunctionCaller::SyncWorker(client_version, remote_ip); let mutation_queue_size = self.mutation_sender.max_capacity() - self.mutation_sender.capacity(); @@ -587,6 +603,7 @@ impl SyncWorker { let api = self.api.clone(); let host = self.host.clone(); let client_version = self.config.client_version.clone(); + let remote_ip = self.remote_ip; let server_request_id = match self.state.session_id() { Some(id) => RequestId::new_for_ws_session(id, request_id), None => RequestId::new(), @@ -601,7 +618,7 @@ impl SyncWorker { }, ); let future = async move { - let caller = FunctionCaller::SyncWorker(client_version); + let caller = FunctionCaller::SyncWorker(client_version, remote_ip); let result = match component_path { None => { api.execute_public_action( @@ -764,6 +781,7 @@ impl SyncWorker { let need_fetch: Vec<_> = self.state.need_fetch().collect(); let host = self.host.clone(); let client_version = self.config.client_version.clone(); + let remote_ip = self.remote_ip; Ok(async move { let future_results: anyhow::Result> = try_join_buffer_unordered( "update_query", @@ -791,7 +809,7 @@ impl SyncWorker { None => { // We failed to refresh the subscription or it was invalid to start // with. Rerun the query. - let caller = FunctionCaller::SyncWorker(client_version); + let caller = FunctionCaller::SyncWorker(client_version, remote_ip); let ts = ExecuteQueryTimestamp::At(new_ts); // This query run might have been triggered due to invalidation diff --git a/npm-packages/convex/src/server/impl/registration_impl.ts b/npm-packages/convex/src/server/impl/registration_impl.ts index aaa422387..f675c94f3 100644 --- a/npm-packages/convex/src/server/impl/registration_impl.ts +++ b/npm-packages/convex/src/server/impl/registration_impl.ts @@ -40,6 +40,28 @@ import { performAsyncSyscall } from "./syscall.js"; import { asObjectValidator } from "../../values/validator.js"; import { getFunctionAddress } from "../components/paths.js"; +// Declare the global Convex object +declare const Convex: { + syscall: (op: string, jsonArgs: string) => string; + asyncSyscall: (op: string, jsonArgs: string) => Promise; + jsSyscall: (op: string, args: Record) => any; + op: (opName: string, ...args: any[]) => any; +}; + +// Get remote IP address for the current request (lazy loaded) +function getRemoteIp(): string | null { + try { + if (typeof Convex === "undefined" || Convex.op === undefined) { + return null; + } + // Use synchronous op call - fast and only called when needed + const remoteIp = Convex.op("getRemoteIp"); + return remoteIp; + } catch (error) { + return null; + } +} + async function invokeMutation< F extends (ctx: GenericMutationCtx, ...args: any) => any, >(func: F, argsStr: string) { @@ -56,6 +78,9 @@ async function invokeMutation< runQuery: (reference: any, args?: any) => runUdf("query", reference, args), runMutation: (reference: any, args?: any) => runUdf("mutation", reference, args), + + // Lazy-loaded remote IP access + getRemoteIp, }; const result = await invokeFunction(func, mutationCtx, args as any); validateReturnValue(result); @@ -260,6 +285,9 @@ async function invokeQuery< auth: setupAuth(requestId), storage: setupStorageReader(requestId), runQuery: (reference: any, args?: any) => runUdf("query", reference, args), + + // Lazy-loaded remote IP access + getRemoteIp, }; const result = await invokeFunction(func, queryCtx, args as any); validateReturnValue(result);