From f2415c7ab23e4bfcaab24d7bbf4ef2c6a351d26b Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Sat, 6 Jun 2026 03:14:59 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20(aa-ffi-python):=20Pin?= =?UTF-8?q?=20shared=20crates=20to=209cf8a033=20+=20add=20aa-sdk-client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bump the aa-core / aa-proto git-SHA pin to the agent-assembly master commit that ships aa-sdk-client, and add aa-sdk-client itself at the same SHA (single workspace checkout per ADR 0002). The next commit delegates the runtime client to it. Handle the new aa-proto ActionType::ToolResult variant in the audit-event translation helpers, which the new SHA requires. --- rust/aa-ffi-python/Cargo.toml | 8 ++++++-- rust/aa-ffi-python/src/lib.rs | 2 ++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/rust/aa-ffi-python/Cargo.toml b/rust/aa-ffi-python/Cargo.toml index 1eb1bf3..a1d868f 100644 --- a/rust/aa-ffi-python/Cargo.toml +++ b/rust/aa-ffi-python/Cargo.toml @@ -10,8 +10,12 @@ name = "aa_ffi_python" crate-type = ["cdylib"] [dependencies] -aa-core = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", rev = "ed4aa11a8c1d1ce1e6f96b08cf2179fd772099b2", package = "aa-core", features = ["serde"] } -aa-proto = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", rev = "ed4aa11a8c1d1ce1e6f96b08cf2179fd772099b2", package = "aa-proto" } +# Shared crates consumed via git-SHA pin (ADR 0002, AAASM-2559). aa-core, +# aa-proto, and aa-sdk-client must share one SHA so cargo resolves a single +# checkout of the agent-assembly workspace. +aa-core = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", rev = "9cf8a033d39c870ba46a0929b62261d125a00a09", package = "aa-core", features = ["serde"] } +aa-proto = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", rev = "9cf8a033d39c870ba46a0929b62261d125a00a09", package = "aa-proto" } +aa-sdk-client = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", rev = "9cf8a033d39c870ba46a0929b62261d125a00a09", package = "aa-sdk-client" } once_cell = "1.20" prost = "0.14" pyo3 = { version = "0.28", features = ["extension-module"] } diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 698b832..46bec40 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -414,6 +414,7 @@ fn action_type_from_str(value: &str) -> i32 { "network_call" => ActionType::NetworkCall as i32, "process_exec" => ActionType::ProcessExec as i32, "agent_spawn" => ActionType::AgentSpawn as i32, + "tool_result" => ActionType::ToolResult as i32, _ => ActionType::ActionUnspecified as i32, } } @@ -426,6 +427,7 @@ fn action_type_to_str(value: i32) -> &'static str { ActionType::NetworkCall => "network_call", ActionType::ProcessExec => "process_exec", ActionType::AgentSpawn => "agent_spawn", + ActionType::ToolResult => "tool_result", ActionType::ActionUnspecified => "", } } From 4cedc02d86ebd2ce73fe9d82a8bfd072fe51fa9d Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Sat, 6 Jun 2026 03:16:33 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20(aa-ffi-python):=20Del?= =?UTF-8?q?egate=20RuntimeClient=20to=20aa-sdk-client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make the pyo3 binding a thin shim over the shared aa-sdk-client crate (ADR 0002). RuntimeClient.connect/send_event/close now delegate to aa_sdk_client::AssemblyClient — the UDS transport, IPC wire codec, and background lifecycle live once in aa-sdk-client instead of being reimplemented here. Remove the local tokio worker loop, frame codec, and the synchronous query_policy round-trip (PolicyResult / PolicyTimeoutError): policy and approval are server-side per the trust model, and the advisory, non-authoritative credential preflight is provided transitively by aa-sdk-client — the shim holds no security authority. Type translation (GovernanceEvent, audit_event_to/from_wire_bytes) is retained. Drop the now-unused tokio and once_cell dependencies. --- rust/aa-ffi-python/Cargo.toml | 2 - rust/aa-ffi-python/src/lib.rs | 549 ++++------------------------------ 2 files changed, 56 insertions(+), 495 deletions(-) diff --git a/rust/aa-ffi-python/Cargo.toml b/rust/aa-ffi-python/Cargo.toml index a1d868f..8640266 100644 --- a/rust/aa-ffi-python/Cargo.toml +++ b/rust/aa-ffi-python/Cargo.toml @@ -16,9 +16,7 @@ crate-type = ["cdylib"] aa-core = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", rev = "9cf8a033d39c870ba46a0929b62261d125a00a09", package = "aa-core", features = ["serde"] } aa-proto = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", rev = "9cf8a033d39c870ba46a0929b62261d125a00a09", package = "aa-proto" } aa-sdk-client = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", rev = "9cf8a033d39c870ba46a0929b62261d125a00a09", package = "aa-sdk-client" } -once_cell = "1.20" prost = "0.14" pyo3 = { version = "0.28", features = ["extension-module"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1.41", features = ["io-util", "net", "rt-multi-thread", "sync", "time"] } diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 46bec40..5be43b9 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -1,4 +1,15 @@ -//! aa-ffi-python crate bootstrap. +//! Thin pyo3 shim over `aa-sdk-client`. +//! +//! Per ADR 0002 (Epic AAASM-2552), the runtime-client logic — UDS transport, +//! IPC wire codec, and the `AssemblyClient` lifecycle — lives once in the +//! shared `aa-sdk-client` crate. This binding is a thin pyo3 shim: ergonomic +//! API, type translation, and event capture. It holds **no** security +//! authority: the advisory, best-effort credential preflight is provided by +//! `aa-sdk-client`, and `aa-runtime` re-scans every event authoritatively. +//! +//! Policy / approval are server-side (see the ADR trust model), so the binding +//! does not perform a synchronous policy round-trip; event reporting is +//! fire-and-forget over the shared client. use aa_core::AuditEntry; use aa_proto::assembly::audit::v1::AuditEvent; @@ -6,42 +17,15 @@ use aa_proto::assembly::audit::v1::CallStackNode as ProtoCallStackNode; use aa_proto::assembly::common::v1::ActionType; use aa_proto::assembly::common::v1::AgentId; use aa_proto::assembly::common::v1::Decision; -use aa_proto::assembly::policy::v1::CheckActionRequest; -use aa_proto::assembly::policy::v1::CheckActionResponse; -use once_cell::sync::Lazy; +use aa_sdk_client::ipc::spawn_ipc_thread; +use aa_sdk_client::{AssemblyClient, AssemblyConfig, SdkClientError}; use prost::Message; -use pyo3::exceptions::PyValueError; use pyo3::exceptions::PyRuntimeError; +use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::types::{PyAny, PyBytes, PyDict, PyList, PyModule}; -use std::sync::Arc; -use std::sync::Mutex; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tokio::net::UnixStream; -use tokio::runtime::Runtime; -use tokio::sync::{mpsc, oneshot}; -use tokio::time; - -pyo3::create_exception!(_core, PolicyTimeoutError, pyo3::exceptions::PyTimeoutError); - -static TOKIO_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .thread_name("aa-ffi-python") - .build() - .expect("failed to build aa-ffi-python tokio runtime") -}); -const TAG_POLICY_QUERY: u8 = 1; -const TAG_EVENT_REPORT: u8 = 2; -const TAG_HEARTBEAT: u8 = 4; - -const TAG_POLICY_RESPONSE: u8 = 1; -const TAG_ACK: u8 = 3; - -#[pyclass(module = "agent_assembly._core")] +#[pyclass(module = "agent_assembly._core", from_py_object)] #[derive(Clone)] struct GovernanceEvent { #[pyo3(get)] @@ -65,345 +49,63 @@ impl GovernanceEvent { } } -#[pyclass(module = "agent_assembly._core")] -#[derive(Clone)] -struct PolicyResult { - #[pyo3(get)] - allowed: bool, - #[pyo3(get)] - reason: String, -} - -#[pymethods] -impl PolicyResult { - #[new] - fn new(allowed: bool, reason: Option) -> Self { - Self { - allowed, - reason: reason.unwrap_or_default(), - } - } -} - +/// Handle to an Agent Assembly runtime session. +/// +/// Thin wrapper over [`aa_sdk_client::AssemblyClient`]: the UDS transport, wire +/// codec, and background IPC thread all live in `aa-sdk-client`. This type only +/// translates the Python surface (`connect` / `send_event` / `close`) onto the +/// shared client. #[pyclass(module = "agent_assembly._core")] struct RuntimeClient { #[pyo3(get)] socket_path: String, - sender: Option>, - closed: Arc, - last_error: Arc>>, -} - -enum WorkerMessage { - Event(GovernanceEvent), - PolicyQuery { - action_json: String, - timeout_ms: u64, - response_tx: oneshot::Sender>, - }, - Close, -} - -#[derive(Clone)] -struct PolicyResultPayload { - allowed: bool, - reason: String, -} - -#[derive(Debug)] -enum WorkerError { - Timeout, - Disconnected, - Transport(String), - Decode(String), -} - -enum WorkerWaitError { - Timeout, - Disconnected, + client: AssemblyClient, } #[pymethods] impl RuntimeClient { - #[new] - fn new(socket_path: String) -> Self { - Self { - socket_path, - sender: None, - closed: Arc::new(AtomicBool::new(true)), - last_error: Arc::new(Mutex::new(None)), - } - } - + /// Connect to `aa-runtime` over the given Unix domain socket path. + /// + /// Spawns the shared client's background IPC thread and returns a handle. + /// Event reporting is fire-and-forget; a failed connection surfaces on a + /// later `send_event` rather than here. #[staticmethod] - fn connect(socket_path: String) -> Self { - let _ = &*TOKIO_RUNTIME; - - let (sender, receiver) = mpsc::unbounded_channel::(); - let closed = Arc::new(AtomicBool::new(false)); - let last_error = Arc::new(Mutex::new(None)); - - TOKIO_RUNTIME.spawn(worker_loop( - socket_path.clone(), - receiver, - Arc::clone(&closed), - Arc::clone(&last_error), - )); - - Self { - socket_path, - sender: Some(sender), - closed, - last_error, - } - } - - fn send_event(&self, event: GovernanceEvent) -> PyResult<()> { - ensure_client_open(self.closed.as_ref(), self.last_error.as_ref())?; - let sender = self - .sender - .as_ref() - .ok_or_else(|| PyRuntimeError::new_err("runtime event queue is unavailable"))?; - sender - .send(WorkerMessage::Event(event)) - .map_err(|_| PyRuntimeError::new_err("failed to enqueue governance event"))?; - Ok(()) - } - - fn query_policy(&self, py: Python<'_>, action: &Bound<'_, PyAny>) -> PyResult { - ensure_client_open(self.closed.as_ref(), self.last_error.as_ref())?; - let action_json = serialize_action_to_json(py, action)?; - let timeout_ms = extract_timeout_ms(action); - let sender = self - .sender - .as_ref() - .ok_or_else(|| PyRuntimeError::new_err("runtime event queue is unavailable"))?; - - let (response_tx, response_rx) = oneshot::channel::>(); - sender - .send(WorkerMessage::PolicyQuery { - action_json, - timeout_ms, - response_tx, - }) - .map_err(|_| PyRuntimeError::new_err("failed to enqueue policy query"))?; - - let worker_result = py.detach(|| wait_for_worker_response(timeout_ms + 100, response_rx)); - let worker_result = worker_result.map_err(|error| match error { - WorkerWaitError::Timeout => PolicyTimeoutError::new_err("policy query timed out"), - WorkerWaitError::Disconnected => PyRuntimeError::new_err("policy worker disconnected"), + fn connect(socket_path: String) -> PyResult { + let config = AssemblyConfig { + agent_id: String::new(), + socket_path: Some(socket_path.clone()), + }; + let handle = spawn_ipc_thread(config.resolve_socket_path()).map_err(|error| { + PyRuntimeError::new_err(format!("failed to start runtime IPC thread: {error}")) })?; - - let payload = worker_result.map_err(map_worker_error_to_py)?; - Ok(PolicyResult { - allowed: payload.allowed, - reason: payload.reason, + Ok(Self { + socket_path, + client: AssemblyClient::new(handle, Vec::new()), }) } - fn close(&mut self) { - if self.closed.swap(true, Ordering::SeqCst) { - return; - } - if let Some(sender) = self.sender.take() { - let _ = sender.send(WorkerMessage::Close); - } - } -} - -async fn worker_loop( - socket_path: String, - mut receiver: mpsc::UnboundedReceiver, - closed: Arc, - last_error: Arc>>, -) { - let stream = match UnixStream::connect(&socket_path).await { - Ok(stream) => stream, - Err(error) => { - set_worker_error(last_error.as_ref(), format!("failed to connect runtime socket: {error}")); - closed.store(true, Ordering::SeqCst); - return; - } - }; - - let (mut reader, mut writer) = stream.into_split(); - if let Err(error) = write_heartbeat(&mut writer).await { - set_worker_error(last_error.as_ref(), format!("failed to send heartbeat: {error:?}")); - closed.store(true, Ordering::SeqCst); - return; - } - - match read_runtime_response(&mut reader).await { - Ok(RuntimeResponse::Ack) => {} - Ok(_) => { - set_worker_error(last_error.as_ref(), "unexpected heartbeat response from runtime".to_string()); - closed.store(true, Ordering::SeqCst); - return; - } - Err(error) => { - set_worker_error(last_error.as_ref(), format!("failed to read heartbeat ack: {error}")); - closed.store(true, Ordering::SeqCst); - return; - } - } - - while let Some(message) = receiver.recv().await { - match message { - WorkerMessage::Event(event) => { - let send_result = send_event_frame(&mut writer, &event).await; - if let Err(error) = send_result { - set_worker_error(last_error.as_ref(), format!("failed to send event: {error:?}")); - break; - } - - match read_runtime_response(&mut reader).await { - Ok(RuntimeResponse::Ack) => {} - Ok(_) => { - set_worker_error(last_error.as_ref(), "unexpected event ack response from runtime".to_string()); - break; - } - Err(error) => { - set_worker_error(last_error.as_ref(), format!("failed to read event ack: {error}")); - break; - } - } - } - WorkerMessage::PolicyQuery { - action_json, - timeout_ms, - response_tx, - } => { - let response = process_policy_query(&mut reader, &mut writer, action_json, timeout_ms).await; - let _ = response_tx.send(response); - } - WorkerMessage::Close => break, - } - } - - closed.store(true, Ordering::SeqCst); -} - -async fn send_event_frame(writer: &mut W, event: &GovernanceEvent) -> Result<(), WorkerError> -where - W: AsyncWrite + Unpin, -{ - let entry = &event.audit_entry; - let event_type = format!("{:?}", entry.event_type()); - let agent_id_hex = bytes_to_hex(entry.agent_id().as_bytes()); - let session_id_hex = bytes_to_hex(entry.session_id().as_bytes()); - let audit_event = AuditEvent { - event_id: make_event_id(), - trace_id: "python-sdk".to_string(), - span_id: "ffi-send-event".to_string(), - decision: Decision::Allow as i32, - labels: std::collections::HashMap::from([ - (String::from("payload_json"), event.payload_json.clone()), - (String::from("event_type"), event_type), - (String::from("agent_id_hex"), agent_id_hex), - (String::from("session_id_hex"), session_id_hex), - (String::from("payload"), entry.payload().to_string()), - ]), - ..Default::default() - }; - let payload = audit_event.encode_to_vec(); - write_frame(writer, TAG_EVENT_REPORT, &payload).await -} - -async fn process_policy_query( - reader: &mut R, - writer: &mut W, - action_json: String, - timeout_ms: u64, -) -> Result -where - R: AsyncRead + Unpin, - W: AsyncWrite + Unpin, -{ - let request = CheckActionRequest { - trace_id: action_json, - span_id: "ffi-query-policy".to_string(), - ..Default::default() - }; - let payload = request.encode_to_vec(); - write_frame(writer, TAG_POLICY_QUERY, &payload).await?; - - let response = time::timeout( - Duration::from_millis(timeout_ms), - read_runtime_response(reader), - ) - .await - .map_err(|_| WorkerError::Timeout)? - .map_err(|error| WorkerError::Transport(error))?; - - match response { - RuntimeResponse::PolicyResponse(bytes) => { - let policy = CheckActionResponse::decode(bytes.as_slice()) - .map_err(|error| WorkerError::Decode(error.to_string()))?; - let allowed = matches!(policy.decision, x if x == Decision::Allow as i32 || x == Decision::Redact as i32); - Ok(PolicyResultPayload { - allowed, - reason: policy.reason, - }) - } - RuntimeResponse::Ack => Err(WorkerError::Transport( - "runtime returned ACK instead of policy response".to_string(), - )), - RuntimeResponse::Unknown(tag, _) => Err(WorkerError::Transport(format!( - "runtime returned unexpected tag {tag} for policy query" - ))), - } -} - -fn map_worker_error_to_py(error: WorkerError) -> PyErr { - match error { - WorkerError::Timeout => PolicyTimeoutError::new_err("policy query timed out"), - WorkerError::Disconnected => PyRuntimeError::new_err("policy worker disconnected"), - WorkerError::Transport(message) | WorkerError::Decode(message) => PyRuntimeError::new_err(message), - } -} - -fn ensure_client_open(closed: &AtomicBool, last_error: &Mutex>) -> PyResult<()> { - if !closed.load(Ordering::SeqCst) { - return Ok(()); + /// Ship a captured governance event to the runtime (fire-and-forget). + /// + /// The event payload passes through `aa-sdk-client`'s advisory preflight + /// before the wire; the runtime re-scans authoritatively regardless. + fn send_event(&self, event: GovernanceEvent) -> PyResult<()> { + let event_type = format!("{:?}", event.audit_entry.event_type()); + self.client + .report_event(event_type, event.payload_json) + .map_err(map_sdk_error) } - if let Ok(guard) = last_error.lock() { - if let Some(message) = guard.as_ref() { - return Err(PyRuntimeError::new_err(message.clone())); - } + /// Shut down the background IPC thread. Idempotent. + fn close(&self, py: Python<'_>) { + // Release the GIL while the shared client joins the background thread. + py.detach(|| { + let _ = self.client.shutdown(); + }); } - - Err(PyRuntimeError::new_err("runtime client is closed")) } -fn extract_timeout_ms(action: &Bound<'_, PyAny>) -> u64 { - action - .cast::() - .ok() - .and_then(|dict| dict.get_item("timeout_ms").ok().flatten()) - .and_then(|value| value.extract::().ok()) - .unwrap_or(50) -} - -fn serialize_action_to_json(py: Python<'_>, action: &Bound<'_, PyAny>) -> PyResult { - let json_module = PyModule::import(py, "json")?; - let dumped = json_module.call_method1("dumps", (action,))?; - dumped.extract::() -} - -fn set_worker_error(last_error: &Mutex>, message: String) { - if let Ok(mut guard) = last_error.lock() { - *guard = Some(message); - } -} - -fn make_event_id() -> String { - use std::time::{SystemTime, UNIX_EPOCH}; - - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default(); - format!("py-{}-{}", now.as_secs(), now.subsec_nanos()) +fn map_sdk_error(error: SdkClientError) -> PyErr { + PyRuntimeError::new_err(error.to_string()) } fn action_type_from_str(value: &str) -> i32 { @@ -558,143 +260,6 @@ fn decision_to_str(value: i32) -> &'static str { } } -fn bytes_to_hex(bytes: &[u8; 16]) -> String { - const HEX: &[u8; 16] = b"0123456789abcdef"; - let mut result = String::with_capacity(bytes.len() * 2); - for byte in bytes { - result.push(HEX[(byte >> 4) as usize] as char); - result.push(HEX[(byte & 0x0F) as usize] as char); - } - result -} - -enum RuntimeResponse { - Ack, - PolicyResponse(Vec), - Unknown(u8, Vec), -} - -async fn write_heartbeat(writer: &mut W) -> Result<(), WorkerError> -where - W: AsyncWrite + Unpin, -{ - writer - .write_u8(TAG_HEARTBEAT) - .await - .map_err(|error| WorkerError::Transport(error.to_string()))?; - writer - .flush() - .await - .map_err(|error| WorkerError::Transport(error.to_string())) -} - -async fn write_frame(writer: &mut W, tag: u8, payload: &[u8]) -> Result<(), WorkerError> -where - W: AsyncWrite + Unpin, -{ - writer - .write_u8(tag) - .await - .map_err(|error| WorkerError::Transport(error.to_string()))?; - write_varint(writer, payload.len() as u64).await?; - writer - .write_all(payload) - .await - .map_err(|error| WorkerError::Transport(error.to_string()))?; - writer - .flush() - .await - .map_err(|error| WorkerError::Transport(error.to_string())) -} - -async fn read_runtime_response(reader: &mut R) -> Result -where - R: AsyncRead + Unpin, -{ - let tag = reader.read_u8().await.map_err(|error| error.to_string())?; - match tag { - TAG_ACK => { - let _ = read_length_delimited(reader).await?; - Ok(RuntimeResponse::Ack) - } - TAG_POLICY_RESPONSE => { - let payload = read_length_delimited(reader).await?; - Ok(RuntimeResponse::PolicyResponse(payload)) - } - other => { - let payload = read_length_delimited(reader).await?; - Ok(RuntimeResponse::Unknown(other, payload)) - } - } -} - -async fn read_length_delimited(reader: &mut R) -> Result, String> -where - R: AsyncRead + Unpin, -{ - let len = read_varint(reader).await? as usize; - let mut payload = vec![0u8; len]; - reader - .read_exact(&mut payload) - .await - .map_err(|error| error.to_string())?; - Ok(payload) -} - -async fn read_varint(reader: &mut R) -> Result -where - R: AsyncRead + Unpin, -{ - let mut result: u64 = 0; - let mut shift = 0u32; - loop { - let byte = reader.read_u8().await.map_err(|error| error.to_string())?; - result |= ((byte & 0x7F) as u64) << shift; - if byte & 0x80 == 0 { - break; - } - shift += 7; - if shift >= 64 { - return Err("varint too long".to_string()); - } - } - Ok(result) -} - -async fn write_varint(writer: &mut W, mut value: u64) -> Result<(), WorkerError> -where - W: AsyncWrite + Unpin, -{ - loop { - let byte = (value & 0x7F) as u8; - value >>= 7; - if value == 0 { - writer - .write_u8(byte) - .await - .map_err(|error| WorkerError::Transport(error.to_string()))?; - break; - } - - writer - .write_u8(byte | 0x80) - .await - .map_err(|error| WorkerError::Transport(error.to_string()))?; - } - - Ok(()) -} - -fn wait_for_worker_response( - timeout_ms: u64, - response_rx: oneshot::Receiver>, -) -> Result, WorkerWaitError> { - TOKIO_RUNTIME - .block_on(async move { time::timeout(Duration::from_millis(timeout_ms), response_rx).await }) - .map_err(|_| WorkerWaitError::Timeout)? - .map_err(|_| WorkerWaitError::Disconnected) -} - #[pyfunction] fn audit_event_to_wire_bytes(py: Python<'_>, event: &Bound<'_, PyAny>) -> PyResult> { let proto = audit_event_from_py(event)?; @@ -710,10 +275,8 @@ fn audit_event_from_wire_bytes(py: Python<'_>, data: &Bound<'_, PyBytes>) -> PyR } #[pymodule] -fn _core(py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> { - module.add("PolicyTimeoutError", py.get_type::())?; +fn _core(_py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> { module.add_class::()?; - module.add_class::()?; module.add_class::()?; module.add_function(wrap_pyfunction!(audit_event_to_wire_bytes, module)?)?; module.add_function(wrap_pyfunction!(audit_event_from_wire_bytes, module)?)?; From 12fe56355f75716f16b2f580334b9edfa676c1c8 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Sat, 6 Jun 2026 03:51:53 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=90=9B=20(aa-ffi-python):=20Release?= =?UTF-8?q?=20the=20GIL=20while=20delegating=20send=5Fevent?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit aa-sdk-client ships events over a bounded channel with a blocking send, so under backpressure report_event can park the calling thread. Holding the GIL there stalls every other Python thread — and deadlocks outright when the runtime peer is an in-process Python thread (the native test's mock runtime). Wrap the delegation in py.detach so the GIL is released for the duration of the send. --- rust/aa-ffi-python/src/lib.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 5be43b9..653334d 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -88,10 +88,13 @@ impl RuntimeClient { /// /// The event payload passes through `aa-sdk-client`'s advisory preflight /// before the wire; the runtime re-scans authoritatively regardless. - fn send_event(&self, event: GovernanceEvent) -> PyResult<()> { + fn send_event(&self, py: Python<'_>, event: GovernanceEvent) -> PyResult<()> { let event_type = format!("{:?}", event.audit_entry.event_type()); - self.client - .report_event(event_type, event.payload_json) + let details = event.payload_json; + // Release the GIL while delegating: aa-sdk-client uses a bounded channel + // with a blocking send, so under backpressure this can park the calling + // thread — holding the GIL there would stall every other Python thread. + py.detach(move || self.client.report_event(event_type, details)) .map_err(map_sdk_error) } From 0104291611281b7c11382b99fb41928aec270232 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Sat, 6 Jun 2026 08:48:19 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=90=9B=20(ci):=20Drop=20PolicyResult?= =?UTF-8?q?=20from=20the=20native-core=20import=20smoke-check?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The thin shim no longer exposes PolicyResult, so the native-core-build workflow's import verification (`from agent_assembly._core import ..., PolicyResult`) failed after maturin built the module. Import only the symbols the shim exposes (RuntimeClient, GovernanceEvent). --- .github/workflows/native-core-build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/native-core-build.yml b/.github/workflows/native-core-build.yml index 8073c55..08319ca 100644 --- a/.github/workflows/native-core-build.yml +++ b/.github/workflows/native-core-build.yml @@ -41,4 +41,4 @@ jobs: - name: Verify native module import run: | - uv run python -c "from agent_assembly._core import RuntimeClient, GovernanceEvent, PolicyResult" + uv run python -c "from agent_assembly._core import RuntimeClient, GovernanceEvent"