Skip to content

Commit 7ff51e3

Browse files
authored
Merge pull request #1042 from supabase-community/remove-atomicfu
Split callbacks in the CallbackManager
2 parents e082018 + 90fa3e6 commit 7ff51e3

File tree

5 files changed

+97
-48
lines changed

5 files changed

+97
-48
lines changed

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/CallbackManager.kt

Lines changed: 89 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,88 +2,147 @@ package io.github.jan.supabase.realtime
22

33
import io.github.jan.supabase.SupabaseSerializer
44
import io.github.jan.supabase.annotations.SupabaseInternal
5-
import io.github.jan.supabase.collections.AtomicMutableList
65
import io.github.jan.supabase.serializer.KotlinXSerializer
6+
import kotlinx.collections.immutable.PersistentList
7+
import kotlinx.collections.immutable.PersistentMap
8+
import kotlinx.collections.immutable.persistentHashMapOf
9+
import kotlinx.collections.immutable.persistentListOf
10+
import kotlinx.collections.immutable.plus
711
import kotlinx.serialization.json.JsonObject
812
import kotlin.concurrent.atomics.AtomicInt
913
import kotlin.concurrent.atomics.AtomicReference
1014
import kotlin.concurrent.atomics.fetchAndIncrement
15+
import kotlin.concurrent.atomics.update
1116

1217
@SupabaseInternal
13-
sealed interface CallbackManager {
18+
sealed class RealtimeCallbackId(val value: Int) {
19+
20+
class Postgres(value: Int) : RealtimeCallbackId(value)
21+
22+
class Presence(value: Int) : RealtimeCallbackId(value)
23+
24+
class Broadcast(value: Int) : RealtimeCallbackId(value)
25+
26+
}
27+
28+
@SupabaseInternal
29+
interface CallbackManager {
1430

1531
fun triggerPostgresChange(ids: List<Int>, data: PostgresAction)
1632

1733
fun triggerBroadcast(event: String, data: JsonObject)
1834

1935
fun triggerPresenceDiff(joins: Map<String, Presence>, leaves: Map<String, Presence>)
2036

21-
fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): Int
37+
fun hasPresenceCallback(): Boolean
2238

23-
fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): Int
39+
fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): RealtimeCallbackId.Broadcast
2440

25-
fun addPresenceCallback(callback: (PresenceAction) -> Unit): Int
41+
fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): RealtimeCallbackId.Postgres
2642

27-
fun removeCallbackById(id: Int)
43+
fun addPresenceCallback(callback: (PresenceAction) -> Unit): RealtimeCallbackId.Presence
2844

29-
fun setServerChanges(changes: List<PostgresJoinConfig>)
45+
fun removeCallbackById(id: RealtimeCallbackId)
3046

31-
fun getCallbacks(): List<RealtimeCallback<*>>
47+
fun setServerChanges(changes: List<PostgresJoinConfig>)
3248

3349
}
3450

51+
private typealias BroadcastMap = PersistentMap<String, PersistentList<RealtimeCallback.BroadcastCallback>>
52+
private typealias PresenceMap = PersistentMap<Int, RealtimeCallback.PresenceCallback>
53+
private typealias PostgresMap = PersistentMap<Int, RealtimeCallback.PostgresCallback>
54+
3555
internal class CallbackManagerImpl(
3656
private val serializer: SupabaseSerializer = KotlinXSerializer()
3757
) : CallbackManager {
3858

3959
private val nextId = AtomicInt(0)
4060
private val _serverChanges = AtomicReference(listOf<PostgresJoinConfig>())
4161
val serverChanges: List<PostgresJoinConfig> get() = _serverChanges.load()
42-
private val callbacks = AtomicMutableList<RealtimeCallback<*>>()
4362

44-
override fun getCallbacks(): List<RealtimeCallback<*>> {
45-
return callbacks.toList()
46-
}
63+
private val presenceCallbacks = AtomicReference<PresenceMap>(persistentHashMapOf())
4764

48-
override fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): Int {
65+
private val broadcastCallbacks = AtomicReference<BroadcastMap>(persistentHashMapOf())
66+
// Additional map to know from which list a callback may be removed in broadcastCallbacks without searching through the whole map
67+
private val broadcastEventId = AtomicReference<PersistentMap<Int, String>>(persistentHashMapOf())
68+
69+
private val postgresCallbacks = AtomicReference<PostgresMap>(persistentHashMapOf())
70+
71+
override fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): RealtimeCallbackId.Broadcast {
4972
val id = nextId.fetchAndIncrement()
50-
callbacks += RealtimeCallback.BroadcastCallback(callback, event, id)
51-
return id
73+
broadcastCallbacks.update {
74+
val current = it[event] ?: persistentListOf()
75+
it.put(event, current + RealtimeCallback.BroadcastCallback(callback, event, id))
76+
}
77+
broadcastEventId.update {
78+
it.put(id, event)
79+
}
80+
return RealtimeCallbackId.Broadcast(id)
5281
}
5382

