From 2020d6ba6c5b4fefe6a325b530f680220ad8ae47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Fri, 1 Aug 2025 10:48:04 +0400 Subject: [PATCH 01/15] return HTTP accepted on error --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- Sources/MockServer/MockHTTPServer.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index f536e3f4..86069aad 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -452,7 +452,7 @@ internal struct LambdaHTTPServer { await self.responsePool.push( LocalServerResponse( id: requestId, - status: .ok, + status: .accepted, // the local server has no mecanism to collect headers set by the lambda function headers: HTTPHeaders(), body: body, diff --git a/Sources/MockServer/MockHTTPServer.swift b/Sources/MockServer/MockHTTPServer.swift index 78685c52..92fd297f 100644 --- a/Sources/MockServer/MockHTTPServer.swift +++ b/Sources/MockServer/MockHTTPServer.swift @@ -224,7 +224,7 @@ struct HttpServer { } else if requestHead.uri.hasSuffix("/response") { responseStatus = .accepted } else if requestHead.uri.hasSuffix("/error") { - responseStatus = .ok + responseStatus = .accepted } else { responseStatus = .notFound } From 6e01c6ef85b12d38632ae349ed6971f5774c3fa4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Fri, 1 Aug 2025 11:16:54 +0400 Subject: [PATCH 02/15] force exit() when we loose connection to Lambda service --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index a1afb464..1962478f 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -330,6 +330,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { try channel.pipeline.syncOperations.addHTTPClientHandlers() // Lambda quotas... An invocation payload is maximal 6MB in size: // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html + // TODO: should we enforce this here ? What about streaming functions that + // support up to 20Mb responses ? try channel.pipeline.syncOperations.addHandler( NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024) ) @@ -364,6 +366,14 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { channel.closeFuture.whenComplete { result in self.assumeIsolated { runtimeClient in runtimeClient.channelClosed(channel) + + // at this stage, we lost the connection to the Lambda Service, + // this is very unlikely to happen when running in a lambda function deployed in the cloud + // however, this happens when performance testing against the MockServer + // shutdown this runtime. + // The Lambda service will create a new runtime environment anyway + runtimeClient.logger.trace("Connection to Lambda API lost, exiting") + exit(-1) } } From 166cd461de8f595da9af1d33be4e2d601ab392a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sun, 3 Aug 2025 18:54:23 +0200 Subject: [PATCH 03/15] propagate the connection closed info through a Future --- Sources/AWSLambdaRuntime/Lambda.swift | 12 ++++++++++++ .../LambdaRuntime+ServiceLifecycle.swift | 10 ++++++++-- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 8 +++++++- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 10 ++++++++-- 4 files changed, 35 insertions(+), 5 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index 5412c139..ae8ca78a 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -41,6 +41,17 @@ public enum Lambda { var logger = logger do { while !Task.isCancelled { + + if let runtimeClient = runtimeClient as? LambdaRuntimeClient, + let futureConnectionClosed = await runtimeClient.futureConnectionClosed + { + // Wait for the futureConnectionClosed to complete, + // which will happen when the Lambda HTTP Server (or MockServer) closes the connection + // This allows us to exit the run loop gracefully. + // The futureConnectionClosed is always an error, let it throw to finish the run loop. + let _ = try await futureConnectionClosed.get() + } + let (invocation, writer) = try await runtimeClient.nextInvocation() logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)" @@ -84,6 +95,7 @@ public enum Lambda { } catch is CancellationError { // don't allow cancellation error to propagate further } + } /// The default EventLoop the Lambda is scheduled on. diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift index 1b05b1c2..eb3e210e 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift @@ -17,8 +17,14 @@ import ServiceLifecycle extension LambdaRuntime: Service { public func run() async throws { - try await cancelWhenGracefulShutdown { - try await self._run() + await cancelWhenGracefulShutdown { + do { + try await self._run() + } catch { + // catch top level error that have not been handled before + // this avoids the runtime to crash and generate a backtrace + self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + } } } } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 5f66df6f..7fc217a6 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -59,7 +59,13 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb #if !ServiceLifecycleSupport @inlinable internal func run() async throws { - try await _run() + do { + try await _run() + } catch { + // catch top level error that have not been handled before + // this avoids the runtime to crash and generate a backtrace + self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + } } #endif diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 1962478f..a54c534d 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -92,6 +92,9 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case closed } + @usableFromInline + var futureConnectionClosed: EventLoopFuture? = nil + private let eventLoop: any EventLoop private let logger: Logger private let configuration: Configuration @@ -372,8 +375,10 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { // however, this happens when performance testing against the MockServer // shutdown this runtime. // The Lambda service will create a new runtime environment anyway - runtimeClient.logger.trace("Connection to Lambda API lost, exiting") - exit(-1) + runtimeClient.logger.trace("Connection to Lambda API. lost, exiting") + runtimeClient.futureConnectionClosed = runtimeClient.eventLoop.makeFailedFuture( + LambdaRuntimeError(code: .connectionToControlPlaneLost) + ) } } @@ -392,6 +397,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { return handler } } catch { + switch self.connectionState { case .disconnected, .connected: fatalError("Unexpected state: \(self.connectionState)") From a69ed54056867d823bc0c7263b17cfc70b8e954d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sun, 3 Aug 2025 19:04:50 +0200 Subject: [PATCH 04/15] fix typos --- Sources/AWSLambdaRuntime/Lambda.swift | 2 +- Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift | 2 +- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index ae8ca78a..9b96f7ce 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -48,7 +48,7 @@ public enum Lambda { // Wait for the futureConnectionClosed to complete, // which will happen when the Lambda HTTP Server (or MockServer) closes the connection // This allows us to exit the run loop gracefully. - // The futureConnectionClosed is always an error, let it throw to finish the run loop. + // The futureConnectionClosed is always an error, let it throw to terminate the Lambda run loop. let _ = try await futureConnectionClosed.get() } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift index eb3e210e..90c4b060 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift @@ -21,7 +21,7 @@ extension LambdaRuntime: Service { do { try await self._run() } catch { - // catch top level error that have not been handled before + // catch top level errors that have not been handled until now // this avoids the runtime to crash and generate a backtrace self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 7fc217a6..9d4c3c8b 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -62,7 +62,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb do { try await _run() } catch { - // catch top level error that have not been handled before + // catch top level errors that have not been handled until now // this avoids the runtime to crash and generate a backtrace self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) } From 04d9fc7cdce9a443533f6b1a178483285df8d1a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sun, 3 Aug 2025 20:07:09 +0200 Subject: [PATCH 05/15] fix unit tests --- Sources/AWSLambdaRuntime/Lambda.swift | 4 ++++ .../LambdaRuntime+ServiceLifecycle.swift | 9 ++++++++- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 7 +++++++ Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 4 +--- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index 9b96f7ce..b24d98fb 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -52,6 +52,7 @@ public enum Lambda { let _ = try await futureConnectionClosed.get() } + logger.trace("Waiting for next invocation") let (invocation, writer) = try await runtimeClient.nextInvocation() logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)" @@ -87,10 +88,13 @@ public enum Lambda { logger: logger ) ) + logger.trace("Handler finished processing invocation") } catch { + logger.trace("Handler failed processing invocation", metadata: ["Handler error": "\(error)"]) try await writer.reportError(error) continue } + logger.handler.metadata.removeValue(forKey: "aws-request-id") } } catch is CancellationError { // don't allow cancellation error to propagate further diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift index 90c4b060..50616df9 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift @@ -17,13 +17,20 @@ import ServiceLifecycle extension LambdaRuntime: Service { public func run() async throws { - await cancelWhenGracefulShutdown { + try await cancelWhenGracefulShutdown { do { try await self._run() } catch { // catch top level errors that have not been handled until now // this avoids the runtime to crash and generate a backtrace self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + if let error = error as? LambdaRuntimeError, + error.code != .connectionToControlPlaneLost + { + // if the error is a LambdaRuntimeError but not a connection error, + // we rethrow it to preserve existing behaviour + throw error + } } } } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 9d4c3c8b..daa8ed5f 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -65,6 +65,13 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb // catch top level errors that have not been handled until now // this avoids the runtime to crash and generate a backtrace self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + if let error = error as? LambdaRuntimeError, + error.code != .connectionToControlPlaneLost + { + // if the error is a LambdaRuntimeError but not a connection error, + // we rethrow it to preserve existing behaviour + throw error + } } } #endif diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index a54c534d..ece727ef 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -121,10 +121,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } catch { result = .failure(error) } - await runtime.close() - //try? await runtime.close() return try result.get() } @@ -375,7 +373,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { // however, this happens when performance testing against the MockServer // shutdown this runtime. // The Lambda service will create a new runtime environment anyway - runtimeClient.logger.trace("Connection to Lambda API. lost, exiting") + runtimeClient.logger.trace("Connection to Lambda Service HTTP Server lost, exiting") runtimeClient.futureConnectionClosed = runtimeClient.eventLoop.makeFailedFuture( LambdaRuntimeError(code: .connectionToControlPlaneLost) ) From 025a0e59e31df9275d23b4e8ed41122d8c8d527f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 16:00:39 +0200 Subject: [PATCH 06/15] simplify by checking connection state in the `nextInvocation()` call --- Sources/AWSLambdaRuntime/Lambda.swift | 16 +++----- .../LambdaRuntimeClient.swift | 39 ++++++++++++------- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index 80f862ce..f3edaccf 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -41,17 +41,13 @@ public enum Lambda { var logger = logger do { while !Task.isCancelled { - - if let runtimeClient = runtimeClient as? LambdaRuntimeClient, - let futureConnectionClosed = await runtimeClient.futureConnectionClosed - { - // Wait for the futureConnectionClosed to complete, - // which will happen when the Lambda HTTP Server (or MockServer) closes the connection - // This allows us to exit the run loop gracefully. - // The futureConnectionClosed is always an error, let it throw to terminate the Lambda run loop. - let _ = try await futureConnectionClosed.get() + + guard let runtimeClient = runtimeClient as? LambdaRuntimeClient, + await !runtimeClient.isConnectionStateDisconnected else { + logger.trace("Runtime client not connected, exiting run loop") + throw LambdaRuntimeError.init(code: .connectionToControlPlaneLost) } - + logger.trace("Waiting for next invocation") let (invocation, writer) = try await runtimeClient.nextInvocation() logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)" diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index ece727ef..e06d1732 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -67,10 +67,23 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { NIOLoopBound>, any Error > - private enum ConnectionState { + private enum ConnectionState: Equatable { case disconnected case connecting([ConnectionContinuation]) case connected(Channel, LambdaChannelHandler) + + static func == (lhs: ConnectionState, rhs: ConnectionState) -> Bool { + switch (lhs, rhs) { + case (.disconnected, .disconnected): + return true + case (.connecting, .connecting): + return true + case (.connected, .connected): + return true + default: + return false + } + } } enum LambdaState { @@ -92,14 +105,22 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case closed } - @usableFromInline - var futureConnectionClosed: EventLoopFuture? = nil - private let eventLoop: any EventLoop private let logger: Logger private let configuration: Configuration private var connectionState: ConnectionState = .disconnected + + // adding this dynamic property because I can not give access to `connectionState` directly + // because it is private, depending on multiple private and non-Sendable types + // the only thing we need to know outside of this class is if the connection state is disconnected + @usableFromInline + var isConnectionStateDisconnected: Bool { + get { + self.connectionState == .disconnected + } + } + private var lambdaState: LambdaState = .idle(previousRequestID: nil) private var closingState: ClosingState = .notClosing @@ -367,16 +388,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { channel.closeFuture.whenComplete { result in self.assumeIsolated { runtimeClient in runtimeClient.channelClosed(channel) - - // at this stage, we lost the connection to the Lambda Service, - // this is very unlikely to happen when running in a lambda function deployed in the cloud - // however, this happens when performance testing against the MockServer - // shutdown this runtime. - // The Lambda service will create a new runtime environment anyway - runtimeClient.logger.trace("Connection to Lambda Service HTTP Server lost, exiting") - runtimeClient.futureConnectionClosed = runtimeClient.eventLoop.makeFailedFuture( - LambdaRuntimeError(code: .connectionToControlPlaneLost) - ) } } From ce8b5672aa4806a1d6d9f2652fed5452d15a5466 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 16:49:26 +0200 Subject: [PATCH 07/15] introducing a new connection state "lostConnection" --- Sources/AWSLambdaRuntime/Lambda.swift | 4 ++-- .../LambdaRuntimeClient.swift | 24 +++++++++++++++---- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index f3edaccf..683e3090 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -43,8 +43,8 @@ public enum Lambda { while !Task.isCancelled { guard let runtimeClient = runtimeClient as? LambdaRuntimeClient, - await !runtimeClient.isConnectionStateDisconnected else { - logger.trace("Runtime client not connected, exiting run loop") + await !runtimeClient.didLooseConnection else { + logger.trace("Runtime client disconnected, exiting run loop") throw LambdaRuntimeError.init(code: .connectionToControlPlaneLost) } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index e06d1732..d1900509 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -69,6 +69,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { private enum ConnectionState: Equatable { case disconnected + case lostConnection case connecting([ConnectionContinuation]) case connected(Channel, LambdaChannelHandler) @@ -80,6 +81,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { return true case (.connected, .connected): return true + case (.lostConnection, .lostConnection): + return true default: return false } @@ -115,9 +118,9 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { // because it is private, depending on multiple private and non-Sendable types // the only thing we need to know outside of this class is if the connection state is disconnected @usableFromInline - var isConnectionStateDisconnected: Bool { + var didLooseConnection: Bool { get { - self.connectionState == .disconnected + self.connectionState == .lostConnection } } @@ -179,12 +182,16 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case .connected(let channel, _): channel.close(mode: .all, promise: nil) + case .lostConnection: + // this should never happen. + fatalError("Lost connection to Lambda service while closing the runtime client") } } } @usableFromInline func nextInvocation() async throws -> (Invocation, Writer) { + try await withTaskCancellationHandler { switch self.lambdaState { case .idle: @@ -284,7 +291,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { private func channelClosed(_ channel: any Channel) { switch (self.connectionState, self.closingState) { - case (_, .closed): + case (_, .closed), (.lostConnection, _): fatalError("Invalid state: \(self.connectionState), \(self.closingState)") case (.disconnected, .notClosing): @@ -344,6 +351,10 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { return loopBound.value case .connected(_, let handler): return handler + + case .lostConnection: + // this should never happen + fatalError("Lost connection to Lambda service") } let bootstrap = ClientBootstrap(group: self.eventLoop) @@ -392,7 +403,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } switch self.connectionState { - case .disconnected, .connected: + case .disconnected, .connected, .lostConnection: fatalError("Unexpected state: \(self.connectionState)") case .connecting(let array): @@ -408,7 +419,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } catch { switch self.connectionState { - case .disconnected, .connected: + case .disconnected, .connected, .lostConnection: fatalError("Unexpected state: \(self.connectionState)") case .connecting(let array): @@ -456,6 +467,9 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate { isolated.connectionState = .disconnected + case .lostConnection: + // this should never happen + fatalError("Lost connection to Lambda service") } } } From be4cb20bf1d0fc656a4ffa56258a528e995d3e3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 16:58:10 +0200 Subject: [PATCH 08/15] add state change --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index d1900509..eef8cbcd 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -399,6 +399,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { channel.closeFuture.whenComplete { result in self.assumeIsolated { runtimeClient in runtimeClient.channelClosed(channel) + runtimeClient.connectionState = .lostConnection } } From b37ea0ee389fafa16d5ab8181900fd68b27753cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:26:42 +0200 Subject: [PATCH 09/15] fix lost continuation --- Sources/AWSLambdaRuntime/Lambda.swift | 6 --- .../LambdaRuntimeClient.swift | 46 ++++++++++++------- .../AWSLambdaRuntime/LambdaRuntimeError.swift | 1 - 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index 683e3090..042d9c6f 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -42,12 +42,6 @@ public enum Lambda { do { while !Task.isCancelled { - guard let runtimeClient = runtimeClient as? LambdaRuntimeClient, - await !runtimeClient.didLooseConnection else { - logger.trace("Runtime client disconnected, exiting run loop") - throw LambdaRuntimeError.init(code: .connectionToControlPlaneLost) - } - logger.trace("Waiting for next invocation") let (invocation, writer) = try await runtimeClient.nextInvocation() logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)" diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index eef8cbcd..e0366a77 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -114,16 +114,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { private var connectionState: ConnectionState = .disconnected - // adding this dynamic property because I can not give access to `connectionState` directly - // because it is private, depending on multiple private and non-Sendable types - // the only thing we need to know outside of this class is if the connection state is disconnected - @usableFromInline - var didLooseConnection: Bool { - get { - self.connectionState == .lostConnection - } - } - private var lambdaState: LambdaState = .idle(previousRequestID: nil) private var closingState: ClosingState = .notClosing @@ -146,7 +136,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { result = .failure(error) } await runtime.close() - return try result.get() } @@ -182,9 +171,9 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case .connected(let channel, _): channel.close(mode: .all, promise: nil) + case .lostConnection: - // this should never happen. - fatalError("Lost connection to Lambda service while closing the runtime client") + continuation.resume() } } } @@ -192,12 +181,17 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { @usableFromInline func nextInvocation() async throws -> (Invocation, Writer) { - try await withTaskCancellationHandler { + if self.connectionState == .lostConnection { + throw LambdaRuntimeError(code: .connectionToControlPlaneLost) + } + + return try await withTaskCancellationHandler { switch self.lambdaState { case .idle: self.lambdaState = .waitingForNextInvocation let handler = try await self.makeOrGetConnection() let invocation = try await handler.nextInvocation() + guard case .waitingForNextInvocation = self.lambdaState else { fatalError("Invalid state: \(self.lambdaState)") } @@ -312,7 +306,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case (.connecting(let array), .notClosing): self.connectionState = .disconnected for continuation in array { - continuation.resume(throwing: LambdaRuntimeError(code: .lostConnectionToControlPlane)) + continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost)) } case (.connecting(let array), .closing(let continuation)): @@ -326,6 +320,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case (.connected, .notClosing): self.connectionState = .disconnected + case (.connected, .closing(let continuation)): self.connectionState = .disconnected @@ -398,13 +393,24 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { ) channel.closeFuture.whenComplete { result in self.assumeIsolated { runtimeClient in + + // resume any pending continuation on the handler + if case .connected(_ , let handler) = runtimeClient.connectionState { + if case .connected(_ , let lambdaState) = handler.state { + if case .waitingForNextInvocation(let continuation) = lambdaState { + continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost)) + } + } + } + + // close the channel runtimeClient.channelClosed(channel) runtimeClient.connectionState = .lostConnection } } switch self.connectionState { - case .disconnected, .connected, .lostConnection: + case .disconnected, .connected: fatalError("Unexpected state: \(self.connectionState)") case .connecting(let array): @@ -416,11 +422,14 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } } return handler + case .lostConnection: + // this should never happen + fatalError("Lost connection to Lambda service") } } catch { switch self.connectionState { - case .disconnected, .connected, .lostConnection: + case .disconnected, .connected: fatalError("Unexpected state: \(self.connectionState)") case .connecting(let array): @@ -431,6 +440,9 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } } throw error + case .lostConnection: + // this should never happen + fatalError("Lost connection to Lambda service") } } } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift index a9c0cbca..bc4865db 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift @@ -25,7 +25,6 @@ package struct LambdaRuntimeError: Error { case writeAfterFinishHasBeenSent case finishAfterFinishHasBeenSent - case lostConnectionToControlPlane case unexpectedStatusCodeForRequest case nextInvocationMissingHeaderRequestID From cd0094823fcb54277bd0d682ea756ead78e066b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:32:46 +0200 Subject: [PATCH 10/15] fix compilation error --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index e0366a77..27eb74ea 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -515,7 +515,7 @@ private final class LambdaChannelHandler } } - private var state: State = .disconnected + var state: State = .disconnected private var lastError: Error? private var reusableErrorBuffer: ByteBuffer? private let logger: Logger From 008c54246390ae45ace08ba8b0860649958fc5f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:33:06 +0200 Subject: [PATCH 11/15] DRY: move the error handling to the _run() function --- .../LambdaRuntime+ServiceLifecycle.swift | 15 +--------- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 28 +++++++++---------- 2 files changed, 15 insertions(+), 28 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift index 50616df9..1b05b1c2 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift @@ -18,20 +18,7 @@ import ServiceLifecycle extension LambdaRuntime: Service { public func run() async throws { try await cancelWhenGracefulShutdown { - do { - try await self._run() - } catch { - // catch top level errors that have not been handled until now - // this avoids the runtime to crash and generate a backtrace - self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) - if let error = error as? LambdaRuntimeError, - error.code != .connectionToControlPlaneLost - { - // if the error is a LambdaRuntimeError but not a connection error, - // we rethrow it to preserve existing behaviour - throw error - } - } + try await self._run() } } } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index daa8ed5f..4d7450e5 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -59,20 +59,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb #if !ServiceLifecycleSupport @inlinable internal func run() async throws { - do { - try await _run() - } catch { - // catch top level errors that have not been handled until now - // this avoids the runtime to crash and generate a backtrace - self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) - if let error = error as? LambdaRuntimeError, - error.code != .connectionToControlPlaneLost - { - // if the error is a LambdaRuntimeError but not a connection error, - // we rethrow it to preserve existing behaviour - throw error - } - } + try await _run() } #endif @@ -107,6 +94,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb let ip = String(ipAndPort[0]) guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) } + do { try await LambdaRuntimeClient.withRuntimeClient( configuration: .init(ip: ip, port: port), eventLoop: self.eventLoop, @@ -118,6 +106,18 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb logger: self.logger ) } + } catch { + // catch top level errors that have not been handled until now + // this avoids the runtime to crash and generate a backtrace + self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + if let error = error as? LambdaRuntimeError, + error.code != .connectionToControlPlaneLost + { + // if the error is a LambdaRuntimeError but not a connection error, + // we rethrow it to preserve existing behaviour + throw error + } + } } else { From 9dcb4b33115b0667a2f1a6542e5215c30237dd2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:51:33 +0200 Subject: [PATCH 12/15] fix a case where continuation was resumed twice --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 27eb74ea..ff03e329 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -937,6 +937,7 @@ extension LambdaChannelHandler: ChannelInboundHandler { // fail any pending responses with last error or assume peer disconnected switch self.state { case .connected(_, .waitingForNextInvocation(let continuation)): + self.state = .disconnected continuation.resume(throwing: self.lastError ?? ChannelError.ioOnClosedChannel) default: break From f2d94a2b5c68f4ad45b1292c7f391071f098a204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:52:45 +0200 Subject: [PATCH 13/15] fix unit test --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index ff03e329..a0898971 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -348,8 +348,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { return handler case .lostConnection: - // this should never happen - fatalError("Lost connection to Lambda service") + throw LambdaRuntimeError(code: .connectionToControlPlaneLost) } let bootstrap = ClientBootstrap(group: self.eventLoop) From 852391e51635c23f21bda3705ac2a997c28e935b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:53:11 +0200 Subject: [PATCH 14/15] swift format --- Sources/AWSLambdaRuntime/Lambda.swift | 2 +- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 42 +++++++++---------- .../LambdaRuntimeClient.swift | 11 +++-- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index 042d9c6f..f6223cb5 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -41,7 +41,7 @@ public enum Lambda { var logger = logger do { while !Task.isCancelled { - + logger.trace("Waiting for next invocation") let (invocation, writer) = try await runtimeClient.nextInvocation() logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)" diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 4d7450e5..a639ac31 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -95,29 +95,29 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) } do { - try await LambdaRuntimeClient.withRuntimeClient( - configuration: .init(ip: ip, port: port), - eventLoop: self.eventLoop, - logger: self.logger - ) { runtimeClient in - try await Lambda.runLoop( - runtimeClient: runtimeClient, - handler: handler, + try await LambdaRuntimeClient.withRuntimeClient( + configuration: .init(ip: ip, port: port), + eventLoop: self.eventLoop, logger: self.logger - ) - } - } catch { - // catch top level errors that have not been handled until now - // this avoids the runtime to crash and generate a backtrace - self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) - if let error = error as? LambdaRuntimeError, - error.code != .connectionToControlPlaneLost - { - // if the error is a LambdaRuntimeError but not a connection error, - // we rethrow it to preserve existing behaviour - throw error + ) { runtimeClient in + try await Lambda.runLoop( + runtimeClient: runtimeClient, + handler: handler, + logger: self.logger + ) + } + } catch { + // catch top level errors that have not been handled until now + // this avoids the runtime to crash and generate a backtrace + self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + if let error = error as? LambdaRuntimeError, + error.code != .connectionToControlPlaneLost + { + // if the error is a LambdaRuntimeError but not a connection error, + // we rethrow it to preserve existing behaviour + throw error + } } - } } else { diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index a0898971..40f84f89 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -72,7 +72,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case lostConnection case connecting([ConnectionContinuation]) case connected(Channel, LambdaChannelHandler) - + static func == (lhs: ConnectionState, rhs: ConnectionState) -> Bool { switch (lhs, rhs) { case (.disconnected, .disconnected): @@ -171,7 +171,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case .connected(let channel, _): channel.close(mode: .all, promise: nil) - + case .lostConnection: continuation.resume() } @@ -191,7 +191,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { self.lambdaState = .waitingForNextInvocation let handler = try await self.makeOrGetConnection() let invocation = try await handler.nextInvocation() - + guard case .waitingForNextInvocation = self.lambdaState else { fatalError("Invalid state: \(self.lambdaState)") } @@ -320,7 +320,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case (.connected, .notClosing): self.connectionState = .disconnected - case (.connected, .closing(let continuation)): self.connectionState = .disconnected @@ -394,8 +393,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { self.assumeIsolated { runtimeClient in // resume any pending continuation on the handler - if case .connected(_ , let handler) = runtimeClient.connectionState { - if case .connected(_ , let lambdaState) = handler.state { + if case .connected(_, let handler) = runtimeClient.connectionState { + if case .connected(_, let lambdaState) = handler.state { if case .waitingForNextInvocation(let continuation) = lambdaState { continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost)) } From b13bf5cfa0bf5c28c6eb0a31a3218a48f4a26537 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:59:23 +0200 Subject: [PATCH 15/15] remove comment on max payload size --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 2 -- 1 file changed, 2 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 40f84f89..008b9b42 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -356,8 +356,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { try channel.pipeline.syncOperations.addHTTPClientHandlers() // Lambda quotas... An invocation payload is maximal 6MB in size: // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html - // TODO: should we enforce this here ? What about streaming functions that - // support up to 20Mb responses ? try channel.pipeline.syncOperations.addHandler( NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024) )