Skip to content

Commit 5927e4c

Browse files
authored
Lift restriction of having uniqueness of subscription registration (#1720)
* Lift restriction of having uniqueness of subscription registration Remove checking if the subscription with the same arguments was already registered. Address some synchronization concerns. Closes #1706 * Fix tests
1 parent fc37c15 commit 5927e4c

File tree

2 files changed

+151
-86
lines changed

2 files changed

+151
-86
lines changed

apollo-runtime/src/main/java/com/apollographql/apollo/internal/subscription/RealSubscriptionManager.java

Lines changed: 140 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
import com.apollographql.apollo.subscription.SubscriptionTransport;
1616
import org.jetbrains.annotations.NotNull;
1717

18-
import java.util.ArrayList;
1918
import java.util.Collection;
2019
import java.util.Collections;
2120
import java.util.LinkedHashMap;
2221
import java.util.List;
2322
import java.util.Map;
2423
import java.util.Timer;
2524
import java.util.TimerTask;
25+
import java.util.UUID;
2626
import java.util.concurrent.CopyOnWriteArrayList;
2727
import java.util.concurrent.Executor;
2828
import java.util.concurrent.TimeUnit;
@@ -37,7 +37,7 @@ public final class RealSubscriptionManager implements SubscriptionManager {
3737
static final long CONNECTION_ACKNOWLEDGE_TIMEOUT = TimeUnit.SECONDS.toMillis(5);
3838
static final long INACTIVITY_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
3939

40-
Map<String, SubscriptionRecord> subscriptions = new LinkedHashMap<>();
40+
Map<UUID, SubscriptionRecord> subscriptions = new LinkedHashMap<>();
4141
volatile SubscriptionManagerState state = SubscriptionManagerState.DISCONNECTED;
4242
final AutoReleaseTimer timer = new AutoReleaseTimer();
4343

@@ -82,8 +82,7 @@ public RealSubscriptionManager(@NotNull ScalarTypeAdapters scalarTypeAdapters,
8282
}
8383

8484
@Override
85-
public <T> void subscribe(@NotNull final Subscription<?, T, ?> subscription,
86-
@NotNull final SubscriptionManager.Callback<T> callback) {
85+
public <T> void subscribe(@NotNull final Subscription<?, T, ?> subscription, @NotNull final SubscriptionManager.Callback<T> callback) {
8786
checkNotNull(subscription, "subscription == null");
8887
checkNotNull(callback, "callback == null");
8988
dispatcher.execute(new Runnable() {
@@ -111,11 +110,15 @@ public void run() {
111110
*/
112111
@Override
113112
public void start() {
113+
final SubscriptionManagerState oldState;
114114
synchronized (this) {
115+
oldState = state;
115116
if (state == SubscriptionManagerState.STOPPED) {
116-
setStateAndNotify(SubscriptionManagerState.DISCONNECTED);
117+
state = SubscriptionManagerState.DISCONNECTED;
117118
}
118119
}
120+
121+
notifyStateChanged(oldState, state);
119122
}
120123

121124
/**
@@ -127,14 +130,11 @@ public void start() {
127130
*/
128131
@Override
129132
public void stop() {
130-
synchronized (this) {
131-
setStateAndNotify(SubscriptionManagerState.STOPPING);
132-
ArrayList<SubscriptionRecord> values = new ArrayList<>(subscriptions.values());
133-
for (SubscriptionRecord subscription : values) {
134-
doUnsubscribe(subscription.subscription);
133+
dispatcher.execute(new Runnable() {
134+
@Override public void run() {
135+
doStop();
135136
}
136-
disconnect(true);
137-
}
137+
});
138138
}
139139

140140
@Override public SubscriptionManagerState getState() {
@@ -150,67 +150,112 @@ public void stop() {
150150
}
151151

152152
void doSubscribe(Subscription subscription, SubscriptionManager.Callback callback) {
153-
if (state == SubscriptionManagerState.STOPPING || state == SubscriptionManagerState.STOPPED) {
153+
final SubscriptionManagerState oldState;
154+
synchronized (this) {
155+
oldState = state;
156+
157+
if (state != SubscriptionManagerState.STOPPING && state != SubscriptionManagerState.STOPPED) {
158+
timer.cancelTask(INACTIVITY_TIMEOUT_TIMER_TASK_ID);
159+
160+
final UUID subscriptionId = UUID.randomUUID();
161+
162+
subscriptions.put(subscriptionId, new SubscriptionRecord(subscriptionId, subscription, callback));
163+
if (state == SubscriptionManagerState.DISCONNECTED) {
164+
state = SubscriptionManagerState.CONNECTING;
165+
transport.connect();
166+
} else if (state == SubscriptionManagerState.ACTIVE) {
167+
transport.send(new OperationClientMessage.Start(subscriptionId.toString(), subscription, scalarTypeAdapters));
168+
}
169+
}
170+
}
171+
172+
if (oldState == SubscriptionManagerState.STOPPING || oldState == SubscriptionManagerState.STOPPED) {
154173
callback.onError(new ApolloSubscriptionException(
155174
"Illegal state: " + state.name() + " for subscriptions to be created."
156175
+ " SubscriptionManager.start() must be called to re-enable subscriptions."));
157-
return;
176+
} else if (oldState == SubscriptionManagerState.CONNECTED) {
177+
callback.onConnected();
158178
}
159-
timer.cancelTask(INACTIVITY_TIMEOUT_TIMER_TASK_ID);
160179

161-
String subscriptionId = idForSubscription(subscription);
180+
notifyStateChanged(oldState, state);
181+
}
182+
183+
void doUnsubscribe(Subscription subscription) {
162184
synchronized (this) {
163-
if (subscriptions.containsKey(subscriptionId)) {
164-
callback.onError(new ApolloSubscriptionException("Already subscribed"));
165-
return;
185+
SubscriptionRecord subscriptionRecord = null;
186+
for (SubscriptionRecord record : subscriptions.values()) {
187+
if (record.subscription == subscription) {
188+
subscriptionRecord = record;
189+
}
166190
}
167191

168-
subscriptions.put(subscriptionId, new SubscriptionRecord(subscription, callback));
169-
if (state == SubscriptionManagerState.DISCONNECTED) {
170-
setStateAndNotify(SubscriptionManagerState.CONNECTING);
171-
transport.connect();
172-
} else if (state == SubscriptionManagerState.ACTIVE) {
173-
transport.send(new OperationClientMessage.Start(subscriptionId, subscription, scalarTypeAdapters));
192+
if (subscriptionRecord != null) {
193+
subscriptions.remove(subscriptionRecord.id);
194+
if (state == SubscriptionManagerState.ACTIVE || state == SubscriptionManagerState.STOPPING) {
195+
transport.send(new OperationClientMessage.Stop(subscriptionRecord.id.toString()));
196+
}
197+
}
198+
199+
if (subscriptions.isEmpty() && state != SubscriptionManagerState.STOPPING) {
200+
startInactivityTimer();
174201
}
175202
}
176203
}
177204

178-
void doUnsubscribe(Subscription subscription) {
179-
String subscriptionId = idForSubscription(subscription);
180-
181-
SubscriptionRecord subscriptionRecord;
205+
void doStop() {
206+
final Collection<SubscriptionRecord> subscriptionRecords;
207+
final SubscriptionManagerState oldState;
182208
synchronized (this) {
183-
subscriptionRecord = subscriptions.remove(subscriptionId);
184-
if ((subscriptionRecord != null) && (state == SubscriptionManagerState.ACTIVE || state == SubscriptionManagerState.STOPPING)) {
185-
transport.send(new OperationClientMessage.Stop(subscriptionId));
186-
}
209+
oldState = state;
210+
state = SubscriptionManagerState.STOPPING;
187211

188-
if (subscriptions.isEmpty() && state != SubscriptionManagerState.STOPPING) {
189-
startInactivityTimer();
212+
subscriptionRecords = subscriptions.values();
213+
214+
if (oldState == SubscriptionManagerState.ACTIVE) {
215+
for (SubscriptionRecord subscriptionRecord : subscriptionRecords) {
216+
transport.send(new OperationClientMessage.Stop(subscriptionRecord.id.toString()));
217+
}
190218
}
219+
220+
state = SubscriptionManagerState.STOPPED;
221+
222+
transport.disconnect(new OperationClientMessage.Terminate());
223+
subscriptions = new LinkedHashMap<>();
191224
}
225+
226+
for (SubscriptionRecord record : subscriptionRecords) {
227+
record.notifyOnCompleted();
228+
}
229+
230+
notifyStateChanged(oldState, SubscriptionManagerState.STOPPING);
231+
notifyStateChanged(SubscriptionManagerState.STOPPING, state);
192232
}
193233

194234
void onTransportConnected() {
195235
final Collection<SubscriptionRecord> subscriptionRecords;
236+
237+
final SubscriptionManagerState oldState;
196238
synchronized (this) {
239+
oldState = state;
240+
197241
if (state == SubscriptionManagerState.CONNECTING) {
198242
subscriptionRecords = subscriptions.values();
199-
setStateAndNotify(SubscriptionManagerState.CONNECTED);
243+
state = SubscriptionManagerState.CONNECTED;
200244
transport.send(new OperationClientMessage.Init(connectionParams.provide()));
201245
} else {
202246
subscriptionRecords = Collections.emptyList();
203247
}
248+
249+
if (state == SubscriptionManagerState.CONNECTED) {
250+
timer.schedule(CONNECTION_ACKNOWLEDGE_TIMEOUT_TIMER_TASK_ID, connectionAcknowledgeTimeoutTimerTask, CONNECTION_ACKNOWLEDGE_TIMEOUT);
251+
}
204252
}
205253

206254
for (SubscriptionRecord record : subscriptionRecords) {
207255
record.callback.onConnected();
208256
}
209257

210-
if (state == SubscriptionManagerState.CONNECTED) {
211-
timer.schedule(CONNECTION_ACKNOWLEDGE_TIMEOUT_TIMER_TASK_ID, connectionAcknowledgeTimeoutTimerTask,
212-
CONNECTION_ACKNOWLEDGE_TIMEOUT);
213-
}
258+
notifyStateChanged(oldState, state);
214259
}
215260

216261
void onConnectionAcknowledgeTimeout() {
@@ -234,12 +279,7 @@ public void run() {
234279
}
235280

236281
void onTransportFailure(Throwable t) {
237-
Collection<SubscriptionRecord> subscriptionRecords;
238-
synchronized (this) {
239-
subscriptionRecords = subscriptions.values();
240-
disconnect(true);
241-
}
242-
282+
Collection<SubscriptionRecord> subscriptionRecords = disconnect(true);
243283
for (SubscriptionRecord record : subscriptionRecords) {
244284
record.notifyOnNetworkError(t);
245285
}
@@ -269,48 +309,62 @@ void onOperationServerMessage(OperationServerMessage message) {
269309
*
270310
* @param force if true, always disconnect web socket, regardless of the status of {@link #subscriptions}
271311
*/
272-
void disconnect(boolean force) {
312+
Collection<SubscriptionRecord> disconnect(boolean force) {
313+
final SubscriptionManagerState oldState;
314+
final Collection<SubscriptionRecord> subscriptionRecords;
273315
synchronized (this) {
316+
oldState = state;
317+
subscriptionRecords = subscriptions.values();
274318
if (force || subscriptions.isEmpty()) {
275319
transport.disconnect(new OperationClientMessage.Terminate());
276-
SubscriptionManagerState disconnectionState = (state == SubscriptionManagerState.STOPPING) ? SubscriptionManagerState.STOPPED
277-
: SubscriptionManagerState.DISCONNECTED;
278-
setStateAndNotify(disconnectionState);
320+
state = (state == SubscriptionManagerState.STOPPING) ? SubscriptionManagerState.STOPPED : SubscriptionManagerState.DISCONNECTED;
279321
subscriptions = new LinkedHashMap<>();
280322
}
281323
}
324+
325+
notifyStateChanged(oldState, state);
326+
327+
return subscriptionRecords;
282328
}
283329

284330
void onConnectionHeartbeatTimeout() {
331+
final SubscriptionManagerState oldState;
285332
synchronized (this) {
333+
oldState = state;
334+
state = SubscriptionManagerState.DISCONNECTED;
286335
transport.disconnect(new OperationClientMessage.Terminate());
287-
setStateAndNotify(SubscriptionManagerState.DISCONNECTED);
288-
289-
setStateAndNotify(SubscriptionManagerState.CONNECTING);
336+
state = SubscriptionManagerState.CONNECTING;
290337
transport.connect();
291338
}
339+
340+
notifyStateChanged(oldState, SubscriptionManagerState.DISCONNECTED);
341+
notifyStateChanged(SubscriptionManagerState.DISCONNECTED, SubscriptionManagerState.CONNECTING);
292342
}
293343

294344
void onConnectionClosed() {
295345
Collection<SubscriptionRecord> subscriptionRecords;
346+
final SubscriptionManagerState oldState;
296347
synchronized (this) {
348+
oldState = state;
349+
297350
subscriptionRecords = subscriptions.values();
298-
setStateAndNotify(SubscriptionManagerState.DISCONNECTED);
351+
state = SubscriptionManagerState.DISCONNECTED;
299352
subscriptions = new LinkedHashMap<>();
300353
}
301354

302355
for (SubscriptionRecord record : subscriptionRecords) {
303356
record.callback.onTerminated();
304357
}
358+
359+
notifyStateChanged(oldState, state);
305360
}
306361

307362
private void resetConnectionKeepAliveTimerTask() {
308363
if (connectionHeartbeatTimeoutMs <= 0) {
309364
return;
310365
}
311366
synchronized (this) {
312-
timer.schedule(CONNECTION_KEEP_ALIVE_TIMEOUT_TIMER_TASK_ID, connectionHeartbeatTimeoutTimerTask,
313-
connectionHeartbeatTimeoutMs);
367+
timer.schedule(CONNECTION_KEEP_ALIVE_TIMEOUT_TIMER_TASK_ID, connectionHeartbeatTimeoutTimerTask, connectionHeartbeatTimeoutMs);
314368
}
315369
}
316370

@@ -323,7 +377,11 @@ private void onOperationDataServerMessage(OperationServerMessage.Data message) {
323377
String subscriptionId = message.id != null ? message.id : "";
324378
SubscriptionRecord subscriptionRecord;
325379
synchronized (this) {
326-
subscriptionRecord = subscriptions.get(subscriptionId);
380+
try {
381+
subscriptionRecord = subscriptions.get(UUID.fromString(subscriptionId));
382+
} catch (IllegalArgumentException e) {
383+
subscriptionRecord = null;
384+
}
327385
}
328386

329387
if (subscriptionRecord != null) {
@@ -347,17 +405,22 @@ private void onOperationDataServerMessage(OperationServerMessage.Data message) {
347405
}
348406

349407
private void onConnectionAcknowledgeServerMessage() {
350-
timer.cancelTask(CONNECTION_ACKNOWLEDGE_TIMEOUT_TIMER_TASK_ID);
408+
final SubscriptionManagerState oldState;
351409
synchronized (this) {
410+
oldState = state;
411+
412+
timer.cancelTask(CONNECTION_ACKNOWLEDGE_TIMEOUT_TIMER_TASK_ID);
413+
352414
if (state == SubscriptionManagerState.CONNECTED) {
353-
setStateAndNotify(SubscriptionManagerState.ACTIVE);
354-
for (Map.Entry<String, SubscriptionRecord> entry : subscriptions.entrySet()) {
355-
String subscriptionId = entry.getKey();
356-
Subscription<?, ?, ?> subscription = entry.getValue().subscription;
357-
transport.send(new OperationClientMessage.Start(subscriptionId, subscription, scalarTypeAdapters));
415+
state = SubscriptionManagerState.ACTIVE;
416+
for (SubscriptionRecord subscriptionRecord : subscriptions.values()) {
417+
transport.send(new OperationClientMessage.Start(subscriptionRecord.id.toString(), subscriptionRecord.subscription,
418+
scalarTypeAdapters));
358419
}
359420
}
360421
}
422+
423+
notifyStateChanged(oldState, state);
361424
}
362425

363426
private void onErrorServerMessage(OperationServerMessage.Error message) {
@@ -379,31 +442,36 @@ private void onCompleteServerMessage(OperationServerMessage.Complete message) {
379442
private SubscriptionRecord removeSubscriptionById(String subscriptionId) {
380443
SubscriptionRecord subscriptionRecord;
381444
synchronized (this) {
382-
subscriptionRecord = subscriptions.remove(subscriptionId);
445+
try {
446+
subscriptionRecord = subscriptions.remove(UUID.fromString(subscriptionId));
447+
} catch (IllegalArgumentException e) {
448+
subscriptionRecord = null;
449+
}
450+
383451
if (subscriptions.isEmpty()) {
384452
startInactivityTimer();
385453
}
386454
}
387455
return subscriptionRecord;
388456
}
389457

390-
private void setStateAndNotify(SubscriptionManagerState newState) {
391-
SubscriptionManagerState oldState = state;
392-
state = newState;
458+
private void notifyStateChanged(SubscriptionManagerState oldState, SubscriptionManagerState newState) {
459+
if (oldState == newState) {
460+
return;
461+
}
462+
393463
for (OnSubscriptionManagerStateChangeListener onStateChangeListener : onStateChangeListeners) {
394464
onStateChangeListener.onStateChange(oldState, newState);
395465
}
396466
}
397467

398-
static String idForSubscription(Subscription<?, ?, ?> subscription) {
399-
return subscription.operationId() + "$" + subscription.variables().valueMap().hashCode();
400-
}
401-
402468
private static class SubscriptionRecord {
469+
final UUID id;
403470
final Subscription<?, ?, ?> subscription;
404471
final SubscriptionManager.Callback<?> callback;
405472

406-
SubscriptionRecord(Subscription<?, ?, ?> subscription, SubscriptionManager.Callback<?> callback) {
473+
SubscriptionRecord(UUID id, Subscription<?, ?, ?> subscription, SubscriptionManager.Callback<?> callback) {
474+
this.id = id;
407475
this.subscription = subscription;
408476
this.callback = callback;
409477
}
@@ -504,7 +572,6 @@ public void run() {
504572

505573
timer.schedule(timerTask, delay);
506574
}
507-
508575
}
509576

510577
void cancelTask(int taskId) {

0 commit comments

Comments
 (0)