54-
override fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): Int {
83+
override fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): RealtimeCallbackId.Postgres {
5584
val id = nextId.fetchAndIncrement()
56-
callbacks += RealtimeCallback.PostgresCallback(callback, filter, id)
57-
return id
85+
postgresCallbacks.update {
86+
it.put(id, RealtimeCallback.PostgresCallback(callback, filter, id))
87+
}
88+
return RealtimeCallbackId.Postgres(id)
5889
}
5990

6091
override fun triggerPostgresChange(ids: List<Int>, data: PostgresAction) {
6192
val filter = serverChanges.filter { it.id in ids }
62-
val postgresCallbacks = callbacks.filterIsInstance<RealtimeCallback.PostgresCallback>()
6393
val callbacks =
64-
postgresCallbacks.filter { cc -> filter.any { sc -> cc.filter == sc } }
94+
postgresCallbacks.load().values.filter { cc -> filter.any { sc -> cc.filter == sc } }
6595
callbacks.forEach { it.callback(data) }
6696
}
6797

6898
override fun triggerBroadcast(event: String, data: JsonObject) {
69-
val broadcastCallbacks = callbacks.filterIsInstance<RealtimeCallback.BroadcastCallback>()
70-
val callbacks = broadcastCallbacks.filter { it.event == event }
71-
callbacks.forEach { it.callback(data) }
99+
broadcastCallbacks.load()[event]?.forEach { it.callback(data) }
72100
}
73101

74102
override fun triggerPresenceDiff(joins: Map<String, Presence>, leaves: Map<String, Presence>) {
75-
val presenceCallbacks = callbacks.filterIsInstance<RealtimeCallback.PresenceCallback>()
76-
presenceCallbacks.forEach { it.callback(PresenceActionImpl(serializer, joins, leaves)) }
103+
presenceCallbacks.load().values.forEach { it.callback(PresenceActionImpl(serializer, joins, leaves)) }
77104
}
78105

