From 0534d7c307d58eb5434b48693ad892c2c3366efd Mon Sep 17 00:00:00 2001 From: Zixuan James Li Date: Tue, 28 Jan 2025 16:13:02 -0500 Subject: [PATCH 1/4] recent_senders [nfc]: Extract _groupStreamMessageIdsBySender helper To be reused later for handling moved messages. --- lib/model/recent_senders.dart | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/lib/model/recent_senders.dart b/lib/model/recent_senders.dart index a5c4bca778..4d63d6b1ca 100644 --- a/lib/model/recent_senders.dart +++ b/lib/model/recent_senders.dart @@ -71,18 +71,12 @@ class RecentSenders { void handleDeleteMessageEvent(DeleteMessageEvent event, Map cachedMessages) { if (event.messageType != MessageType.stream) return; - final messagesByUser = >{}; - for (final id in event.messageIds) { - final message = cachedMessages[id] as StreamMessage?; - if (message == null) continue; - (messagesByUser[message.senderId] ??= []).add(id); - } - + final messagesBySender = _groupStreamMessageIdsBySender(event.messageIds, cachedMessages); final DeleteMessageEvent(:streamId!, :topic!) = event; final sendersInStream = streamSenders[streamId]; final topicsInStream = topicSenders[streamId]; final sendersInTopic = topicsInStream?[topic]; - for (final entry in messagesByUser.entries) { + for (final entry in messagesBySender.entries) { final MapEntry(key: senderId, value: messages) = entry; final streamTracker = sendersInStream?[senderId]; @@ -97,6 +91,19 @@ class RecentSenders { if (sendersInTopic?.isEmpty ?? false) topicsInStream?.remove(topic); if (topicsInStream?.isEmpty ?? false) topicSenders.remove(streamId); } + + Map> _groupStreamMessageIdsBySender( + Iterable messageIds, + Map cachedMessages, + ) { + final messagesBySender = >{}; + for (final id in messageIds) { + final message = cachedMessages[id] as StreamMessage?; + if (message == null) continue; + (messagesBySender[message.senderId] ??= QueueList()).add(id); + } + return messagesBySender; + } } @visibleForTesting From dfc0ccc1d2e8549ab47727b53cf530b6fa1fb8e0 Mon Sep 17 00:00:00 2001 From: Zixuan James Li Date: Tue, 18 Mar 2025 16:24:14 -0400 Subject: [PATCH 2/4] recent_senders [nfc]: Use binarySearch when applicable This achieves the same effect that we used lowerBound for (testing if an element is present in a sorted list), but more concisely. (The other call site of lowerBound is untouched, because we indeed need the returned position for insertion.) --- lib/model/recent_senders.dart | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/model/recent_senders.dart b/lib/model/recent_senders.dart index 4d63d6b1ca..4acc6e86e0 100644 --- a/lib/model/recent_senders.dart +++ b/lib/model/recent_senders.dart @@ -1,4 +1,4 @@ -import 'package:collection/collection.dart'; +import 'package:collection/collection.dart' hide binarySearch; import 'package:flutter/foundation.dart'; import '../api/model/events.dart'; @@ -145,10 +145,7 @@ class MessageIdTracker { } void removeAll(List idsToRemove) { - ids.removeWhere((id) { - final i = lowerBound(idsToRemove, id); - return i < idsToRemove.length && idsToRemove[i] == id; - }); + ids.removeWhere((id) => binarySearch(idsToRemove, id) != -1); } @override From 5d9852ad3c6338e8d0888dee938fdd0dd62e5c9e Mon Sep 17 00:00:00 2001 From: Zixuan James Li Date: Tue, 18 Mar 2025 16:15:33 -0400 Subject: [PATCH 3/4] recent_senders [nfc]: Verify that message ID lists are sorted The data structure and its helpers rely on message IDs being sorted, otherwise its behavior is unpredictable. This will provide useful diagnostic information when that happens. This is NFC because no change is made live, i.e. no additional sorting. --- lib/model/recent_senders.dart | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/model/recent_senders.dart b/lib/model/recent_senders.dart index 4acc6e86e0..46cd6bcfb9 100644 --- a/lib/model/recent_senders.dart +++ b/lib/model/recent_senders.dart @@ -137,6 +137,7 @@ class MessageIdTracker { /// /// [newIds] should be sorted ascending. void addAll(QueueList newIds) { + assert(isSortedWithoutDuplicates(newIds)); if (ids.isEmpty) { ids = newIds; return; @@ -144,7 +145,11 @@ class MessageIdTracker { ids = setUnion(ids, newIds); } + /// Remove message IDs found in [idsToRemove] from the tracker list. + /// + /// [idsToRemove] should be sorted ascending. void removeAll(List idsToRemove) { + assert(isSortedWithoutDuplicates(idsToRemove)); ids.removeWhere((id) => binarySearch(idsToRemove, id) != -1); } From 951dc3052e12a29e78a66fe3f53c4419b9fc5826 Mon Sep 17 00:00:00 2001 From: Zixuan James Li Date: Tue, 18 Mar 2025 16:03:59 -0400 Subject: [PATCH 4/4] recent_senders: Handle moved messages This is similar to how we add support to handling moves for unreads (commit 34e6201f), especially the optimizations to avoid making unnecessary copies of message IDs when the entire conversation is moved (e.g. resolve/unresolve topic). An alternative approach to this is extracting helpers from handleMessages and handleDeleteMessageEvent and combining the two to handle moves, like web does (https://github.com/zulip/zulip/blob/bd04a30b/web/src/recent_senders.ts#L165-L190). Compared to that, creating a dedicated helper (this commit) makes it more straightforward to optimize for our performance needs. (The tests do not have to use PerAccountStore, but this setup makes it a bit more integrated.) Fixes: #901 --- lib/model/recent_senders.dart | 91 +++++++++++++++++ lib/model/store.dart | 1 + test/model/recent_senders_test.dart | 153 ++++++++++++++++++++++++++++ 3 files changed, 245 insertions(+) diff --git a/lib/model/recent_senders.dart b/lib/model/recent_senders.dart index 46cd6bcfb9..a2b07cf9d8 100644 --- a/lib/model/recent_senders.dart +++ b/lib/model/recent_senders.dart @@ -68,6 +68,64 @@ class RecentSenders { [senderId] ??= MessageIdTracker()).add(messageId); } + /// Handles channel/topic updates when messages are moved. + /// + /// [cachedMessages] should just be a map of messages we know about, i.e. + /// [MessageStore.messages]. It doesn't matter whether the same + /// [UpdateMessageEvent] has been handled by the [MessageStore], + /// since only the sender IDs, which do not change, are looked at. + /// + /// This is a no-op if no message move happened. + void handleUpdateMessageEvent(UpdateMessageEvent event, Map cachedMessages) { + if (event.moveData == null) { + return; + } + final UpdateMessageMoveData( + :origStreamId, :newStreamId, :origTopic, :newTopic) = event.moveData!; + + final messagesBySender = _groupStreamMessageIdsBySender(event.messageIds, cachedMessages); + final sendersInStream = streamSenders[origStreamId]; + final topicsInStream = topicSenders[origStreamId]; + final sendersInTopic = topicsInStream?[origTopic]; + for (final MapEntry(key: senderId, value: messages) in messagesBySender.entries) { + // The later `popAll` calls require the message IDs to be sorted in + // ascending order. Only sort as many as we need: the message IDs + // with the same sender, instead of all of them in `event.messageIds`. + // TOOD(server) make this an API guarantee. CZO discussion: + // https://chat.zulip.org/#narrow/channel/412-api-documentation/topic/Make.20message_ids.20from.20message.20update.20event.20sorted/near/2143785 + messages.sort(); + + if (newStreamId != origStreamId) { + final streamTracker = sendersInStream?[senderId]; + // All messages from both `messages` and `streamTracker` are from the + // same sender and the same channel. `messages` contain only messages + // known to `store.messages`; all of them should have made there way + // to the recent senders data structure as well. + assert(messages.every((id) => streamTracker!.ids.contains(id))); + streamTracker?.removeAll(messages); + if (streamTracker?.maxId == null) sendersInStream?.remove(senderId); + if (messages.isNotEmpty) { + ((streamSenders[newStreamId] ??= {}) + [senderId] ??= MessageIdTracker()).addAll(messages); + } + } + + // This does not need a check like the stream trackers one above, + // because the conversation is guaranteed to have moved. This is an + // invariant [UpdateMessageMoveData] offers. + final topicTracker = sendersInTopic?[senderId]; + final movedMessagesInTopicTracker = topicTracker?.popAll(messages); + if (topicTracker?.maxId == null) sendersInTopic?.remove(senderId); + if (movedMessagesInTopicTracker != null) { + (((topicSenders[newStreamId] ??= {})[newTopic] ??= {}) + [senderId] ??= MessageIdTracker()).addAll(movedMessagesInTopicTracker); + } + } + if (sendersInStream?.isEmpty ?? false) streamSenders.remove(origStreamId); + if (sendersInTopic?.isEmpty ?? false) topicsInStream?.remove(origTopic); + if (topicsInStream?.isEmpty ?? false) topicSenders.remove(origStreamId); + } + void handleDeleteMessageEvent(DeleteMessageEvent event, Map cachedMessages) { if (event.messageType != MessageType.stream) return; @@ -153,6 +211,39 @@ class MessageIdTracker { ids.removeWhere((id) => binarySearch(idsToRemove, id) != -1); } + /// Remove message IDs found in [idsToRemove] from the tracker list. + /// + /// Returns the removed message IDs sorted in ascending order, or `null` if + /// nothing is removed. + /// + /// [idsToRemove] should be sorted ascending. + /// + /// Consider using [removeAll] if the returned message IDs are not needed. + // Part of this is adapted from [ListBase.removeWhere]. + QueueList? popAll(List idsToRemove) { + assert(isSortedWithoutDuplicates(idsToRemove)); + final retainedMessageIds = + ids.where((id) => binarySearch(idsToRemove, id) == -1).toList(); + + if (retainedMessageIds.isEmpty) { + // All message IDs in this tracker are removed; this is an optimization + // to clear all ids and return the removed ones without making a new copy. + final result = ids; + ids = QueueList(); + return result; + } + + QueueList? poppedMessageIds; + if (retainedMessageIds.length != ids.length) { + poppedMessageIds = QueueList.from( + ids.where((id) => binarySearch(idsToRemove, id) != -1)); + ids.setRange(0, retainedMessageIds.length, retainedMessageIds); + ids.length = retainedMessageIds.length; + assert(isSortedWithoutDuplicates(poppedMessageIds)); + } + return poppedMessageIds; + } + @override String toString() => ids.toString(); } diff --git a/lib/model/store.dart b/lib/model/store.dart index c3aaa501db..5bab3eba3c 100644 --- a/lib/model/store.dart +++ b/lib/model/store.dart @@ -767,6 +767,7 @@ class PerAccountStore extends ChangeNotifier with EmojiStore, UserStore, Channel case UpdateMessageEvent(): assert(debugLog("server event: update_message ${event.messageId}")); + recentSenders.handleUpdateMessageEvent(event, messages); _messages.handleUpdateMessageEvent(event); unreads.handleUpdateMessageEvent(event); diff --git a/test/model/recent_senders_test.dart b/test/model/recent_senders_test.dart index 602362cbcc..9ddba5cebe 100644 --- a/test/model/recent_senders_test.dart +++ b/test/model/recent_senders_test.dart @@ -2,12 +2,15 @@ import 'package:checks/checks.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:zulip/api/model/model.dart'; import 'package:zulip/model/recent_senders.dart'; +import 'package:zulip/model/store.dart'; import '../example_data.dart' as eg; +import 'test_store.dart'; /// [messages] should be sorted by [id] ascending. void checkMatchesMessages(RecentSenders model, List messages) { final Map>> messagesByUserInStream = {}; final Map>>> messagesByUserInTopic = {}; + messages.sort((a, b) => a.id - b.id); for (final message in messages) { if (message is! StreamMessage) { throw UnsupportedError('Message of type ${message.runtimeType} is not expected.'); @@ -142,6 +145,156 @@ void main() { }); }); + group('RecentSenders.handleUpdateMessageEvent', () { + late PerAccountStore store; + late RecentSenders model; + + final origChannel = eg.stream(); final newChannel = eg.stream(); + final origTopic = 'origTopic'; final newTopic = 'newTopic'; + final userX = eg.user(); final userY = eg.user(); + + Future prepare(List messages) async { + store = eg.store(); + await store.addMessages(messages); + await store.addStreams([origChannel, newChannel]); + await store.addUsers([userX, userY]); + model = store.recentSenders; + } + + List copyMessagesWith(Iterable messages, { + ZulipStream? newChannel, + String? newTopic, + }) { + assert(newChannel != null || newTopic != null); + return messages.map((message) => StreamMessage.fromJson( + message.toJson() + ..['stream_id'] = newChannel?.streamId ?? message.streamId + // See [StreamMessage.displayRecipient] for why this is needed. + ..['display_recipient'] = newChannel?.name ?? message.displayRecipient! + + ..['subject'] = newTopic ?? message.topic + )).toList(); + } + + test('move a conversation entirely, with additional unknown messages', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic, sender: userX)); + await prepare(messages); + final unknownMessages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic, sender: userX)); + checkMatchesMessages(model, messages); + + final messageIdsByUserInTopicBefore = + model.topicSenders[origChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids; + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: messages + unknownMessages, + newStreamId: newChannel.streamId)); + checkMatchesMessages(model, copyMessagesWith( + messages, newChannel: newChannel)); + + // Check we avoided creating a new list for the moved message IDs. + check(messageIdsByUserInTopicBefore).identicalTo( + model.topicSenders[newChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids); + }); + + test('move a conversation exactly', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic, sender: userX)); + await prepare(messages); + + final messageIdsByUserInTopicBefore = + model.topicSenders[origChannel.streamId]![eg.t(origTopic)]![userX.userId]!.ids; + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: messages, + newStreamId: newChannel.streamId, + newTopicStr: newTopic)); + checkMatchesMessages(model, copyMessagesWith( + messages, newChannel: newChannel, newTopic: newTopic)); + + // Check we avoided creating a new list for the moved message IDs. + check(messageIdsByUserInTopicBefore).identicalTo( + model.topicSenders[newChannel.streamId]![eg.t(newTopic)]![userX.userId]!.ids); + }); + + test('move a conversation partially to a different channel', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic)); + final movedMessages = messages.take(5).toList(); + final otherMessages = messages.skip(5); + await prepare(messages); + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: movedMessages, + newStreamId: newChannel.streamId)); + checkMatchesMessages(model, [ + ...copyMessagesWith(movedMessages, newChannel: newChannel), + ...otherMessages, + ]); + }); + + test('move a conversation partially to a different topic, within the same channel', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic, sender: userX)); + final movedMessages = messages.take(5).toList(); + final otherMessages = messages.skip(5); + await prepare(messages); + + final messageIdsByUserInStreamBefore = + model.streamSenders[origChannel.streamId]![userX.userId]!.ids; + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: movedMessages, + newTopicStr: newTopic)); + checkMatchesMessages(model, [ + ...copyMessagesWith(movedMessages, newTopic: newTopic), + ...otherMessages, + ]); + + // Check that we did not touch stream message IDs tracker + // when there wasn't a stream move. + check(messageIdsByUserInStreamBefore).identicalTo( + model.streamSenders[origChannel.streamId]![userX.userId]!.ids); + }); + + test('move a conversation with multiple senders', () async { + final messages = [ + eg.streamMessage(stream: origChannel, topic: origTopic, sender: userX), + eg.streamMessage(stream: origChannel, topic: origTopic, sender: userX), + eg.streamMessage(stream: origChannel, topic: origTopic, sender: userY), + ]; + await prepare(messages); + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: messages, + newStreamId: newChannel.streamId)); + checkMatchesMessages(model, copyMessagesWith( + messages, newChannel: newChannel)); + }); + + test('move a converstion, but message IDs from the event are not sorted in ascending order', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + id: 100-i, stream: origChannel, topic: origTopic)); + await prepare(messages); + + await store.handleEvent(eg.updateMessageEventMoveFrom( + origMessages: messages, + newStreamId: newChannel.streamId)); + checkMatchesMessages(model, + copyMessagesWith(messages, newChannel: newChannel)); + }); + + test('message edit update without move', () async { + final messages = List.generate(10, (i) => eg.streamMessage( + stream: origChannel, topic: origTopic)); + await prepare(messages); + + await store.handleEvent(eg.updateMessageEditEvent(messages[0])); + checkMatchesMessages(model, messages); + }); + }); + test('RecentSenders.handleDeleteMessageEvent', () { final model = RecentSenders(); final stream = eg.stream();