Skip to content

Commit 34a70ac

Browse files
authored
feat(realtime): subscribe retry improvements (#747)
* feat(realtime): add retry limit to subscribe * feat(realtime): add exponential backoff with jitter and cancellation support * use Encodable type for broadcast * fix tests * test: revamp realtime tests * remove redundant test case * fix(realtime): fix subscription retry logic and test failures - Add missing return statement in subscribeWithError() to prevent infinite retries - Add heartbeat response handling in testBehavior test - Fix test expectations for retry attempts - Ensure proper cleanup on subscription failure This fixes the issue where successful subscriptions would continue retrying and tests would fail due to incorrect retry attempt counting. * add configurable option for maxRetryAttempts
1 parent e1a400a commit 34a70ac

File tree

7 files changed

+326
-35
lines changed

7 files changed

+326
-35
lines changed

Sources/Helpers/Task+withTimeout.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import Foundation
1010
@discardableResult
1111
package func withTimeout<R: Sendable>(
1212
interval: TimeInterval,
13-
@_inheritActorContext operation: @escaping @Sendable () async throws -> R
13+
@_inheritActorContext operation: @escaping @Sendable () async -> R
1414
) async throws -> R {
1515
try await withThrowingTaskGroup(of: R.self) { group in
1616
defer {
@@ -20,7 +20,7 @@ package func withTimeout<R: Sendable>(
2020
let deadline = Date(timeIntervalSinceNow: interval)
2121

2222
group.addTask {
23-
try await operation()
23+
await operation()
2424
}
2525

2626
group.addTask {

Sources/Realtime/RealtimeChannelV2.swift

Lines changed: 109 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,101 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
9191
callbackManager.reset()
9292
}
9393

94-
/// Subscribes to the channel
94+
/// Subscribes to the channel.
95+
public func subscribeWithError() async throws {
96+
logger?.debug("Starting subscription to channel '\(topic)' (attempt 1/\(socket.options.maxRetryAttempts))")
97+
98+
status = .subscribing
99+
100+
defer {
101+
// If the subscription fails, we need to set the status to unsubscribed
102+
// to avoid the channel being stuck in a subscribing state.
103+
if status != .subscribed {
104+
status = .unsubscribed
105+
}
106+
}
107+
108+
var attempts = 0
109+
110+
while attempts < socket.options.maxRetryAttempts {
111+
attempts += 1
112+
113+
do {
114+
logger?.debug(
115+
"Attempting to subscribe to channel '\(topic)' (attempt \(attempts)/\(socket.options.maxRetryAttempts))"
116+
)
117+
118+
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
119+
await _subscribe()
120+
}
121+
122+
logger?.debug("Successfully subscribed to channel '\(topic)'")
123+
return
124+
125+
} catch is TimeoutError {
126+
logger?.debug(
127+
"Subscribe timed out for channel '\(topic)' (attempt \(attempts)/\(socket.options.maxRetryAttempts))"
128+
)
129+
130+
if attempts < socket.options.maxRetryAttempts {
131+
// Add exponential backoff with jitter
132+
let delay = calculateRetryDelay(for: attempts)
133+
logger?.debug(
134+
"Retrying subscription to channel '\(topic)' in \(String(format: "%.2f", delay)) seconds..."
135+
)
136+
137+
do {
138+
try await _clock.sleep(for: delay)
139+
} catch {
140+
// If sleep is cancelled, break out of retry loop
141+
logger?.debug("Subscription retry cancelled for channel '\(topic)'")
142+
throw CancellationError()
143+
}
144+
} else {
145+
logger?.error(
146+
"Failed to subscribe to channel '\(topic)' after \(socket.options.maxRetryAttempts) attempts due to timeout"
147+
)
148+
}
149+
} catch is CancellationError {
150+
logger?.debug("Subscription retry cancelled for channel '\(topic)'")
151+
throw CancellationError()
152+
} catch {
153+
preconditionFailure(
154+
"The only possible error here is TimeoutError or CancellationError, this should never happen."
155+
)
156+
}
157+
}
158+
159+
logger?.error("Subscription to channel '\(topic)' failed after \(attempts) attempts")
160+
throw RealtimeError.maxRetryAttemptsReached
161+
}
162+
163+
/// Subscribes to the channel.
164+
@available(*, deprecated, message: "Use `subscribeWithError` instead")
95165
@MainActor
96166
public func subscribe() async {
167+
try? await subscribeWithError()
168+
}
169+
170+
/// Calculates retry delay with exponential backoff and jitter
171+
private func calculateRetryDelay(for attempt: Int) -> TimeInterval {
172+
let baseDelay: TimeInterval = 1.0
173+
let maxDelay: TimeInterval = 30.0
174+
let backoffMultiplier: Double = 2.0
175+
176+
let exponentialDelay = baseDelay * pow(backoffMultiplier, Double(attempt - 1))
177+
let cappedDelay = min(exponentialDelay, maxDelay)
178+
179+
// Add jitter (±25% random variation) to prevent thundering herd
180+
let jitterRange = cappedDelay * 0.25
181+
let jitter = Double.random(in: -jitterRange...jitterRange)
182+
183+
return max(0.1, cappedDelay + jitter)
184+
}
185+
186+
/// Subscribes to the channel
187+
@MainActor
188+
private func _subscribe() async {
97189
if socket.status != .connected {
98190
if socket.options.connectOnSubscribe != true {
99191
reportIssue(
@@ -104,7 +196,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
104196
await socket.connect()
105197
}
106198

107-
status = .subscribing
108199
logger?.debug("Subscribing to channel \(topic)")
109200

110201
config.presence.enabled = callbackManager.callbacks.contains(where: { $0.isPresence })
@@ -133,18 +224,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
133224
payload: try! JSONObject(payload)
134225
)
135226

136-
do {
137-
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
138-
_ = await statusChange.first { @Sendable in $0 == .subscribed }
139-
}
140-
} catch {
141-
if error is TimeoutError {
142-
logger?.debug("Subscribe timed out.")
143-
await subscribe()
144-
} else {
145-
logger?.error("Subscribe failed: \(error)")
146-
}
147-
}
227+
_ = await statusChange.first { @Sendable in $0 == .subscribed }
148228
}
149229

150230
public func unsubscribe() async {
@@ -183,13 +263,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
183263
@MainActor
184264
public func broadcast(event: String, message: JSONObject) async {
185265
if status != .subscribed {
186-
struct Message: Encodable {
187-
let topic: String
188-
let event: String
189-
let payload: JSONObject
190-
let `private`: Bool
191-
}
192-
193266
var headers: HTTPFields = [.contentType: "application/json"]
194267
if let apiKey = socket.options.apikey {
195268
headers[.apiKey] = apiKey
@@ -198,23 +271,34 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
198271
headers[.authorization] = "Bearer \(accessToken)"
199272
}
200273

274+
struct BroadcastMessagePayload: Encodable {
275+
let messages: [Message]
276+
277+
struct Message: Encodable {
278+
let topic: String
279+
let event: String
280+
let payload: JSONObject
281+
let `private`: Bool
282+
}
283+
}
284+
201285
let task = Task { [headers] in
202286
_ = try? await socket.http.send(
203287
HTTPRequest(
204288
url: socket.broadcastURL,
205289
method: .post,
206290
headers: headers,
207291
body: JSONEncoder().encode(
208-
[
209-
"messages": [
210-
Message(
292+
BroadcastMessagePayload(
293+
messages: [
294+
BroadcastMessagePayload.Message(
211295
topic: topic,
212296
event: event,
213297
payload: message,
214298
private: config.isPrivate
215299
)
216300
]
217-
]
301+
)
218302
)
219303
)
220304
)

Sources/Realtime/RealtimeError.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,10 @@ struct RealtimeError: LocalizedError {
1414
self.errorDescription = errorDescription
1515
}
1616
}
17+
18+
extension RealtimeError {
19+
/// The maximum retry attempts reached.
20+
static var maxRetryAttemptsReached: Self {
21+
Self("Maximum retry attempts reached.")
22+
}
23+
}

Sources/Realtime/Types.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public struct RealtimeClientOptions: Sendable {
2020
var timeoutInterval: TimeInterval
2121
var disconnectOnSessionLoss: Bool
2222
var connectOnSubscribe: Bool
23+
var maxRetryAttempts: Int
2324

2425
/// Sets the log level for Realtime
2526
var logLevel: LogLevel?
@@ -32,6 +33,7 @@ public struct RealtimeClientOptions: Sendable {
3233
public static let defaultTimeoutInterval: TimeInterval = 10
3334
public static let defaultDisconnectOnSessionLoss = true
3435
public static let defaultConnectOnSubscribe: Bool = true
36+
public static let defaultMaxRetryAttempts: Int = 5
3537

3638
public init(
3739
headers: [String: String] = [:],
@@ -40,6 +42,7 @@ public struct RealtimeClientOptions: Sendable {
4042
timeoutInterval: TimeInterval = Self.defaultTimeoutInterval,
4143
disconnectOnSessionLoss: Bool = Self.defaultDisconnectOnSessionLoss,
4244
connectOnSubscribe: Bool = Self.defaultConnectOnSubscribe,
45+
maxRetryAttempts: Int = Self.defaultMaxRetryAttempts,
4346
logLevel: LogLevel? = nil,
4447
fetch: (@Sendable (_ request: URLRequest) async throws -> (Data, URLResponse))? = nil,
4548
accessToken: (@Sendable () async throws -> String?)? = nil,
@@ -51,6 +54,7 @@ public struct RealtimeClientOptions: Sendable {
5154
self.timeoutInterval = timeoutInterval
5255
self.disconnectOnSessionLoss = disconnectOnSessionLoss
5356
self.connectOnSubscribe = connectOnSubscribe
57+
self.maxRetryAttempts = maxRetryAttempts
5458
self.logLevel = logLevel
5559
self.fetch = fetch
5660
self.accessToken = accessToken

Tests/IntegrationTests/RealtimeIntegrationTests.swift

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,11 @@ struct TestLogger: SupabaseLogger {
7070

7171
await Task.yield()
7272

73-
await channel.subscribe()
73+
do {
74+
try await channel.subscribeWithError()
75+
} catch {
76+
XCTFail("Expected .subscribed but got error: \(error)")
77+
}
7478

7579
struct Message: Codable {
7680
var value: Int
@@ -141,7 +145,11 @@ struct TestLogger: SupabaseLogger {
141145

142146
await Task.yield()
143147

144-
await channel.subscribe()
148+
do {
149+
try await channel.subscribeWithError()
150+
} catch {
151+
XCTFail("Expected .subscribed but got error: \(error)")
152+
}
145153

146154
struct UserState: Codable, Equatable {
147155
let email: String
@@ -201,7 +209,11 @@ struct TestLogger: SupabaseLogger {
201209
}
202210

203211
await Task.yield()
204-
await channel.subscribe()
212+
do {
213+
try await channel.subscribeWithError()
214+
} catch {
215+
XCTFail("Expected .subscribed but got error: \(error)")
216+
}
205217

206218
struct Entry: Codable, Equatable {
207219
let key: String

Tests/RealtimeTests/RealtimeChannelTests.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ final class RealtimeChannelTests: XCTestCase {
161161
XCTAssertTrue(channel.callbackManager.callbacks.contains(where: { $0.isPresence }))
162162

163163
// Start subscription process
164-
Task {
164+
let subscribeTask = Task {
165165
await channel.subscribe()
166166
}
167167

@@ -191,5 +191,8 @@ final class RealtimeChannelTests: XCTestCase {
191191
presenceSubscription.cancel()
192192
await channel.unsubscribe()
193193
socket.disconnect()
194+
195+
// Note: We don't assert the subscribe status here because the test doesn't wait for completion
196+
// The subscription is still in progress when we clean up
194197
}
195198
}

0 commit comments

Comments
 (0)