From 32c83b6d80fe6c6182a69c9c5788ae33a4a2b9b2 Mon Sep 17 00:00:00 2001 From: Lars Thomas Denstad Date: Fri, 27 Jun 2025 14:59:59 +0200 Subject: [PATCH 1/8] Claude Code first pass: Remote IP in context --- crates/application/src/tests/http_action.rs | 14 ++-- crates/application/src/tests/logging.rs | 2 +- .../src/tests/returns_validation.rs | 6 +- crates/common/src/execution_context.rs | 23 +++++++ crates/common/src/types/functions.rs | 67 ++++++++++++------- crates/local_backend/src/http_actions.rs | 3 +- crates/local_backend/src/public_api.rs | 27 +++++--- crates/local_backend/src/subs/mod.rs | 31 ++++++++- crates/pb/protos/common.proto | 1 + crates/sync/src/tests.rs | 4 +- crates/sync/src/worker.rs | 21 +++++- 11 files changed, 144 insertions(+), 55 deletions(-) diff --git a/crates/application/src/tests/http_action.rs b/crates/application/src/tests/http_action.rs index 89f628e17..54eeadd1d 100644 --- a/crates/application/src/tests/http_action.rs +++ b/crates/application/src/tests/http_action.rs @@ -77,7 +77,7 @@ async fn test_http_action_basic(rt: TestRuntime) -> anyhow::Result<()> { common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, ) .await?; @@ -134,7 +134,7 @@ async fn test_http_action_error(rt: TestRuntime) -> anyhow::Result<()> { common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, ) .await?; @@ -191,7 +191,7 @@ async fn test_http_action_not_found(rt: TestRuntime) -> anyhow::Result<()> { common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, ) .await?; @@ -243,7 +243,7 @@ async fn test_http_action_disconnect_before_head( common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, )); select! { @@ -307,7 +307,7 @@ async fn test_http_action_disconnect_while_streaming( common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, ) .await @@ -382,7 +382,7 @@ async fn test_http_action_continues_after_client_disconnects( common::RequestId::new(), http_request, Identity::system(), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), response_streamer, ) .await @@ -414,7 +414,7 @@ async fn test_http_action_continues_after_client_disconnects( udf_path: "functions:didWrite".parse()?, }, vec![json!({})], - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), ExecuteQueryTimestamp::Latest, None, ) diff --git a/crates/application/src/tests/logging.rs b/crates/application/src/tests/logging.rs index e55654bba..e6fd65ba6 100644 --- a/crates/application/src/tests/logging.rs +++ b/crates/application/src/tests/logging.rs @@ -83,7 +83,7 @@ async fn test_udf_logs(rt: TestRuntime) -> anyhow::Result<()> { PublicFunctionPath::Component(path), vec![], Identity::system(), - FunctionCaller::SyncWorker(ClientVersion::unknown()), + FunctionCaller::SyncWorker(ClientVersion::unknown(), None), ) .await?; assert!(result.result.is_ok()); diff --git a/crates/application/src/tests/returns_validation.rs b/crates/application/src/tests/returns_validation.rs index d5264e588..9b4ea4536 100644 --- a/crates/application/src/tests/returns_validation.rs +++ b/crates/application/src/tests/returns_validation.rs @@ -40,7 +40,7 @@ async fn run_zero_arg_mutation( vec![obj], Identity::user(UserIdentity::test()), None, - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), None, ) .await @@ -60,7 +60,7 @@ async fn run_zero_arg_query( }), vec![obj], Identity::user(UserIdentity::test()), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), ) .await } @@ -79,7 +79,7 @@ async fn run_zero_arg_action( }), vec![obj], Identity::user(UserIdentity::test()), - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(None), ) .await } diff --git a/crates/common/src/execution_context.rs b/crates/common/src/execution_context.rs index 5e3e33dae..217433869 100644 --- a/crates/common/src/execution_context.rs +++ b/crates/common/src/execution_context.rs @@ -3,6 +3,7 @@ use std::{ Display, Formatter, }, + net::SocketAddr, str::FromStr, }; @@ -39,6 +40,8 @@ pub struct ExecutionContext { pub execution_id: ExecutionId, /// The id of the scheduled job that triggered this UDF, if any. pub parent_scheduled_job: Option<(ComponentId, DeveloperDocumentId)>, + /// The remote IP address of the client that initiated this request + pub remote_ip: Option, /// False if this function was called as part of a request (e.g. action /// calling a mutation) TODO: This is a stop gap solution. The richer /// version of this would be something like parent_execution_id: @@ -52,6 +55,17 @@ impl ExecutionContext { request_id, execution_id: ExecutionId::new(), parent_scheduled_job: caller.parent_scheduled_job(), + remote_ip: caller.remote_ip(), + is_root: caller.is_root(), + } + } + + pub fn new_with_remote_ip(request_id: RequestId, caller: &FunctionCaller, remote_ip: Option) -> Self { + Self { + request_id, + execution_id: ExecutionId::new(), + parent_scheduled_job: caller.parent_scheduled_job(), + remote_ip, is_root: caller.is_root(), } } @@ -60,12 +74,14 @@ impl ExecutionContext { request_id: RequestId, execution_id: ExecutionId, parent_scheduled_job: Option<(ComponentId, DeveloperDocumentId)>, + remote_ip: Option, is_root: bool, ) -> Self { Self { request_id, execution_id, parent_scheduled_job, + remote_ip, is_root, } } @@ -80,6 +96,7 @@ impl ExecutionContext { request_id: RequestId::new(), execution_id: ExecutionId::new(), parent_scheduled_job: None, + remote_ip: None, is_root: true, } } @@ -97,6 +114,7 @@ impl HeapSize for ExecutionContext { + self .parent_scheduled_job .map_or(0, |(_, document_id)| document_id.heap_size()) + + self.remote_ip.map_or(0, |_| std::mem::size_of::()) + self.is_root.heap_size() } } @@ -259,6 +277,7 @@ impl From for pb::common::ExecutionContext { pb::common::ExecutionContext { request_id: Some(value.request_id.into()), execution_id: Some(value.execution_id.to_string()), + remote_ip: value.remote_ip.map(|addr| addr.to_string()), parent_scheduled_job_component_id: parent_component_id .and_then(|id| id.serialize_to_string()), parent_scheduled_job: parent_document_id.map(Into::into), @@ -275,6 +294,8 @@ impl TryFrom for ExecutionContext { value.parent_scheduled_job_component_id.as_deref(), )?; let parent_document_id = value.parent_scheduled_job.map(|s| s.parse()).transpose()?; + let remote_ip = value.remote_ip.map(|s| s.parse()).transpose() + .context("Invalid remote IP address")?; Ok(Self { request_id: RequestId::from_str(&value.request_id.context("Missing request id")?)?, execution_id: match &value.execution_id { @@ -282,6 +303,7 @@ impl TryFrom for ExecutionContext { None => ExecutionId::new(), }, parent_scheduled_job: parent_document_id.map(|id| (parent_component_id, id)), + remote_ip, is_root: value.is_root.unwrap_or_default(), }) } @@ -294,6 +316,7 @@ impl From for JsonValue { "requestId": String::from(value.request_id), "executionId": value.execution_id.to_string(), "isRoot": value.is_root, + "remoteIp": value.remote_ip.map(|addr| addr.to_string()), "parentScheduledJob": parent_document_id.map(|id| id.to_string()), "parentScheduledJobComponentId": parent_component_id.unwrap_or(ComponentId::Root).serialize_to_string(), }) diff --git a/crates/common/src/types/functions.rs b/crates/common/src/types/functions.rs index e839b9a86..a7d71c06d 100644 --- a/crates/common/src/types/functions.rs +++ b/crates/common/src/types/functions.rs @@ -3,6 +3,7 @@ use std::{ self, Debug, }, + net::SocketAddr, str::FromStr, }; @@ -145,13 +146,13 @@ pub enum AllowedVisibility { #[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)] #[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))] pub enum FunctionCaller { - SyncWorker(ClientVersion), - HttpApi(ClientVersion), + SyncWorker(ClientVersion, Option), + HttpApi(ClientVersion, Option), /// Used by function tester in the dashboard Tester(ClientVersion), // This is a user defined http actions called externally. If the http action // calls other functions, their caller would be `Action`. - HttpEndpoint, + HttpEndpoint(Option), Cron, Scheduler { job_id: DeveloperDocumentId, @@ -168,10 +169,10 @@ pub enum FunctionCaller { impl FunctionCaller { pub fn client_version(&self) -> Option { match self { - FunctionCaller::SyncWorker(c) => Some(c), - FunctionCaller::HttpApi(c) => Some(c), + FunctionCaller::SyncWorker(c, _) => Some(c), + FunctionCaller::HttpApi(c, _) => Some(c), FunctionCaller::Tester(c) => Some(c), - FunctionCaller::HttpEndpoint + FunctionCaller::HttpEndpoint(_) | FunctionCaller::Cron | FunctionCaller::Scheduler { .. } | FunctionCaller::Action { .. } => None, @@ -183,10 +184,10 @@ impl FunctionCaller { pub fn parent_scheduled_job(&self) -> Option<(ComponentId, DeveloperDocumentId)> { match self { - FunctionCaller::SyncWorker(_) - | FunctionCaller::HttpApi(_) + FunctionCaller::SyncWorker(_, _) + | FunctionCaller::HttpApi(_, _) | FunctionCaller::Tester(_) - | FunctionCaller::HttpEndpoint + | FunctionCaller::HttpEndpoint(_) | FunctionCaller::Cron => None, #[cfg(any(test, feature = "testing"))] FunctionCaller::Test => None, @@ -200,12 +201,26 @@ impl FunctionCaller { } } + pub fn remote_ip(&self) -> Option { + match self { + FunctionCaller::SyncWorker(_, remote_ip) + | FunctionCaller::HttpApi(_, remote_ip) + | FunctionCaller::HttpEndpoint(remote_ip) => *remote_ip, + FunctionCaller::Tester(_) + | FunctionCaller::Cron + | FunctionCaller::Scheduler { .. } + | FunctionCaller::Action { .. } => None, + #[cfg(any(test, feature = "testing"))] + FunctionCaller::Test => None, + } + } + pub fn is_root(&self) -> bool { match self { - FunctionCaller::SyncWorker(_) - | FunctionCaller::HttpApi(_) + FunctionCaller::SyncWorker(_, _) + | FunctionCaller::HttpApi(_, _) | FunctionCaller::Tester(_) - | FunctionCaller::HttpEndpoint + | FunctionCaller::HttpEndpoint(_) | FunctionCaller::Cron | FunctionCaller::Scheduler { .. } => true, FunctionCaller::Action { .. } => false, @@ -219,9 +234,9 @@ impl FunctionCaller { // to run it even if the client goes away. However, we preserve the right // to interrupt actions if the backend restarts. match self { - FunctionCaller::SyncWorker(_) - | FunctionCaller::HttpApi(_) - | FunctionCaller::HttpEndpoint + FunctionCaller::SyncWorker(_, _) + | FunctionCaller::HttpApi(_, _) + | FunctionCaller::HttpEndpoint(_) | FunctionCaller::Tester(_) => true, FunctionCaller::Cron | FunctionCaller::Scheduler { .. } @@ -233,13 +248,13 @@ impl FunctionCaller { pub fn allowed_visibility(&self) -> AllowedVisibility { match self { - FunctionCaller::SyncWorker(_) | FunctionCaller::HttpApi(_) => { + FunctionCaller::SyncWorker(_, _) | FunctionCaller::HttpApi(_, _) => { AllowedVisibility::PublicOnly }, // NOTE: Allowed visibility doesn't make sense in the context of an // user defined http action since all http actions are public, and // we shouldn't be checking visibility. We define this for completeness. - FunctionCaller::HttpEndpoint => AllowedVisibility::PublicOnly, + FunctionCaller::HttpEndpoint(_) => AllowedVisibility::PublicOnly, FunctionCaller::Tester(_) | FunctionCaller::Cron | FunctionCaller::Scheduler { .. } @@ -253,10 +268,10 @@ impl FunctionCaller { impl fmt::Display for FunctionCaller { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let s = match self { - FunctionCaller::SyncWorker(_) => "SyncWorker", - FunctionCaller::HttpApi(_) => "HttpApi", + FunctionCaller::SyncWorker(_, _) => "SyncWorker", + FunctionCaller::HttpApi(_, _) => "HttpApi", FunctionCaller::Tester(_) => "Tester", - FunctionCaller::HttpEndpoint => "HttpEndpoint", + FunctionCaller::HttpEndpoint(_) => "HttpEndpoint", FunctionCaller::Cron => "Cron", FunctionCaller::Scheduler { .. } => "Scheduler", FunctionCaller::Action { .. } => "Action", @@ -270,16 +285,16 @@ impl fmt::Display for FunctionCaller { impl From for pb::common::FunctionCaller { fn from(caller: FunctionCaller) -> Self { let caller = match caller { - FunctionCaller::SyncWorker(client_version) => { + FunctionCaller::SyncWorker(client_version, _) => { pb::common::function_caller::Caller::SyncWorker(client_version.into()) }, - FunctionCaller::HttpApi(client_version) => { + FunctionCaller::HttpApi(client_version, _) => { pb::common::function_caller::Caller::HttpApi(client_version.into()) }, FunctionCaller::Tester(client_version) => { pb::common::function_caller::Caller::Tester(client_version.into()) }, - FunctionCaller::HttpEndpoint => pb::common::function_caller::Caller::HttpEndpoint(()), + FunctionCaller::HttpEndpoint(_) => pb::common::function_caller::Caller::HttpEndpoint(()), FunctionCaller::Cron => pb::common::function_caller::Caller::Cron(()), FunctionCaller::Scheduler { job_id, @@ -318,16 +333,16 @@ impl TryFrom for FunctionCaller { fn try_from(msg: pb::common::FunctionCaller) -> anyhow::Result { let caller = match msg.caller { Some(pb::common::function_caller::Caller::SyncWorker(client_version)) => { - FunctionCaller::SyncWorker(client_version.try_into()?) + FunctionCaller::SyncWorker(client_version.try_into()?, None) }, Some(pb::common::function_caller::Caller::HttpApi(client_version)) => { - FunctionCaller::HttpApi(client_version.try_into()?) + FunctionCaller::HttpApi(client_version.try_into()?, None) }, Some(pb::common::function_caller::Caller::Tester(client_version)) => { FunctionCaller::Tester(client_version.try_into()?) }, Some(pb::common::function_caller::Caller::HttpEndpoint(())) => { - FunctionCaller::HttpEndpoint + FunctionCaller::HttpEndpoint(None) }, Some(pb::common::function_caller::Caller::Cron(())) => FunctionCaller::Cron, Some(pb::common::function_caller::Caller::Scheduler(caller)) => { diff --git a/crates/local_backend/src/http_actions.rs b/crates/local_backend/src/http_actions.rs index 3bf5c2112..6f07bd3af 100644 --- a/crates/local_backend/src/http_actions.rs +++ b/crates/local_backend/src/http_actions.rs @@ -155,6 +155,7 @@ impl FromRequest for ExtractHttpRequestMetadata { #[debug_handler] pub async fn http_any_method( State(st): State, + remote_addr: axum::extract::ConnectInfo, TryExtractIdentity(identity_result): TryExtractIdentity, ExtractRequestId(request_id): ExtractRequestId, ExtractResolvedHostname(host): ExtractResolvedHostname, @@ -222,7 +223,7 @@ async fn stream_http_response( request_id, http_request_metadata, identity, - FunctionCaller::HttpEndpoint, + FunctionCaller::HttpEndpoint(Some(remote_addr.0)), HttpActionResponseStreamer::new(http_response_sender), ) .fuse(); diff --git a/crates/local_backend/src/public_api.rs b/crates/local_backend/src/public_api.rs index 72a70d78d..6f642b6c3 100644 --- a/crates/local_backend/src/public_api.rs +++ b/crates/local_backend/src/public_api.rs @@ -183,6 +183,7 @@ impl UdfResponse { /// Executes an arbitrary query/mutation/action from its name. pub async fn public_function_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -212,7 +213,7 @@ pub async fn public_function_post( identity, component_function_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ) .await?; let value_format = req.format.as_ref().map(|f| f.parse()).transpose()?; @@ -242,6 +243,7 @@ pub struct UdfPostRequestArgsOnly { /// request and doesn't require admin auth. pub async fn public_function_post_with_path( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, Path(path): Path, ExtractRequestId(request_id): ExtractRequestId, @@ -289,7 +291,7 @@ pub async fn public_function_post_with_path( udf_path, }, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ) .await?; // Default to ConvexCleanJSON if no format is provided. @@ -328,6 +330,7 @@ pub fn export_value( #[fastrace::trace(properties = { "udf_type": "query"})] pub async fn public_query_get( State(st): State, + remote_addr: axum::extract::ConnectInfo, Query(req): Query, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, @@ -353,7 +356,7 @@ pub async fn public_query_get( identity, export_path, args, - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ExecuteQueryTimestamp::Latest, journal, ) @@ -373,6 +376,7 @@ pub async fn public_query_get( #[fastrace::trace(properties = { "udf_type": "query"})] pub async fn public_query_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -397,7 +401,7 @@ pub async fn public_query_post( identity, udf_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ExecuteQueryTimestamp::Latest, journal, ) @@ -416,9 +420,10 @@ pub async fn public_query_post( } pub async fn public_get_query_ts( + State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, - State(st): State, ) -> Result { let ts = *(st.api.latest_timestamp(&host, request_id).await?); Ok(Json(Ts { ts: ts.into() })) @@ -427,6 +432,7 @@ pub async fn public_get_query_ts( #[fastrace::trace(properties = { "udf_type": "query"})] pub async fn public_query_at_ts_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -452,7 +458,7 @@ pub async fn public_query_at_ts_post( identity, export_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ExecuteQueryTimestamp::At(ts), journal, ) @@ -482,6 +488,7 @@ pub struct QueryBatchResponse { pub async fn public_query_batch_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -506,7 +513,7 @@ pub async fn public_query_batch_post( identity.clone(), export_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ExecuteQueryTimestamp::At(*ts), None, ) @@ -531,6 +538,7 @@ pub async fn public_query_batch_post( #[fastrace::trace(properties = { "udf_type": "mutation"})] pub async fn public_mutation_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -554,7 +562,7 @@ pub async fn public_mutation_post( identity, export_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), None, None, ) @@ -578,6 +586,7 @@ pub async fn public_mutation_post( #[fastrace::trace(properties = { "udf_type": "action"})] pub async fn public_action_post( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ExtractAuthenticationToken(auth_token): ExtractAuthenticationToken, @@ -602,7 +611,7 @@ pub async fn public_action_post( identity, export_path, req.args.into_arg_vec(), - FunctionCaller::HttpApi(client_version.clone()), + FunctionCaller::HttpApi(client_version.clone(), Some(remote_addr.0)), ) .await?; let value_format = req.format.as_ref().map(|f| f.parse()).transpose()?; diff --git a/crates/local_backend/src/subs/mod.rs b/crates/local_backend/src/subs/mod.rs index 96dc9ab57..0ce3953a3 100644 --- a/crates/local_backend/src/subs/mod.rs +++ b/crates/local_backend/src/subs/mod.rs @@ -137,6 +137,18 @@ async fn run_sync_socket( socket: WebSocket, sentry_scope: sentry::Scope, on_connect: Box, +) { + run_sync_socket_with_remote_ip(st, None, host, config, socket, sentry_scope, on_connect).await +} + +async fn run_sync_socket_with_remote_ip( + st: RouterState, + remote_ip: Option, + host: ResolvedHostname, + config: SyncWorkerConfig, + socket: WebSocket, + sentry_scope: sentry::Scope, + on_connect: Box, ) { let _drop_token = SyncSocketDropToken::new(); @@ -232,10 +244,11 @@ async fn run_sync_socket( let mut identity_version: Option = None; let sync_worker_go = async { let _sync_worker_drop_token = DebugSyncSocketDropToken::new("sync_worker"); - let mut sync_worker = SyncWorker::new( + let mut sync_worker = SyncWorker::new_with_remote_ip( st.api.clone(), st.runtime.clone(), host, + remote_ip, config.clone(), client_rx, server_tx, @@ -348,6 +361,17 @@ pub async fn sync_handler( client_version: ClientVersion, ws: WebSocketUpgrade, on_connect: Box, +) -> Result { + sync_handler_with_remote_ip(st, None, host, client_version, ws, on_connect).await +} + +pub async fn sync_handler_with_remote_ip( + st: RouterState, + remote_ip: Option, + host: ResolvedHostname, + client_version: ClientVersion, + ws: WebSocketUpgrade, + on_connect: Box, ) -> Result { let config = new_sync_worker_config(client_version)?; // Make a copy of the Sentry scope, which contains the request metadata. @@ -359,18 +383,19 @@ pub async fn sync_handler( upgrade_timer.finish(); let monitor = ProdRuntime::task_monitor("sync_socket"); monitor.instrument( - run_sync_socket(st, host, config, ws, sentry_scope, on_connect).bind_hub(hub), + run_sync_socket_with_remote_ip(st, remote_ip, host, config, ws, sentry_scope, on_connect).bind_hub(hub), ) })) } pub async fn sync( State(st): State, + remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractClientVersion(client_version): ExtractClientVersion, ws: WebSocketUpgrade, ) -> Result { - sync_handler(st, host, client_version, ws, Box::new(|_session_id| ())).await + sync_handler_with_remote_ip(st, Some(remote_addr.0), host, client_version, ws, Box::new(|_session_id| ())).await } #[cfg(test)] diff --git a/crates/pb/protos/common.proto b/crates/pb/protos/common.proto index 515ccb2fa..bf6c395ad 100644 --- a/crates/pb/protos/common.proto +++ b/crates/pb/protos/common.proto @@ -98,6 +98,7 @@ message ExecutionContext { optional string request_id = 2; optional string execution_id = 3; optional bool is_root = 4; + optional string remote_ip = 6; } enum UdfType { diff --git a/crates/sync/src/tests.rs b/crates/sync/src/tests.rs index f955cb649..f28e8d802 100644 --- a/crates/sync/src/tests.rs +++ b/crates/sync/src/tests.rs @@ -873,7 +873,7 @@ async fn test_udf_cache_out_of_order(rt: TestRuntime) -> anyhow::Result<()> { Identity::Unknown(None), ts2, None, - FunctionCaller::SyncWorker(ClientVersion::unknown()), + FunctionCaller::SyncWorker(ClientVersion::unknown(), None), ) .await?; assert_eq!(result1.result?.unpack(), ConvexValue::from(5.0)); @@ -887,7 +887,7 @@ async fn test_udf_cache_out_of_order(rt: TestRuntime) -> anyhow::Result<()> { Identity::Unknown(None), ts1, None, - FunctionCaller::SyncWorker(ClientVersion::unknown()), + FunctionCaller::SyncWorker(ClientVersion::unknown(), None), ) .await?; assert_eq!(result2.result?.unpack(), ConvexValue::from(0.0)); diff --git a/crates/sync/src/worker.rs b/crates/sync/src/worker.rs index 64cb00ac3..cef8de1d2 100644 --- a/crates/sync/src/worker.rs +++ b/crates/sync/src/worker.rs @@ -216,6 +216,7 @@ pub struct SyncWorker { rt: RT, state: SyncState, host: ResolvedHostname, + remote_ip: Option, rx: mpsc::UnboundedReceiver<(ClientMessage, tokio::time::Instant)>, tx: SingleFlightSender, @@ -265,6 +266,19 @@ impl SyncWorker { rx: mpsc::UnboundedReceiver<(ClientMessage, tokio::time::Instant)>, tx: SingleFlightSender, on_connect: Box, + ) -> Self { + Self::new_with_remote_ip(api, rt, host, None, config, rx, tx, on_connect) + } + + pub fn new_with_remote_ip( + api: Arc, + rt: RT, + host: ResolvedHostname, + remote_ip: Option, + config: SyncWorkerConfig, + rx: mpsc::UnboundedReceiver<(ClientMessage, tokio::time::Instant)>, + tx: SingleFlightSender, + on_connect: Box, ) -> Self { let (mutation_sender, receiver) = mpsc::channel(OPERATION_QUEUE_BUFFER_SIZE); let mutation_futures = ReceiverStream::new(receiver).buffered(1); // Execute at most one operation at a time. @@ -274,6 +288,7 @@ impl SyncWorker { rt, state: SyncState::new(), host, + remote_ip, rx, tx, mutation_futures, @@ -499,7 +514,7 @@ impl SyncWorker { let timer = mutation_queue_timer(); let api = self.api.clone(); let host = self.host.clone(); - let caller = FunctionCaller::SyncWorker(client_version); + let caller = FunctionCaller::SyncWorker(client_version, self.remote_ip); let mutation_queue_size = self.mutation_sender.max_capacity() - self.mutation_sender.capacity(); @@ -601,7 +616,7 @@ impl SyncWorker { }, ); let future = async move { - let caller = FunctionCaller::SyncWorker(client_version); + let caller = FunctionCaller::SyncWorker(client_version, self.remote_ip); let result = match component_path { None => { api.execute_public_action( @@ -791,7 +806,7 @@ impl SyncWorker { None => { // We failed to refresh the subscription or it was invalid to start // with. Rerun the query. - let caller = FunctionCaller::SyncWorker(client_version); + let caller = FunctionCaller::SyncWorker(client_version, self.remote_ip); let ts = ExecuteQueryTimestamp::At(new_ts); // This query run might have been triggered due to invalidation From 89c0d5b46a5c705aab70ea9b9e250b1f5ce212c4 Mon Sep 17 00:00:00 2001 From: Lars Thomas Denstad Date: Fri, 27 Jun 2025 15:56:55 +0200 Subject: [PATCH 2/8] Make it compile (there is an introduced warning, will fix) --- crates/application/src/cron_jobs/mod.rs | 1 + crates/application/src/scheduled_jobs/mod.rs | 1 + crates/local_backend/src/http_actions.rs | 4 +++- crates/local_backend/src/node_action_callbacks.rs | 1 + crates/sync/src/worker.rs | 9 ++++++--- 5 files changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/application/src/cron_jobs/mod.rs b/crates/application/src/cron_jobs/mod.rs index af447367b..67ee32e79 100644 --- a/crates/application/src/cron_jobs/mod.rs +++ b/crates/application/src/cron_jobs/mod.rs @@ -639,6 +639,7 @@ impl CronJobContext { request_id.clone().unwrap_or_else(RequestId::new), execution_id.clone().unwrap_or_else(ExecutionId::new), caller.parent_scheduled_job(), + caller.remote_ip(), caller.is_root(), ); sentry::configure_scope(|scope| context.add_sentry_tags(scope)); diff --git a/crates/application/src/scheduled_jobs/mod.rs b/crates/application/src/scheduled_jobs/mod.rs index 27d7a7cb2..fffefa29b 100644 --- a/crates/application/src/scheduled_jobs/mod.rs +++ b/crates/application/src/scheduled_jobs/mod.rs @@ -832,6 +832,7 @@ impl ScheduledJobContext { request_id.clone().unwrap_or_else(RequestId::new), execution_id.clone().unwrap_or_else(ExecutionId::new), caller.parent_scheduled_job(), + caller.remote_ip(), caller.is_root(), ); sentry::configure_scope(|scope| context.add_sentry_tags(scope)); diff --git a/crates/local_backend/src/http_actions.rs b/crates/local_backend/src/http_actions.rs index 6f07bd3af..fae5140ff 100644 --- a/crates/local_backend/src/http_actions.rs +++ b/crates/local_backend/src/http_actions.rs @@ -173,6 +173,7 @@ pub async fn http_any_method( http_request_metadata, identity, st.api.clone(), + remote_addr.0, ); let head = http_response_stream.try_next().await?; let Some(HttpActionResponsePart::Head(response_head)) = head else { @@ -213,6 +214,7 @@ async fn stream_http_response( http_request_metadata: HttpActionRequest, identity: Identity, application: Arc, + remote_addr: std::net::SocketAddr, ) { let (http_response_sender, http_response_receiver) = mpsc::unbounded_channel(); @@ -223,7 +225,7 @@ async fn stream_http_response( request_id, http_request_metadata, identity, - FunctionCaller::HttpEndpoint(Some(remote_addr.0)), + FunctionCaller::HttpEndpoint(Some(remote_addr)), HttpActionResponseStreamer::new(http_response_sender), ) .fuse(); diff --git a/crates/local_backend/src/node_action_callbacks.rs b/crates/local_backend/src/node_action_callbacks.rs index 9726b2d07..ceb58aeb3 100644 --- a/crates/local_backend/src/node_action_callbacks.rs +++ b/crates/local_backend/src/node_action_callbacks.rs @@ -716,6 +716,7 @@ impl FromRequestParts for ExtractExecutionContext { request_id, execution_id, parent_job_id.map(|id| (parent_component_id, id)), + None, // remote_ip not available in node action callbacks is_root, ))) } diff --git a/crates/sync/src/worker.rs b/crates/sync/src/worker.rs index cef8de1d2..b6b1d360a 100644 --- a/crates/sync/src/worker.rs +++ b/crates/sync/src/worker.rs @@ -514,7 +514,8 @@ impl SyncWorker { let timer = mutation_queue_timer(); let api = self.api.clone(); let host = self.host.clone(); - let caller = FunctionCaller::SyncWorker(client_version, self.remote_ip); + let remote_ip = self.remote_ip; + let caller = FunctionCaller::SyncWorker(client_version, remote_ip); let mutation_queue_size = self.mutation_sender.max_capacity() - self.mutation_sender.capacity(); @@ -602,6 +603,7 @@ impl SyncWorker { let api = self.api.clone(); let host = self.host.clone(); let client_version = self.config.client_version.clone(); + let remote_ip = self.remote_ip; let server_request_id = match self.state.session_id() { Some(id) => RequestId::new_for_ws_session(id, request_id), None => RequestId::new(), @@ -616,7 +618,7 @@ impl SyncWorker { }, ); let future = async move { - let caller = FunctionCaller::SyncWorker(client_version, self.remote_ip); + let caller = FunctionCaller::SyncWorker(client_version, remote_ip); let result = match component_path { None => { api.execute_public_action( @@ -779,6 +781,7 @@ impl SyncWorker { let need_fetch: Vec<_> = self.state.need_fetch().collect(); let host = self.host.clone(); let client_version = self.config.client_version.clone(); + let remote_ip = self.remote_ip; Ok(async move { let future_results: anyhow::Result> = try_join_buffer_unordered( "update_query", @@ -806,7 +809,7 @@ impl SyncWorker { None => { // We failed to refresh the subscription or it was invalid to start // with. Rerun the query. - let caller = FunctionCaller::SyncWorker(client_version, self.remote_ip); + let caller = FunctionCaller::SyncWorker(client_version, remote_ip); let ts = ExecuteQueryTimestamp::At(new_ts); // This query run might have been triggered due to invalidation From bf293fc8f20ea31d9dbd6e8fd48eb72f9431f8a8 Mon Sep 17 00:00:00 2001 From: Lars Thomas Denstad Date: Fri, 27 Jun 2025 22:29:26 +0200 Subject: [PATCH 3/8] Commit with working stuff, need to cleanup --- crates/common/src/execution_context.rs | 9 ++- crates/common/src/types/functions.rs | 6 +- crates/isolate/src/environment/udf/mod.rs | 4 ++ .../isolate/src/isolate2/callback_context.rs | 5 ++ crates/isolate/src/isolate2/environment.rs | 2 + crates/isolate/src/isolate2/runner.rs | 10 +++ crates/isolate/src/ops/mod.rs | 62 +++++++++++++++++++ crates/local_backend/src/subs/mod.rs | 2 + crates/sync/src/worker.rs | 1 + .../src/server/impl/registration_impl.ts | 42 +++++++++++++ 10 files changed, 139 insertions(+), 4 deletions(-) diff --git a/crates/common/src/execution_context.rs b/crates/common/src/execution_context.rs index 217433869..a53c98d9a 100644 --- a/crates/common/src/execution_context.rs +++ b/crates/common/src/execution_context.rs @@ -51,11 +51,13 @@ pub struct ExecutionContext { impl ExecutionContext { pub fn new(request_id: RequestId, caller: &FunctionCaller) -> Self { + let remote_ip = caller.remote_ip(); + tracing::info!("🔍 Creating ExecutionContext with remote IP: {:?}", remote_ip); Self { request_id, execution_id: ExecutionId::new(), parent_scheduled_job: caller.parent_scheduled_job(), - remote_ip: caller.remote_ip(), + remote_ip, is_root: caller.is_root(), } } @@ -312,11 +314,14 @@ impl TryFrom for ExecutionContext { impl From for JsonValue { fn from(value: ExecutionContext) -> Self { let (parent_component_id, parent_document_id) = value.parent_scheduled_job.unzip(); + let remote_ip_str = value.remote_ip.map(|addr| addr.to_string()); + tracing::info!("🔍 Serializing ExecutionContext to JSON with remoteIp: {:?}", remote_ip_str); json!({ "requestId": String::from(value.request_id), "executionId": value.execution_id.to_string(), "isRoot": value.is_root, - "remoteIp": value.remote_ip.map(|addr| addr.to_string()), + "remoteIp": remote_ip_str, + "testValue": "HELLO_FROM_RUST", "parentScheduledJob": parent_document_id.map(|id| id.to_string()), "parentScheduledJobComponentId": parent_component_id.unwrap_or(ComponentId::Root).serialize_to_string(), }) diff --git a/crates/common/src/types/functions.rs b/crates/common/src/types/functions.rs index a7d71c06d..fb695697e 100644 --- a/crates/common/src/types/functions.rs +++ b/crates/common/src/types/functions.rs @@ -202,7 +202,7 @@ impl FunctionCaller { } pub fn remote_ip(&self) -> Option { - match self { + let remote_ip = match self { FunctionCaller::SyncWorker(_, remote_ip) | FunctionCaller::HttpApi(_, remote_ip) | FunctionCaller::HttpEndpoint(remote_ip) => *remote_ip, @@ -212,7 +212,9 @@ impl FunctionCaller { | FunctionCaller::Action { .. } => None, #[cfg(any(test, feature = "testing"))] FunctionCaller::Test => None, - } + }; + tracing::info!("🔍 FunctionCaller::remote_ip() called, returning: {:?}", remote_ip); + remote_ip } pub fn is_root(&self) -> bool { diff --git a/crates/isolate/src/environment/udf/mod.rs b/crates/isolate/src/environment/udf/mod.rs index e339b3b49..24ccc79f1 100644 --- a/crates/isolate/src/environment/udf/mod.rs +++ b/crates/isolate/src/environment/udf/mod.rs @@ -999,4 +999,8 @@ impl DatabaseUdfEnvironment { } Ok(()) } + + pub fn execution_context(&self) -> &ExecutionContext { + &self.context + } } diff --git a/crates/isolate/src/isolate2/callback_context.rs b/crates/isolate/src/isolate2/callback_context.rs index 46e8b9ed1..9477520c5 100644 --- a/crates/isolate/src/isolate2/callback_context.rs +++ b/crates/isolate/src/isolate2/callback_context.rs @@ -595,5 +595,10 @@ mod op_provider { fn remove_text_decoder(&mut self, uuid: &Uuid) -> anyhow::Result { self.context_state()?.remove_text_decoder(uuid) } + + fn get_execution_context(&mut self) -> anyhow::Result> { + let state = self.context_state()?; + state.environment.get_execution_context() + } } } diff --git a/crates/isolate/src/isolate2/environment.rs b/crates/isolate/src/isolate2/environment.rs index 75b09f3f0..5f112157c 100644 --- a/crates/isolate/src/isolate2/environment.rs +++ b/crates/isolate/src/isolate2/environment.rs @@ -44,4 +44,6 @@ pub trait Environment { fn finish_execution(&mut self) -> anyhow::Result; fn get_all_table_mappings(&mut self) -> anyhow::Result; + + fn get_execution_context(&mut self) -> anyhow::Result>; } diff --git a/crates/isolate/src/isolate2/runner.rs b/crates/isolate/src/isolate2/runner.rs index 921d17a12..2d6649b28 100644 --- a/crates/isolate/src/isolate2/runner.rs +++ b/crates/isolate/src/isolate2/runner.rs @@ -243,6 +243,8 @@ struct UdfEnvironment { #[allow(unused)] env_vars: PreloadedEnvironmentVariables, + + execution_context: ExecutionContext, } impl UdfEnvironment { @@ -254,6 +256,7 @@ impl UdfEnvironment { shared: UdfShared, env_vars: PreloadedEnvironmentVariables, log_line_sender: spsc::Sender, + execution_context: ExecutionContext, ) -> Self { let rng = ChaCha12Rng::from_seed(import_time_seed.rng_seed); Self { @@ -270,6 +273,7 @@ impl UdfEnvironment { shared, env_vars, + execution_context, } } @@ -470,6 +474,11 @@ impl Environment for UdfEnvironment { self.check_executing()?; Ok(self.shared.get_all_table_mappings()) } + + fn get_execution_context(&mut self) -> anyhow::Result> { + let context_json: serde_json::Value = self.execution_context.clone().into(); + Ok(Some(context_json)) + } } async fn run_request( @@ -1063,6 +1072,7 @@ pub async fn run_isolate_v2_udf( shared.clone(), env_vars, log_line_sender, + context.clone(), ); // The protocol is synchronous, so there should never be more than diff --git a/crates/isolate/src/ops/mod.rs b/crates/isolate/src/ops/mod.rs index 535f39448..9637e73b3 100644 --- a/crates/isolate/src/ops/mod.rs +++ b/crates/isolate/src/ops/mod.rs @@ -38,11 +38,15 @@ use common::{ EnvVarValue, }, }; +use runtime::prod::ProdRuntime; +#[cfg(any(test, feature = "testing"))] +use runtime::testing::TestRuntime; use crypto::{ op_crypto_decrypt, op_crypto_encrypt, }; use deno_core::{ + serde_v8, v8, ModuleSpecifier, }; @@ -201,6 +205,8 @@ pub trait OpProvider<'b> { -> anyhow::Result>; fn get_all_table_mappings(&mut self) -> anyhow::Result; + + fn get_execution_context(&mut self) -> anyhow::Result>; } impl<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment> OpProvider<'b> @@ -350,6 +356,37 @@ impl<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment> OpProvider<'b> let state = self.state_mut()?; state.environment.get_all_table_mappings() } + + fn get_execution_context(&mut self) -> anyhow::Result> { + use crate::environment::udf::DatabaseUdfEnvironment; + use std::any::{Any, TypeId}; + + let state = self.state_mut()?; + + // Check if this is a DatabaseUdfEnvironment by checking type ID + let environment_any = &state.environment as &dyn Any; + let type_id = environment_any.type_id(); + + // Try common runtime types + if type_id == TypeId::of::>() { + if let Some(udf_env) = environment_any.downcast_ref::>() { + let context_json: serde_json::Value = udf_env.execution_context().clone().into(); + return Ok(Some(context_json)); + } + } + + // Add other runtime types as needed + #[cfg(any(test, feature = "testing"))] + if type_id == TypeId::of::>() { + if let Some(udf_env) = environment_any.downcast_ref::>() { + let context_json: serde_json::Value = udf_env.execution_context().clone().into(); + return Ok(Some(context_json)); + } + } + + // Not in UDF environment - return None + Ok(None) + } } pub fn run_op<'b, P: OpProvider<'b>>( @@ -428,6 +465,7 @@ pub fn run_op<'b, P: OpProvider<'b>>( "crypto/exportPkcs8X25519" => op_crypto_export_pkcs8_x25519(provider, args, rv)?, "crypto/generateKeyPair" => op_crypto_generate_keypair(provider, args, rv)?, "crypto/generateKeyBytes" => op_crypto_generate_key_bytes(provider, args, rv)?, + "getExecutionContext" => op_get_execution_context(provider, args, rv)?, _ => { anyhow::bail!(ErrorMetadata::bad_request( "UnknownOperation", @@ -473,3 +511,27 @@ pub fn start_async_op<'b, P: OpProvider<'b>>( rv.set(promise.into()); Ok(()) } + +// Access to execution context for user functions +pub fn op_get_execution_context<'b, P: OpProvider<'b>>( + provider: &mut P, + _args: v8::FunctionCallbackArguments, + mut rv: v8::ReturnValue, +) -> anyhow::Result<()> { + tracing::info!("🔍 op_get_execution_context called!"); + + // Get the execution context using the trait method + let context_json = provider.get_execution_context()?; + + tracing::info!("🔍 Retrieved execution context: {:?}", context_json); + + // Return the context or empty object if not available + let result = context_json.unwrap_or_else(|| serde_json::json!({})); + + // Convert to V8 value and return + let v8_value = serde_v8::to_v8(provider.scope(), result)?; + rv.set(v8_value); + + tracing::info!("🔍 op_get_execution_context completed successfully"); + Ok(()) +} diff --git a/crates/local_backend/src/subs/mod.rs b/crates/local_backend/src/subs/mod.rs index 0ce3953a3..93e9fdceb 100644 --- a/crates/local_backend/src/subs/mod.rs +++ b/crates/local_backend/src/subs/mod.rs @@ -244,6 +244,7 @@ async fn run_sync_socket_with_remote_ip( let mut identity_version: Option = None; let sync_worker_go = async { let _sync_worker_drop_token = DebugSyncSocketDropToken::new("sync_worker"); + tracing::info!("🔍 Creating SyncWorker with remote IP: {:?}", remote_ip); let mut sync_worker = SyncWorker::new_with_remote_ip( st.api.clone(), st.runtime.clone(), @@ -395,6 +396,7 @@ pub async fn sync( ExtractClientVersion(client_version): ExtractClientVersion, ws: WebSocketUpgrade, ) -> Result { + tracing::info!("🔍 WebSocket connection from remote IP: {}", remote_addr.0); sync_handler_with_remote_ip(st, Some(remote_addr.0), host, client_version, ws, Box::new(|_session_id| ())).await } diff --git a/crates/sync/src/worker.rs b/crates/sync/src/worker.rs index b6b1d360a..5a8eba179 100644 --- a/crates/sync/src/worker.rs +++ b/crates/sync/src/worker.rs @@ -810,6 +810,7 @@ impl SyncWorker { // We failed to refresh the subscription or it was invalid to start // with. Rerun the query. let caller = FunctionCaller::SyncWorker(client_version, remote_ip); + tracing::info!("🔍 About to execute query with FunctionCaller containing remote IP: {:?}", remote_ip); let ts = ExecuteQueryTimestamp::At(new_ts); // This query run might have been triggered due to invalidation diff --git a/npm-packages/convex/src/server/impl/registration_impl.ts b/npm-packages/convex/src/server/impl/registration_impl.ts index aaa422387..106996e18 100644 --- a/npm-packages/convex/src/server/impl/registration_impl.ts +++ b/npm-packages/convex/src/server/impl/registration_impl.ts @@ -40,13 +40,46 @@ import { performAsyncSyscall } from "./syscall.js"; import { asObjectValidator } from "../../values/validator.js"; import { getFunctionAddress } from "../components/paths.js"; +// Declare the global Convex object +declare const Convex: { + syscall: (op: string, jsonArgs: string) => string; + asyncSyscall: (op: string, jsonArgs: string) => Promise; + jsSyscall: (op: string, args: Record) => any; + op: (opName: string, ...args: any[]) => any; +}; + +// Get execution context from Rust +async function getExecutionContext(): Promise { + try { + if (typeof Convex === "undefined" || Convex.op === undefined) { + console.log("❌ Convex.op not available"); + throw new Error("Convex.op not available"); + } + console.log("✅ Calling getExecutionContext op..."); + const context = Convex.op("getExecutionContext"); + console.log("✅ Got execution context:", JSON.stringify(context, null, 2)); + return context; + } catch (error) { + console.log("❌ Error getting execution context:", error); + return { + testValue: "FALLBACK_CONTEXT_EXECUTED", // Make this very obvious + remoteIp: "DEBUG_FALLBACK", // Change null to a string so it's obvious + debugInfo: "getExecutionContext was called but failed", + }; + } +} + async function invokeMutation< F extends (ctx: GenericMutationCtx, ...args: any) => any, >(func: F, argsStr: string) { + console.log("🔍 TypeScript: invokeMutation called!"); // TODO(presley): Change the function signature and propagate the requestId from Rust. // Ok, to mock it out for now, since queries are only running in V8. const requestId = ""; const args = jsonToConvex(JSON.parse(argsStr)); + console.log("🔍 TypeScript: About to call getExecutionContext for mutation"); + const executionContext = await getExecutionContext(); + console.log("🔍 TypeScript: Mutation execution context:", executionContext); const mutationCtx = { db: setupWriter(), auth: setupAuth(requestId), @@ -56,7 +89,9 @@ async function invokeMutation< runQuery: (reference: any, args?: any) => runUdf("query", reference, args), runMutation: (reference: any, args?: any) => runUdf("mutation", reference, args), + ...executionContext, }; + console.log("🔍 TypeScript: Final mutation context:", mutationCtx); const result = await invokeFunction(func, mutationCtx, args as any); validateReturnValue(result); return JSON.stringify(convexToJson(result === undefined ? null : result)); @@ -251,16 +286,23 @@ export const internalMutationGeneric: MutationBuilder = (( async function invokeQuery< F extends (ctx: GenericQueryCtx, ...args: any) => any, >(func: F, argsStr: string) { + INTENTIONAL_SYNTAX_ERROR_TO_FORCE_FAILURE; + console.log("🔍 TypeScript: invokeQuery called!"); // TODO(presley): Change the function signature and propagate the requestId from Rust. // Ok, to mock it out for now, since queries are only running in V8. const requestId = ""; const args = jsonToConvex(JSON.parse(argsStr)); + console.log("🔍 TypeScript: About to call getExecutionContext for query"); + const executionContext = await getExecutionContext(); + console.log("🔍 TypeScript: Query execution context:", executionContext); const queryCtx = { db: setupReader(), auth: setupAuth(requestId), storage: setupStorageReader(requestId), runQuery: (reference: any, args?: any) => runUdf("query", reference, args), + ...executionContext, }; + console.log("🔍 TypeScript: Final query context:", queryCtx); const result = await invokeFunction(func, queryCtx, args as any); validateReturnValue(result); return JSON.stringify(convexToJson(result === undefined ? null : result)); From 89e4e9892186dfcc4a4a97f4036f18158137fb94 Mon Sep 17 00:00:00 2001 From: Lars Thomas Denstad Date: Fri, 27 Jun 2025 22:58:35 +0200 Subject: [PATCH 4/8] Cleanup --- crates/isolate/src/ops/mod.rs | 5 ----- .../src/server/impl/registration_impl.ts | 19 +------------------ 2 files changed, 1 insertion(+), 23 deletions(-) diff --git a/crates/isolate/src/ops/mod.rs b/crates/isolate/src/ops/mod.rs index 9637e73b3..93cfd955b 100644 --- a/crates/isolate/src/ops/mod.rs +++ b/crates/isolate/src/ops/mod.rs @@ -518,13 +518,9 @@ pub fn op_get_execution_context<'b, P: OpProvider<'b>>( _args: v8::FunctionCallbackArguments, mut rv: v8::ReturnValue, ) -> anyhow::Result<()> { - tracing::info!("🔍 op_get_execution_context called!"); - // Get the execution context using the trait method let context_json = provider.get_execution_context()?; - tracing::info!("🔍 Retrieved execution context: {:?}", context_json); - // Return the context or empty object if not available let result = context_json.unwrap_or_else(|| serde_json::json!({})); @@ -532,6 +528,5 @@ pub fn op_get_execution_context<'b, P: OpProvider<'b>>( let v8_value = serde_v8::to_v8(provider.scope(), result)?; rv.set(v8_value); - tracing::info!("🔍 op_get_execution_context completed successfully"); Ok(()) } diff --git a/npm-packages/convex/src/server/impl/registration_impl.ts b/npm-packages/convex/src/server/impl/registration_impl.ts index 106996e18..a2cd25efb 100644 --- a/npm-packages/convex/src/server/impl/registration_impl.ts +++ b/npm-packages/convex/src/server/impl/registration_impl.ts @@ -52,34 +52,23 @@ declare const Convex: { async function getExecutionContext(): Promise { try { if (typeof Convex === "undefined" || Convex.op === undefined) { - console.log("❌ Convex.op not available"); throw new Error("Convex.op not available"); } - console.log("✅ Calling getExecutionContext op..."); const context = Convex.op("getExecutionContext"); - console.log("✅ Got execution context:", JSON.stringify(context, null, 2)); return context; } catch (error) { - console.log("❌ Error getting execution context:", error); - return { - testValue: "FALLBACK_CONTEXT_EXECUTED", // Make this very obvious - remoteIp: "DEBUG_FALLBACK", // Change null to a string so it's obvious - debugInfo: "getExecutionContext was called but failed", - }; + return {}; } } async function invokeMutation< F extends (ctx: GenericMutationCtx, ...args: any) => any, >(func: F, argsStr: string) { - console.log("🔍 TypeScript: invokeMutation called!"); // TODO(presley): Change the function signature and propagate the requestId from Rust. // Ok, to mock it out for now, since queries are only running in V8. const requestId = ""; const args = jsonToConvex(JSON.parse(argsStr)); - console.log("🔍 TypeScript: About to call getExecutionContext for mutation"); const executionContext = await getExecutionContext(); - console.log("🔍 TypeScript: Mutation execution context:", executionContext); const mutationCtx = { db: setupWriter(), auth: setupAuth(requestId), @@ -91,7 +80,6 @@ async function invokeMutation< runUdf("mutation", reference, args), ...executionContext, }; - console.log("🔍 TypeScript: Final mutation context:", mutationCtx); const result = await invokeFunction(func, mutationCtx, args as any); validateReturnValue(result); return JSON.stringify(convexToJson(result === undefined ? null : result)); @@ -286,15 +274,11 @@ export const internalMutationGeneric: MutationBuilder = (( async function invokeQuery< F extends (ctx: GenericQueryCtx, ...args: any) => any, >(func: F, argsStr: string) { - INTENTIONAL_SYNTAX_ERROR_TO_FORCE_FAILURE; - console.log("🔍 TypeScript: invokeQuery called!"); // TODO(presley): Change the function signature and propagate the requestId from Rust. // Ok, to mock it out for now, since queries are only running in V8. const requestId = ""; const args = jsonToConvex(JSON.parse(argsStr)); - console.log("🔍 TypeScript: About to call getExecutionContext for query"); const executionContext = await getExecutionContext(); - console.log("🔍 TypeScript: Query execution context:", executionContext); const queryCtx = { db: setupReader(), auth: setupAuth(requestId), @@ -302,7 +286,6 @@ async function invokeQuery< runQuery: (reference: any, args?: any) => runUdf("query", reference, args), ...executionContext, }; - console.log("🔍 TypeScript: Final query context:", queryCtx); const result = await invokeFunction(func, queryCtx, args as any); validateReturnValue(result); return JSON.stringify(convexToJson(result === undefined ? null : result)); From 34859c194f1baaa95c7e43f4aee9815e866a6b85 Mon Sep 17 00:00:00 2001 From: Lars Thomas Denstad Date: Fri, 27 Jun 2025 23:03:06 +0200 Subject: [PATCH 5/8] Cleanup logging --- crates/common/src/execution_context.rs | 2 -- crates/common/src/types/functions.rs | 1 - crates/local_backend/src/subs/mod.rs | 2 -- crates/sync/src/worker.rs | 1 - 4 files changed, 6 deletions(-) diff --git a/crates/common/src/execution_context.rs b/crates/common/src/execution_context.rs index a53c98d9a..2fe3da9ca 100644 --- a/crates/common/src/execution_context.rs +++ b/crates/common/src/execution_context.rs @@ -52,7 +52,6 @@ pub struct ExecutionContext { impl ExecutionContext { pub fn new(request_id: RequestId, caller: &FunctionCaller) -> Self { let remote_ip = caller.remote_ip(); - tracing::info!("🔍 Creating ExecutionContext with remote IP: {:?}", remote_ip); Self { request_id, execution_id: ExecutionId::new(), @@ -315,7 +314,6 @@ impl From for JsonValue { fn from(value: ExecutionContext) -> Self { let (parent_component_id, parent_document_id) = value.parent_scheduled_job.unzip(); let remote_ip_str = value.remote_ip.map(|addr| addr.to_string()); - tracing::info!("🔍 Serializing ExecutionContext to JSON with remoteIp: {:?}", remote_ip_str); json!({ "requestId": String::from(value.request_id), "executionId": value.execution_id.to_string(), diff --git a/crates/common/src/types/functions.rs b/crates/common/src/types/functions.rs index fb695697e..b1c8121d9 100644 --- a/crates/common/src/types/functions.rs +++ b/crates/common/src/types/functions.rs @@ -213,7 +213,6 @@ impl FunctionCaller { #[cfg(any(test, feature = "testing"))] FunctionCaller::Test => None, }; - tracing::info!("🔍 FunctionCaller::remote_ip() called, returning: {:?}", remote_ip); remote_ip } diff --git a/crates/local_backend/src/subs/mod.rs b/crates/local_backend/src/subs/mod.rs index 93e9fdceb..0ce3953a3 100644 --- a/crates/local_backend/src/subs/mod.rs +++ b/crates/local_backend/src/subs/mod.rs @@ -244,7 +244,6 @@ async fn run_sync_socket_with_remote_ip( let mut identity_version: Option = None; let sync_worker_go = async { let _sync_worker_drop_token = DebugSyncSocketDropToken::new("sync_worker"); - tracing::info!("🔍 Creating SyncWorker with remote IP: {:?}", remote_ip); let mut sync_worker = SyncWorker::new_with_remote_ip( st.api.clone(), st.runtime.clone(), @@ -396,7 +395,6 @@ pub async fn sync( ExtractClientVersion(client_version): ExtractClientVersion, ws: WebSocketUpgrade, ) -> Result { - tracing::info!("🔍 WebSocket connection from remote IP: {}", remote_addr.0); sync_handler_with_remote_ip(st, Some(remote_addr.0), host, client_version, ws, Box::new(|_session_id| ())).await } diff --git a/crates/sync/src/worker.rs b/crates/sync/src/worker.rs index 5a8eba179..b6b1d360a 100644 --- a/crates/sync/src/worker.rs +++ b/crates/sync/src/worker.rs @@ -810,7 +810,6 @@ impl SyncWorker { // We failed to refresh the subscription or it was invalid to start // with. Rerun the query. let caller = FunctionCaller::SyncWorker(client_version, remote_ip); - tracing::info!("🔍 About to execute query with FunctionCaller containing remote IP: {:?}", remote_ip); let ts = ExecuteQueryTimestamp::At(new_ts); // This query run might have been triggered due to invalidation From cc39490ef2f0f520ad0199c7b000fa2108e6f86e Mon Sep 17 00:00:00 2001 From: Lars Thomas Denstad Date: Fri, 27 Jun 2025 23:32:18 +0200 Subject: [PATCH 6/8] Remove logging --- crates/common/src/execution_context.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/common/src/execution_context.rs b/crates/common/src/execution_context.rs index 2fe3da9ca..1dfc9a1f9 100644 --- a/crates/common/src/execution_context.rs +++ b/crates/common/src/execution_context.rs @@ -61,7 +61,11 @@ impl ExecutionContext { } } - pub fn new_with_remote_ip(request_id: RequestId, caller: &FunctionCaller, remote_ip: Option) -> Self { + pub fn new_with_remote_ip( + request_id: RequestId, + caller: &FunctionCaller, + remote_ip: Option, + ) -> Self { Self { request_id, execution_id: ExecutionId::new(), @@ -115,7 +119,9 @@ impl HeapSize for ExecutionContext { + self .parent_scheduled_job .map_or(0, |(_, document_id)| document_id.heap_size()) - + self.remote_ip.map_or(0, |_| std::mem::size_of::()) + + self + .remote_ip + .map_or(0, |_| std::mem::size_of::()) + self.is_root.heap_size() } } @@ -295,7 +301,10 @@ impl TryFrom for ExecutionContext { value.parent_scheduled_job_component_id.as_deref(), )?; let parent_document_id = value.parent_scheduled_job.map(|s| s.parse()).transpose()?; - let remote_ip = value.remote_ip.map(|s| s.parse()).transpose() + let remote_ip = value + .remote_ip + .map(|s| s.parse()) + .transpose() .context("Invalid remote IP address")?; Ok(Self { request_id: RequestId::from_str(&value.request_id.context("Missing request id")?)?, @@ -319,7 +328,6 @@ impl From for JsonValue { "executionId": value.execution_id.to_string(), "isRoot": value.is_root, "remoteIp": remote_ip_str, - "testValue": "HELLO_FROM_RUST", "parentScheduledJob": parent_document_id.map(|id| id.to_string()), "parentScheduledJobComponentId": parent_component_id.unwrap_or(ComponentId::Root).serialize_to_string(), }) From d06580b67317f10fb84a6876fb80c8578faf0c31 Mon Sep 17 00:00:00 2001 From: Lars Thomas Denstad Date: Sat, 28 Jun 2025 02:06:23 +0200 Subject: [PATCH 7/8] Rewrite to use synchronous call and be zero-cost if not used --- .../isolate/src/isolate2/callback_context.rs | 2 +- crates/isolate/src/isolate2/context.rs | 1 + crates/isolate/src/isolate2/environment.rs | 2 +- crates/isolate/src/isolate2/runner.rs | 4 +- crates/isolate/src/ops/mod.rs | 53 ++++++++++++------- .../src/server/impl/registration_impl.ts | 23 ++++---- 6 files changed, 52 insertions(+), 33 deletions(-) diff --git a/crates/isolate/src/isolate2/callback_context.rs b/crates/isolate/src/isolate2/callback_context.rs index 9477520c5..741a80620 100644 --- a/crates/isolate/src/isolate2/callback_context.rs +++ b/crates/isolate/src/isolate2/callback_context.rs @@ -595,7 +595,7 @@ mod op_provider { fn remove_text_decoder(&mut self, uuid: &Uuid) -> anyhow::Result { self.context_state()?.remove_text_decoder(uuid) } - + fn get_execution_context(&mut self) -> anyhow::Result> { let state = self.context_state()?; state.environment.get_execution_context() diff --git a/crates/isolate/src/isolate2/context.rs b/crates/isolate/src/isolate2/context.rs index ce8f27851..953f38089 100644 --- a/crates/isolate/src/isolate2/context.rs +++ b/crates/isolate/src/isolate2/context.rs @@ -83,6 +83,7 @@ impl Context { let op_key = strings::op.create(&mut scope)?; convex_value.set(&mut scope, op_key.into(), op_value.into()); + let async_op_template = v8::FunctionTemplate::new(&mut scope, CallbackContext::start_async_op); let async_op_value = async_op_template diff --git a/crates/isolate/src/isolate2/environment.rs b/crates/isolate/src/isolate2/environment.rs index 5f112157c..24652ec8c 100644 --- a/crates/isolate/src/isolate2/environment.rs +++ b/crates/isolate/src/isolate2/environment.rs @@ -44,6 +44,6 @@ pub trait Environment { fn finish_execution(&mut self) -> anyhow::Result; fn get_all_table_mappings(&mut self) -> anyhow::Result; - + fn get_execution_context(&mut self) -> anyhow::Result>; } diff --git a/crates/isolate/src/isolate2/runner.rs b/crates/isolate/src/isolate2/runner.rs index 2d6649b28..96e5bddf9 100644 --- a/crates/isolate/src/isolate2/runner.rs +++ b/crates/isolate/src/isolate2/runner.rs @@ -243,7 +243,7 @@ struct UdfEnvironment { #[allow(unused)] env_vars: PreloadedEnvironmentVariables, - + execution_context: ExecutionContext, } @@ -474,7 +474,7 @@ impl Environment for UdfEnvironment { self.check_executing()?; Ok(self.shared.get_all_table_mappings()) } - + fn get_execution_context(&mut self) -> anyhow::Result> { let context_json: serde_json::Value = self.execution_context.clone().into(); Ok(Some(context_json)) diff --git a/crates/isolate/src/ops/mod.rs b/crates/isolate/src/ops/mod.rs index 93cfd955b..9ac00ee7c 100644 --- a/crates/isolate/src/ops/mod.rs +++ b/crates/isolate/src/ops/mod.rs @@ -38,9 +38,6 @@ use common::{ EnvVarValue, }, }; -use runtime::prod::ProdRuntime; -#[cfg(any(test, feature = "testing"))] -use runtime::testing::TestRuntime; use crypto::{ op_crypto_decrypt, op_crypto_encrypt, @@ -51,6 +48,9 @@ use deno_core::{ ModuleSpecifier, }; use rand_chacha::ChaCha12Rng; +use runtime::prod::ProdRuntime; +#[cfg(any(test, feature = "testing"))] +use runtime::testing::TestRuntime; use sourcemap::SourceMap; use structured_clone::op_structured_clone; use uuid::Uuid; @@ -205,7 +205,7 @@ pub trait OpProvider<'b> { -> anyhow::Result>; fn get_all_table_mappings(&mut self) -> anyhow::Result; - + fn get_execution_context(&mut self) -> anyhow::Result>; } @@ -356,34 +356,42 @@ impl<'a, 'b: 'a, RT: Runtime, E: IsolateEnvironment> OpProvider<'b> let state = self.state_mut()?; state.environment.get_all_table_mappings() } - + fn get_execution_context(&mut self) -> anyhow::Result> { + use std::any::{ + Any, + TypeId, + }; + use crate::environment::udf::DatabaseUdfEnvironment; - use std::any::{Any, TypeId}; - + let state = self.state_mut()?; - + // Check if this is a DatabaseUdfEnvironment by checking type ID let environment_any = &state.environment as &dyn Any; let type_id = environment_any.type_id(); - + // Try common runtime types if type_id == TypeId::of::>() { - if let Some(udf_env) = environment_any.downcast_ref::>() { + if let Some(udf_env) = + environment_any.downcast_ref::>() + { let context_json: serde_json::Value = udf_env.execution_context().clone().into(); return Ok(Some(context_json)); } } - + // Add other runtime types as needed #[cfg(any(test, feature = "testing"))] if type_id == TypeId::of::>() { - if let Some(udf_env) = environment_any.downcast_ref::>() { + if let Some(udf_env) = + environment_any.downcast_ref::>() + { let context_json: serde_json::Value = udf_env.execution_context().clone().into(); return Ok(Some(context_json)); } } - + // Not in UDF environment - return None Ok(None) } @@ -465,7 +473,7 @@ pub fn run_op<'b, P: OpProvider<'b>>( "crypto/exportPkcs8X25519" => op_crypto_export_pkcs8_x25519(provider, args, rv)?, "crypto/generateKeyPair" => op_crypto_generate_keypair(provider, args, rv)?, "crypto/generateKeyBytes" => op_crypto_generate_key_bytes(provider, args, rv)?, - "getExecutionContext" => op_get_execution_context(provider, args, rv)?, + "getRemoteIp" => op_get_remote_ip(provider, args, rv)?, _ => { anyhow::bail!(ErrorMetadata::bad_request( "UnknownOperation", @@ -512,17 +520,24 @@ pub fn start_async_op<'b, P: OpProvider<'b>>( Ok(()) } -// Access to execution context for user functions -pub fn op_get_execution_context<'b, P: OpProvider<'b>>( +// Get remote IP address for the current request +pub fn op_get_remote_ip<'b, P: OpProvider<'b>>( provider: &mut P, _args: v8::FunctionCallbackArguments, mut rv: v8::ReturnValue, ) -> anyhow::Result<()> { - // Get the execution context using the trait method + // Get the execution context and extract remote IP let context_json = provider.get_execution_context()?; - // Return the context or empty object if not available - let result = context_json.unwrap_or_else(|| serde_json::json!({})); + let result = if let Some(ctx) = context_json { + if let Some(ip) = ctx.get("remoteIp").and_then(|ip| ip.as_str()) { + serde_json::Value::String(ip.to_string()) + } else { + serde_json::Value::Null + } + } else { + serde_json::Value::Null + }; // Convert to V8 value and return let v8_value = serde_v8::to_v8(provider.scope(), result)?; diff --git a/npm-packages/convex/src/server/impl/registration_impl.ts b/npm-packages/convex/src/server/impl/registration_impl.ts index a2cd25efb..f675c94f3 100644 --- a/npm-packages/convex/src/server/impl/registration_impl.ts +++ b/npm-packages/convex/src/server/impl/registration_impl.ts @@ -48,16 +48,17 @@ declare const Convex: { op: (opName: string, ...args: any[]) => any; }; -// Get execution context from Rust -async function getExecutionContext(): Promise { +// Get remote IP address for the current request (lazy loaded) +function getRemoteIp(): string | null { try { if (typeof Convex === "undefined" || Convex.op === undefined) { - throw new Error("Convex.op not available"); + return null; } - const context = Convex.op("getExecutionContext"); - return context; + // Use synchronous op call - fast and only called when needed + const remoteIp = Convex.op("getRemoteIp"); + return remoteIp; } catch (error) { - return {}; + return null; } } @@ -68,7 +69,6 @@ async function invokeMutation< // Ok, to mock it out for now, since queries are only running in V8. const requestId = ""; const args = jsonToConvex(JSON.parse(argsStr)); - const executionContext = await getExecutionContext(); const mutationCtx = { db: setupWriter(), auth: setupAuth(requestId), @@ -78,7 +78,9 @@ async function invokeMutation< runQuery: (reference: any, args?: any) => runUdf("query", reference, args), runMutation: (reference: any, args?: any) => runUdf("mutation", reference, args), - ...executionContext, + + // Lazy-loaded remote IP access + getRemoteIp, }; const result = await invokeFunction(func, mutationCtx, args as any); validateReturnValue(result); @@ -278,13 +280,14 @@ async function invokeQuery< // Ok, to mock it out for now, since queries are only running in V8. const requestId = ""; const args = jsonToConvex(JSON.parse(argsStr)); - const executionContext = await getExecutionContext(); const queryCtx = { db: setupReader(), auth: setupAuth(requestId), storage: setupStorageReader(requestId), runQuery: (reference: any, args?: any) => runUdf("query", reference, args), - ...executionContext, + + // Lazy-loaded remote IP access + getRemoteIp, }; const result = await invokeFunction(func, queryCtx, args as any); validateReturnValue(result); From 8d9925cbb1ebeb4c3152b21a173ac88795a95cbe Mon Sep 17 00:00:00 2001 From: Lars Thomas Denstad Date: Sun, 29 Jun 2025 00:35:19 +0200 Subject: [PATCH 8/8] Remove unused stuff --- crates/local_backend/src/public_api.rs | 1 - crates/local_backend/src/subs/mod.rs | 11 +---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/crates/local_backend/src/public_api.rs b/crates/local_backend/src/public_api.rs index 6f642b6c3..28abc2ebc 100644 --- a/crates/local_backend/src/public_api.rs +++ b/crates/local_backend/src/public_api.rs @@ -421,7 +421,6 @@ pub async fn public_query_post( pub async fn public_get_query_ts( State(st): State, - remote_addr: axum::extract::ConnectInfo, ExtractResolvedHostname(host): ExtractResolvedHostname, ExtractRequestId(request_id): ExtractRequestId, ) -> Result { diff --git a/crates/local_backend/src/subs/mod.rs b/crates/local_backend/src/subs/mod.rs index 0ce3953a3..63d1a3201 100644 --- a/crates/local_backend/src/subs/mod.rs +++ b/crates/local_backend/src/subs/mod.rs @@ -130,16 +130,7 @@ impl Drop for DebugSyncSocketDropToken { // on a close frame and close the WebSocket. They can also signal clean shutdown // by returning `Ok(())`, and once all of them have cleanly exited, we'll // gracefully close the socket. -async fn run_sync_socket( - st: RouterState, - host: ResolvedHostname, - config: SyncWorkerConfig, - socket: WebSocket, - sentry_scope: sentry::Scope, - on_connect: Box, -) { - run_sync_socket_with_remote_ip(st, None, host, config, socket, sentry_scope, on_connect).await -} + async fn run_sync_socket_with_remote_ip( st: RouterState,