Skip to content
141 changes: 119 additions & 22 deletions lib/model/message.dart
Original file line number Diff line number Diff line change
Expand Up @@ -400,29 +400,88 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage

void reconcileMessages(List<Message> messages) {
assert(!_disposed);
// What to do when some of the just-fetched messages are already known?
// This is common and normal: in particular it happens when one message list
// overlaps another, e.g. a stream and a topic within it.
//
// Most often, the just-fetched message will look just like the one we
// already have. But they can differ: message fetching happens out of band
// from the event queue, so there's inherently a race.
//
// If the fetched message reflects changes we haven't yet heard from the
// event queue, then it doesn't much matter which version we use: we'll
// soon get the corresponding events and apply the changes anyway.
// But if it lacks changes we've already heard from the event queue, then
// we won't hear those events again; the only way to wind up with an
// updated message is to use the version we have, that already reflects
// those events' changes. So we always stick with the version we have.
for (int i = 0; i < messages.length; i++) {
final message = messages[i];
messages[i] = this.messages.putIfAbsent(message.id, () {
message.matchContent = null;
message.matchTopic = null;
return message;
});

messages[i] = this.messages.update(message.id,
ifAbsent: () => _reconcileUnrecognizedMessage(message),
(current) => _reconcileRecognizedMessage(current, message));
}
}

Message _reconcileUnrecognizedMessage(Message incoming) {
if (
incoming is StreamMessage
&& subscriptions[incoming.streamId] == null
) {
// The message is in an unsubscribed channel. It might grow stale;
// add it to _maybeStaleChannelMessages.
_maybeStaleChannelMessages.add(incoming.id);
}
return _stripMatchFields(incoming);
}

Message _reconcileRecognizedMessage(Message current, Message incoming) {
// This message is one we already know about. This is common and normal:
// in particular it happens when one message list overlaps another,
// e.g. a stream and a topic within it.
//
// Most often, the just-fetched message will look just like the one we
// already have. But not always, and we can choose intelligently whether
// to keep the stored version or clobber it with the incoming one.

bool currentIsMaybeStale = false;
if (incoming is StreamMessage) {
if (subscriptions[incoming.streamId] != null) {
// The incoming version won't grow stale; it's in a subscribed channel.
// Remove it from _maybeStaleChannelMessages if it was there.
currentIsMaybeStale = _maybeStaleChannelMessages.remove(incoming.id);
} else {
assert(_maybeStaleChannelMessages.contains(incoming.id));
currentIsMaybeStale = true;
}
}

if (currentIsMaybeStale) {
// The event queue is unreliable for this message; the message was in an
// unsubscribed channel when we stored it or sometime since, so the stored
// version might be stale. Refresh it with the fetched version.
return _stripMatchFields(incoming);
} else {
// Message fetching happens out of band from the event queue, so there's
// inherently a race.
//
// If the fetched message reflects changes we haven't yet heard from the
// event queue, then it doesn't much matter which version we use: we'll
// soon get the corresponding events and apply the changes anyway.
// But if it lacks changes we've already heard from the event queue, then
// we won't hear those events again; the only way to wind up with an
// updated message is to use the version we have, that already reflects
// those events' changes. So, stick with the version we have.
return current;
}
}

/// Messages in [messages] whose data stream is or was presumably broken
/// by the message being in an unsubscribed channel.
///
/// This is the subset of [messages] where the message was
/// in an unsubscribed channel when we added it or sometime since.
///
/// We don't expect update events for messages in unsubscribed channels,
/// so if some of these maybe-stale messages appear in a fetch,
/// we'll always clobber our stored version with the fetched version.
/// See [reconcileMessages].
///
/// (We have seen a few such events, actually --
/// maybe because the channel only recently became unsubscribed? --
/// but not consistently, and we're not supposed to rely on them.)
final Set<int> _maybeStaleChannelMessages = {};

Message _stripMatchFields(Message message) {
message.matchContent = null;
message.matchTopic = null;
return message;
}
Comment on lines +481 to 485
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The factoring out of this step could happen in a prep commit. That would simplify the complexity of the main commit a bit — it'd be moving around where an expression like _stripMatchFields(incoming) appears, and adding a case where something else gets used instead, but the expression itself would remain the same.

(Or maybe the main commit would change from "message" to "incoming", but anyway that's a rather smaller change.)


@override
Expand Down Expand Up @@ -489,6 +548,29 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
);
}

void handleChannelDeleteEvent(ChannelDeleteEvent event) {
final channelIds = event.streams.map((channel) => channel.streamId);
_handleSubscriptionsRemoved(channelIds);
}

void handleSubscriptionRemoveEvent(SubscriptionRemoveEvent event) {
_handleSubscriptionsRemoved(event.streamIds);
}

void _handleSubscriptionsRemoved(Iterable<int> channelIds) {
if (channelIds.length > 1) {
assert(channelIds is! Set);
channelIds = Set.from(channelIds); // optimization
}

// Linear in [messages].
final affectedKnownMessageIds = messages.values
.where((message) => message is StreamMessage && channelIds.contains(message.streamId))
.map((message) => message.id);

_maybeStaleChannelMessages.addAll(affectedKnownMessageIds);
}

void handleUserTopicEvent(UserTopicEvent event) {
for (final view in _messageListViews) {
view.handleUserTopicEvent(event);
Expand All @@ -502,10 +584,18 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
}

void handleMessageEvent(MessageEvent event) {
final message = event.message;

// If the message is one we already know about (from a fetch),
// clobber it with the one from the event system.
// See [fetchedMessages] for reasoning.
messages[event.message.id] = event.message;
// See [reconcileMessages] for reasoning.
messages[event.message.id] = message;

if (message is StreamMessage && subscriptions[message.streamId] == null) {
// We didn't expect this event, because the channel is unsubscribed. But
// that doesn't mean we should expect future events about this message.
_maybeStaleChannelMessages.add(message.id);
}

_handleMessageEventOutbox(event);

Expand Down Expand Up @@ -594,6 +684,12 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
// See [StreamConversation.displayRecipient] on why the invalidation is
// needed.
message.conversation.displayRecipient = null;

if (subscriptions[newStreamId] == null) {
// The message was moved into an unsubscribed channel, which means
// we expect our data on it to get stale.
_maybeStaleChannelMessages.add(messageId);
}
}

if (newTopic != origTopic) {
Expand All @@ -616,6 +712,7 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
void handleDeleteMessageEvent(DeleteMessageEvent event) {
for (final messageId in event.messageIds) {
messages.remove(messageId);
_maybeStaleChannelMessages.remove(messageId);
_editMessageRequests.remove(messageId);
}
for (final view in _messageListViews) {
Expand Down
6 changes: 6 additions & 0 deletions lib/model/store.dart
Original file line number Diff line number Diff line change
Expand Up @@ -821,11 +821,17 @@ class PerAccountStore extends PerAccountStoreBase with

case ChannelEvent():
assert(debugLog("server event: stream/${event.op}"));
if (event is ChannelDeleteEvent) {
_messages.handleChannelDeleteEvent(event);
}
_channels.handleChannelEvent(event);
notifyListeners();

case SubscriptionEvent():
assert(debugLog("server event: subscription/${event.op}"));
if (event is SubscriptionRemoveEvent) {
_messages.handleSubscriptionRemoveEvent(event);
}
_channels.handleSubscriptionEvent(event);
notifyListeners();

Expand Down
Loading