79-
override fun addPresenceCallback(callback: (PresenceAction) -> Unit): Int {
106+
override fun hasPresenceCallback(): Boolean {
107+
return presenceCallbacks.load().isNotEmpty()
108+
}
109+
110+
override fun addPresenceCallback(callback: (PresenceAction) -> Unit): RealtimeCallbackId.Presence {
80111
val id = nextId.fetchAndIncrement()
81-
callbacks += RealtimeCallback.PresenceCallback(callback, id)
82-
return id
112+
presenceCallbacks.update {
113+
it.put(id, RealtimeCallback.PresenceCallback(callback, id))
114+
}
115+
return RealtimeCallbackId.Presence(id)
116+
}
117+
118+
fun removeBroadcastCallbackById(id: Int) {
119+
val event = broadcastEventId.load()[id] ?: return
120+
broadcastCallbacks.update {
121+
it.put(event, it[event]?.removeAll { c -> c.id == id } ?: persistentListOf())
122+
}
123+
broadcastEventId.update {
124+
it.remove(id)
125+
}
126+
}
127+
128+
fun removePresenceCallbackById(id: Int) {
129+
presenceCallbacks.update {
130+
it.remove(id)
131+
}
132+
}
133+
134+
fun removePostgresCallbackById(id: Int) {
135+
postgresCallbacks.update {
136+
it.remove(id)
137+
}
83138
}
84139

85-
override fun removeCallbackById(id: Int) {
86-
callbacks.indexOfFirst { it.id == id }.takeIf { it != -1 }?.let { callbacks.removeAt(it) }
140+
override fun removeCallbackById(id: RealtimeCallbackId) {
141+
when (id) {
142+
is RealtimeCallbackId.Broadcast -> removeBroadcastCallbackById(id.value)
143+
is RealtimeCallbackId.Presence -> removePresenceCallbackById(id.value)
144+
is RealtimeCallbackId.Postgres -> removePostgresCallbackById(id.value)
145+
}
87146
}
88147

89148
override fun setServerChanges(changes: List<PostgresJoinConfig>) {

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/PresenceAction.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ sealed interface PresenceAction {
9696

9797
}
9898

99-
@PublishedApi internal class PresenceActionImpl(
99+
@PublishedApi
100+
internal class PresenceActionImpl(
100101
@PublishedApi internal val serializer: SupabaseSerializer,
101102
override val joins: Map<String, Presence>,
102103
override val leaves: Map<String, Presence>

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ internal class RealtimeChannelImpl(
6464
Realtime.logger.d { "Subscribing to channel $topic" }
6565
val currentJwt = accessToken()
6666
val postgrestChanges = clientChanges.toList()
67-
val hasPresenceCallback = callbackManager.getCallbacks().filterIsInstance<RealtimeCallback.PresenceCallback>().isNotEmpty()
67+
val hasPresenceCallback = callbackManager.hasPresenceCallback()
6868
presenceJoinConfig.enabled = hasPresenceCallback
6969
val joinConfig = RealtimeJoinPayload(RealtimeJoinConfig(broadcastJoinConfig, presenceJoinConfig, postgrestChanges, isPrivate))
7070
val joinConfigObject = buildJsonObject {

Realtime/src/commonTest/kotlin/CallbackManagerTest.kt

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import io.github.jan.supabase.realtime.HasRecord
44
import io.github.jan.supabase.realtime.PostgresAction
55
import io.github.jan.supabase.realtime.PostgresJoinConfig
66
import io.github.jan.supabase.realtime.Presence
7-
import io.github.jan.supabase.realtime.RealtimeCallback
87
import io.github.jan.supabase.serializer.KotlinXSerializer
98
import kotlinx.serialization.json.JsonObject
109
import kotlinx.serialization.json.buildJsonObject
@@ -37,18 +36,6 @@ class CallbackManagerTest {
3736
assertFalse { called }
3837
}
3938

40-
@Test
41-
fun testGetCallbacks() {
42-
val cm = CallbackManagerImpl()
43-
val expectedEvent = "event"
44-
cm.addBroadcastCallback(expectedEvent) {
45-
//...
46-
}
47-
val callbacks = cm.getCallbacks()
48-
assertTrue { callbacks.isNotEmpty() }
49-
assertTrue { callbacks.any { it is RealtimeCallback.BroadcastCallback && it.event == expectedEvent } }
50-
}
51-
5239
@Test
5340
fun testPresenceCallbacks() {
5441
val cm = CallbackManagerImpl()
@@ -64,10 +51,12 @@ class CallbackManagerTest {
6451
assertEquals(expectedLeaves, it.leaves)
6552
called = true
6653
}
54+
assertTrue { cm.hasPresenceCallback() }
6755
cm.triggerPresenceDiff(expectedJoins, expectedLeaves)
6856
assertTrue { called }
6957
cm.removeCallbackById(id)
7058
called = false
59+
assertFalse { cm.hasPresenceCallback() }
7160
cm.triggerPresenceDiff(expectedJoins, expectedLeaves)
7261
assertFalse { called }
7362
}
@@ -96,15 +85,15 @@ class CallbackManagerTest {
9685
called = true
9786
}
9887
cm.setServerChanges(listOf(joinConfig))
99-
cm.triggerPostgresChange(listOf(id), actionFromEvent(event, expectedRecord, expectedOldRecord))
88+
cm.triggerPostgresChange(listOf(id.value), actionFromEvent(event, expectedRecord, expectedOldRecord))
10089
assertTrue { called }
10190
called = false
10291
if(event != "*") {
10392
cm.triggerPostgresChange(listOf(2), actionFromEvent(events.filter { it != event && it != "*" }.random(), expectedRecord, expectedOldRecord))
10493
assertFalse { called }
10594
}
10695
cm.removeCallbackById(id)
107-
cm.triggerPostgresChange(listOf(id), actionFromEvent(event, expectedRecord, expectedOldRecord))
96+
cm.triggerPostgresChange(listOf(id.value), actionFromEvent(event, expectedRecord, expectedOldRecord))
10897
assertFalse { called }
10998
}
11099
}

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[versions]
2-
kotlin = "2.2.20-RC2"
2+
kotlin = "2.2.20"
33
accompanist-permissions = "0.37.3"
44
ktor = "3.2.3"
55
dokka = "2.0.0"

0 commit comments

Comments
 (0)