diff --git a/changelog.d/19206.bugfix b/changelog.d/19206.bugfix new file mode 100644 index 00000000000..9cdfaa2571b --- /dev/null +++ b/changelog.d/19206.bugfix @@ -0,0 +1 @@ +Fix sliding sync performance slow down for long lived connections. diff --git a/scripts-dev/check_schema_delta.py b/scripts-dev/check_schema_delta.py index dd96c904bb3..7be6f3730ff 100755 --- a/scripts-dev/check_schema_delta.py +++ b/scripts-dev/check_schema_delta.py @@ -12,7 +12,7 @@ SCHEMA_FILE_REGEX = re.compile(r"^synapse/storage/schema/(.*)/delta/(.*)/(.*)$") INDEX_CREATION_REGEX = re.compile( - r"CREATE .*INDEX .*ON ([a-z_0-9]+)", flags=re.IGNORECASE + r"CREATE .*INDEX .*ON ([a-z_0-9]+)\s+\(", flags=re.IGNORECASE ) INDEX_DELETION_REGEX = re.compile(r"DROP .*INDEX ([a-z_0-9]+)", flags=re.IGNORECASE) TABLE_CREATION_REGEX = re.compile( diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 6a5d5c7b3cc..d5bad75451e 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -17,6 +17,7 @@ from itertools import chain from typing import TYPE_CHECKING, AbstractSet, Mapping +import attr from prometheus_client import Histogram from typing_extensions import assert_never @@ -62,6 +63,7 @@ HaveSentRoomFlag, MutablePerConnectionState, PerConnectionState, + RoomLazyMembershipChanges, RoomSyncConfig, SlidingSyncConfig, SlidingSyncResult, @@ -983,14 +985,15 @@ async def get_room_sync_data( # # Calculate the `StateFilter` based on the `required_state` for the room required_state_filter = StateFilter.none() - # The requested `required_state_map` with the lazy membership expanded and - # `$ME` replaced with the user's ID. This allows us to see what membership we've - # sent down to the client in the next request. - # - # Make a copy so we can modify it. Still need to be careful to make a copy of - # the state key sets if we want to add/remove from them. 
We could make a deep - # copy but this saves us some work. - expanded_required_state_map = dict(room_sync_config.required_state_map) + + # Keep track of which users' state we may need to fetch. We split this + # into explicit users and lazy loaded users. + explicit_user_state = set() + lazy_load_user_ids = set() + + # Whether lazy-loading of room members is enabled. + lazy_load_room_members = False + if room_membership_for_user_at_to_token.membership not in ( Membership.INVITE, Membership.KNOCK, @@ -1038,7 +1041,6 @@ async def get_room_sync_data( else: required_state_types: list[tuple[str, str | None]] = [] num_wild_state_keys = 0 - lazy_load_room_members = False num_others = 0 for ( state_type, @@ -1070,43 +1072,60 @@ async def get_room_sync_data( timeline_event.state_key ) + # The client needs to know the membership of everyone in + # the timeline we're returning. + lazy_load_user_ids.update(timeline_membership) + # Update the required state filter so we pick up the new # membership - for user_id in timeline_membership: - required_state_types.append( - (EventTypes.Member, user_id) + if limited or initial: + # If the timeline is limited, we only need to + # return the membership changes for people in + # the timeline. + for user_id in timeline_membership: + required_state_types.append( + (EventTypes.Member, user_id) + ) + else: + # For non-limited timelines we always return all + # membership changes. This is so that clients + # who have fetched the full membership list + # already can continue to maintain it for + # non-limited syncs. + # + # This assumes that for non-limited syncs there + # won't be many membership changes that wouldn't + # have been included already (this can only + # happen if membership state was rolled back due + # to state resolution anyway). + # + # `None` is a wildcard in the `StateFilter` + required_state_types.append((EventTypes.Member, None)) + + # Record the extra members we're returning. 
+ lazy_load_user_ids.update( + state_key + for event_type, state_key in room_state_delta_id_map + if event_type == EventTypes.Member ) - - # Add an explicit entry for each user in the timeline - # - # Make a new set or copy of the state key set so we can - # modify it without affecting the original - # `required_state_map` - expanded_required_state_map[EventTypes.Member] = ( - expanded_required_state_map.get( - EventTypes.Member, set() - ) - | timeline_membership - ) - elif state_key == StateValues.ME: + else: num_others += 1 - required_state_types.append((state_type, user.to_string())) + # Replace `$ME` with the user's ID so we can deduplicate # when someone requests the same state with `$ME` or with # their user ID. - # - # Make a new set or copy of the state key set so we can - # modify it without affecting the original - # `required_state_map` - expanded_required_state_map[EventTypes.Member] = ( - expanded_required_state_map.get( - EventTypes.Member, set() - ) - | {user.to_string()} + normalized_state_key = state_key + if state_key == StateValues.ME: + normalized_state_key = user.to_string() + + if state_type == EventTypes.Member: + # Also track explicitly requested member state for + # lazy membership tracking. + explicit_user_state.add(normalized_state_key) + + required_state_types.append( + (state_type, normalized_state_key) ) - else: - num_others += 1 - required_state_types.append((state_type, state_key)) set_tag( SynapseTags.FUNC_ARG_PREFIX @@ -1124,6 +1143,10 @@ async def get_room_sync_data( required_state_filter = StateFilter.from_types(required_state_types) + # Remove any explicitly requested user state from the lazy-loaded set, + # as we track them separately. 
+ lazy_load_user_ids -= explicit_user_state + # We need this base set of info for the response so let's just fetch it along # with the `required_state` for the room hero_room_state = [ @@ -1151,6 +1174,22 @@ async def get_room_sync_data( # We can return all of the state that was requested if this was the first # time we've sent the room down this connection. room_state: StateMap[EventBase] = {} + + # Includes the state for the heroes if we need them (may contain other + # state as well). + hero_membership_state: StateMap[EventBase] = {} + + # By default we mark all required user state as being added when lazy + # loaded members is enabled. + # + # We may later update this to account for previously sent members. + returned_user_id_to_last_seen_ts_map = {} + if lazy_load_room_members: + returned_user_id_to_last_seen_ts_map = dict.fromkeys(lazy_load_user_ids) + new_connection_state.room_lazy_membership[room_id] = RoomLazyMembershipChanges( + returned_user_id_to_last_seen_ts_map=returned_user_id_to_last_seen_ts_map + ) + if initial: room_state = await self.get_current_state_at( room_id=room_id, @@ -1158,28 +1197,72 @@ async def get_room_sync_data( state_filter=state_filter, to_token=to_token, ) + + # The `room_state` includes the hero membership state if needed. + # We'll later filter this down so we don't need to do so here. + hero_membership_state = room_state else: + assert from_token is not None assert from_bound is not None if prev_room_sync_config is not None: + # Define `all_required_user_state` as all user state we want, which + # is the explicitly requested members, any needed for lazy + # loading, and users whose membership has changed. + all_required_user_state = explicit_user_state | lazy_load_user_ids + for state_type, state_key in room_state_delta_id_map: + if state_type == EventTypes.Member: + all_required_user_state.add(state_key) + + # We need to know what user state we previously sent down the + # connection so we can determine what has changed. 
+ # + # We don't just pull out the lazy loaded members here to handle + # the case where the client added explicit user state requests + # for users they already had lazy loaded. + if all_required_user_state: + previously_returned_user_to_last_seen = ( + await self.store.get_sliding_sync_connection_lazy_members( + connection_position=from_token.connection_position, + room_id=room_id, + user_ids=all_required_user_state, + ) + ) + + # Update the room lazy membership changes to track which + # lazy loaded members were needed for this sync. This is so + # that we can correctly track the last time we sent down + # users' membership (and so can evict old membership state + # from the DB tables). + returned_user_id_to_last_seen_ts_map.update( + previously_returned_user_to_last_seen + ) + else: + previously_returned_user_to_last_seen = {} + # Check if there are any changes to the required state config # that we need to handle. - changed_required_state_map, added_state_filter = ( - _required_state_changes( - user.to_string(), - prev_required_state_map=prev_room_sync_config.required_state_map, - request_required_state_map=expanded_required_state_map, - state_deltas=room_state_delta_id_map, - ) + changes_return = _required_state_changes( + user.to_string(), + prev_required_state_map=prev_room_sync_config.required_state_map, + request_required_state_map=room_sync_config.required_state_map, + previously_returned_lazy_user_ids=previously_returned_user_to_last_seen.keys(), + lazy_load_user_ids=lazy_load_user_ids, + state_deltas=room_state_delta_id_map, ) + changed_required_state_map = changes_return.required_state_map_change + + new_connection_state.room_lazy_membership[ + room_id + ].invalidated_user_ids = changes_return.lazy_members_invalidated - if added_state_filter: + if changes_return.added_state_filter: # Some state entries got added, so we pull out the current # state for them. If we don't do this we'd only send down new deltas. 
state_ids = await self.get_current_state_ids_at( room_id=room_id, room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, - state_filter=added_state_filter, + state_filter=changes_return.added_state_filter, to_token=to_token, ) room_state_delta_id_map.update(state_ids) @@ -1191,6 +1274,7 @@ async def get_room_sync_data( # If the membership changed and we have to get heroes, get the remaining # heroes from the state + hero_membership_state = {} if hero_user_ids: hero_membership_state = await self.get_current_state_at( room_id=room_id, @@ -1198,7 +1282,6 @@ async def get_room_sync_data( state_filter=StateFilter.from_types(hero_room_state), to_token=to_token, ) - room_state.update(hero_membership_state) required_room_state: StateMap[EventBase] = {} if required_state_filter != StateFilter.none(): @@ -1221,7 +1304,7 @@ async def get_room_sync_data( # Assemble heroes: extract the info from the state we just fetched heroes: list[SlidingSyncResult.RoomResult.StrippedHero] = [] for hero_user_id in hero_user_ids: - member_event = room_state.get((EventTypes.Member, hero_user_id)) + member_event = hero_membership_state.get((EventTypes.Member, hero_user_id)) if member_event is not None: heroes.append( SlidingSyncResult.RoomResult.StrippedHero( @@ -1283,7 +1366,7 @@ async def get_room_sync_data( bump_stamp = 0 room_sync_required_state_map_to_persist: Mapping[str, AbstractSet[str]] = ( - expanded_required_state_map + room_sync_config.required_state_map ) if changed_required_state_map: room_sync_required_state_map_to_persist = changed_required_state_map @@ -1473,13 +1556,38 @@ async def _get_bump_stamp( return None +@attr.s(auto_attribs=True) +class _RequiredStateChangesReturn: + """Return type for _required_state_changes. + + Attributes: + required_state_map_change: The updated required state map to store in + the room config, or None if there is no change. 
+ added_state_filter: The state filter to use to fetch any additional + current state that needs to be returned to the client. + lazy_members_previously_returned: The set of user IDs we should add to + the lazy members cache that we had previously returned. + lazy_members_invalidated: The set of user IDs whose membership has + changed but we didn't send down, so we need to invalidate them from + the cache. + """ + + required_state_map_change: Mapping[str, AbstractSet[str]] | None + added_state_filter: StateFilter + + lazy_members_previously_returned: AbstractSet[str] = frozenset() + lazy_members_invalidated: AbstractSet[str] = frozenset() + + def _required_state_changes( user_id: str, *, prev_required_state_map: Mapping[str, AbstractSet[str]], request_required_state_map: Mapping[str, AbstractSet[str]], + previously_returned_lazy_user_ids: AbstractSet[str], + lazy_load_user_ids: AbstractSet[str], state_deltas: StateMap[str], -) -> tuple[Mapping[str, AbstractSet[str]] | None, StateFilter]: +) -> _RequiredStateChangesReturn: """Calculates the changes between the required state room config from the previous requests compared with the current request. @@ -1493,14 +1601,61 @@ def _required_state_changes( added, removed and then added again to the required state. In that case we only want to re-send that entry down sync if it has changed. - Returns: - A 2-tuple of updated required state config (or None if there is no update) - and the state filter to use to fetch extra current state that we need to - return. + Args: + user_id: The user ID of the user making the request. + prev_required_state_map: The required state map from the previous + request. + request_required_state_map: The required state map from the current + request. + previously_returned_lazy_user_ids: The set of user IDs whose membership + we have previously returned to the client due to lazy loading. 
This + is filtered to only include users who have either sent events in the + timeline, required state or whose membership changed. + lazy_load_user_ids: The set of user IDs whose lazy-loaded membership + is required for this request. + state_deltas: The state deltas that have changed in the room since the + previous request. """ + + # First we find any lazy members that have been invalidated due to state + # changes that we are not sending down. + lazy_members_invalidated = set() + for event_type, state_key in state_deltas: + if event_type != EventTypes.Member: + continue + + if state_key in lazy_load_user_ids: + # Because it's part of the `required_user_state`, we're going to + # send this member change down. + continue + + if state_key not in previously_returned_lazy_user_ids: + # We've not previously returned this member so nothing to + # invalidate. + continue + + lazy_members_invalidated.add(state_key) + if prev_required_state_map == request_required_state_map: - # There has been no change. Return immediately. - return None, StateFilter.none() + # There has been no change in state, just need to check lazy members. + newly_returned_lazy_members = ( + lazy_load_user_ids - previously_returned_lazy_user_ids + ) + if newly_returned_lazy_members: + # There are some new lazy members we need to fetch. 
+ added_types: list[tuple[str, str | None]] = [] + for new_user_id in newly_returned_lazy_members: + added_types.append((EventTypes.Member, new_user_id)) + + added_state_filter = StateFilter.from_types(added_types) + else: + added_state_filter = StateFilter.none() + + return _RequiredStateChangesReturn( + required_state_map_change=None, + added_state_filter=added_state_filter, + lazy_members_invalidated=lazy_members_invalidated, + ) prev_wildcard = prev_required_state_map.get(StateValues.WILDCARD, set()) request_wildcard = request_required_state_map.get(StateValues.WILDCARD, set()) @@ -1510,17 +1665,29 @@ def _required_state_changes( # already fetching everything, we don't have to fetch anything now that they've # narrowed. if StateValues.WILDCARD in prev_wildcard: - return request_required_state_map, StateFilter.none() + return _RequiredStateChangesReturn( + required_state_map_change=request_required_state_map, + added_state_filter=StateFilter.none(), + lazy_members_invalidated=lazy_members_invalidated, + ) # If a event type wildcard has been added or removed we don't try and do # anything fancy, and instead always update the effective room required # state config to match the request. if request_wildcard - prev_wildcard: # Some keys were added, so we need to fetch everything - return request_required_state_map, StateFilter.all() + return _RequiredStateChangesReturn( + required_state_map_change=request_required_state_map, + added_state_filter=StateFilter.all(), + lazy_members_invalidated=lazy_members_invalidated, + ) if prev_wildcard - request_wildcard: # Keys were only removed, so we don't have to fetch everything. - return request_required_state_map, StateFilter.none() + return _RequiredStateChangesReturn( + required_state_map_change=request_required_state_map, + added_state_filter=StateFilter.none(), + lazy_members_invalidated=lazy_members_invalidated, + ) # Contains updates to the required state map compared with the previous room # config. 
This has the same format as `RoomSyncConfig.required_state` @@ -1530,6 +1697,11 @@ def _required_state_changes( # client. Passed to `StateFilter.from_types(...)` added: list[tuple[str, str | None]] = [] + # Record any members that were previously explicitly tracked and should now + # be tracked as lazy members. This handles the case of membership changing + # from e.g. `{@alice:example.com}` to `{LAZY}`. + lazy_members_previously_returned: set[str] = set() + # Convert the list of state deltas to map from type to state_keys that have # changed. changed_types_to_state_keys: dict[str, set[str]] = {} @@ -1552,6 +1724,39 @@ def _required_state_changes( # Nothing *added*, so we skip. Removals happen below. continue + # Handle the special case of adding LAZY membership, where we want to + # remember what explicit members we've previously sent down. + if event_type == EventTypes.Member: + old_state_key_lazy = StateValues.LAZY in old_state_keys + request_state_key_lazy = StateValues.LAZY in request_state_keys + if not old_state_key_lazy and request_state_key_lazy: + # We're adding a LAZY flag. We therefore add any previously + # explicit members we've sent down to lazy cache. + for state_key in old_state_keys: + if ( + state_key == StateValues.WILDCARD + or state_key == StateValues.LAZY + ): + # Ignore non-user IDs. + continue + + if state_key == StateValues.ME: + # Normalize to proper user ID + state_key = user_id + + # We remember the user if either a) they haven't been + # invalidated... + if (EventTypes.Member, state_key) not in state_deltas: + lazy_members_previously_returned.add(state_key) + + # ...or b) if we are going to send the delta down in this + # sync. + if state_key in lazy_load_user_ids: + lazy_members_previously_returned.add(state_key) + + changes[event_type] = request_state_keys + continue + # We only remove state keys from the effective state if they've been # removed from the request *and* the state has changed. 
This ensures # that if a client removes and then re-adds a state key, we only send @@ -1622,9 +1827,23 @@ def _required_state_changes( # LAZY values should also be ignore for event types that are # not membership. pass + elif event_type == EventTypes.Member: + if state_key not in previously_returned_lazy_user_ids: + # Only add *explicit* members we haven't previously sent + # down. + added.append((event_type, state_key)) else: added.append((event_type, state_key)) + # We also need to pull out any lazy members that are now required but + # haven't previously been returned. + for required_user_id in ( + lazy_load_user_ids + - previously_returned_lazy_user_ids + - lazy_members_previously_returned + ): + added.append((EventTypes.Member, required_user_id)) + added_state_filter = StateFilter.from_types(added) # Figure out what changes we need to apply to the effective required state @@ -1696,6 +1915,16 @@ def _required_state_changes( # Remove entries with empty state keys. new_required_state_map.pop(event_type, None) - return new_required_state_map, added_state_filter + return _RequiredStateChangesReturn( + required_state_map_change=new_required_state_map, + added_state_filter=added_state_filter, + lazy_members_invalidated=lazy_members_invalidated, + lazy_members_previously_returned=lazy_members_previously_returned, + ) else: - return None, added_state_filter + return _RequiredStateChangesReturn( + required_state_map_change=None, + added_state_filter=added_state_filter, + lazy_members_invalidated=lazy_members_invalidated, + lazy_members_previously_returned=lazy_members_previously_returned, + ) diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 8cd3de8f40c..043eccce249 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -14,7 +14,7 @@ import logging -from typing import TYPE_CHECKING, Mapping, cast +from typing import TYPE_CHECKING, AbstractSet, 
Mapping, cast import attr @@ -26,13 +26,16 @@ DatabasePool, LoggingDatabaseConnection, LoggingTransaction, + make_in_list_sql_clause, ) +from synapse.storage.engines import PostgresEngine from synapse.types import MultiWriterStreamToken, RoomStreamToken from synapse.types.handlers.sliding_sync import ( HaveSentRoom, HaveSentRoomFlag, MutablePerConnectionState, PerConnectionState, + RoomLazyMembershipChanges, RoomStatusMap, RoomSyncConfig, ) @@ -52,6 +55,10 @@ logger = logging.getLogger(__name__) +# How often to update the last seen timestamp for lazy members. We don't want to +# update it too often as that causes DB writes. +LAZY_MEMBERS_UPDATE_INTERVAL_MS = ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND + # How often to update the `last_used_ts` column on # `sliding_sync_connection_positions` when the client uses a connection # position. We don't want to update it on every use to avoid excessive @@ -378,6 +385,13 @@ def persist_per_connection_state_txn( value_values=values, ) + self._persist_sliding_sync_connection_lazy_members_txn( + txn, + connection_key, + connection_position, + per_connection_state.room_lazy_membership, + ) + return connection_position @cached(iterable=True, max_entries=100000) @@ -448,6 +462,19 @@ def _get_and_clear_connection_positions_txn( """ txn.execute(sql, (connection_key, connection_position)) + # Move any lazy membership entries for this connection position to have + # `NULL` connection position, indicating that it applies to all future + # positions on this connection. + self.db_pool.simple_update_txn( + txn, + table="sliding_sync_connection_lazy_members", + keyvalues={ + "connection_key": connection_key, + "connection_position": connection_position, + }, + updatevalues={"connection_position": None}, + ) + # Fetch and create a mapping from required state ID to the actual # required state for the connection. 
rows = self.db_pool.simple_select_list_txn( @@ -527,8 +554,146 @@ def _get_and_clear_connection_positions_txn( receipts=RoomStatusMap(receipts), account_data=RoomStatusMap(account_data), room_configs=room_configs, + room_lazy_membership={}, + ) + + async def get_sliding_sync_connection_lazy_members( + self, + connection_position: int, + room_id: str, + user_ids: AbstractSet[str], + ) -> Mapping[str, int]: + """Get which user IDs in the room we have previously sent lazy + membership for. + + Args: + connection_position: The sliding sync connection position. + room_id: The room ID to get lazy members for. + user_ids: The user IDs to check for lazy membership. + + Returns: + The mapping of user IDs to the last seen timestamp for those user + IDs. + """ + + def get_sliding_sync_connection_lazy_members_txn( + txn: LoggingTransaction, + ) -> Mapping[str, int]: + user_clause, user_args = make_in_list_sql_clause( + txn.database_engine, "user_id", user_ids + ) + + sql = f""" + SELECT user_id, connection_position, last_seen_ts + FROM sliding_sync_connection_lazy_members AS pos + WHERE room_id = ? AND {user_clause} + """ + + txn.execute(sql, (room_id, *user_args)) + + # Filter out any cache entries that only apply to forked connection + # positions. Entries with `NULL` connection position apply to all + # positions on the connection. 
+ return { + user_id: last_seen_ts + for user_id, db_connection_position, last_seen_ts in txn + if db_connection_position == connection_position + or db_connection_position is None + } + + return await self.db_pool.runInteraction( + "sliding_sync_connection_lazy_members", + get_sliding_sync_connection_lazy_members_txn, + db_autocommit=True, # Avoid transaction for single read ) + def _persist_sliding_sync_connection_lazy_members_txn( + self, + txn: LoggingTransaction, + connection_key: int, + new_connection_position: int, + all_changes: dict[str, RoomLazyMembershipChanges], + ) -> None: + """Persist that we have sent lazy membership for the given user IDs.""" + + now = self.clock.time_msec() + + # Figure out which cache entries to add or update. + # + # These are either a) new entries we've never sent before (i.e. with a + # None last_seen_ts), or b) where the `last_seen_ts` is old enough that + # we want to update it. + # + # We don't update the timestamp every time to avoid hammering the DB + # with writes, and we don't need the timestamp to be precise. It is used + # to evict old entries that haven't been used in a while. + to_update: list[tuple[str, str]] = [] + for room_id, room_changes in all_changes.items(): + for ( + user_id, + last_seen_ts, + ) in room_changes.returned_user_id_to_last_seen_ts_map.items(): + if last_seen_ts is None: + # We've never sent this user before, so we need to record that + # we've sent it at the new connection position. + to_update.append((room_id, user_id)) + elif last_seen_ts + LAZY_MEMBERS_UPDATE_INTERVAL_MS < now: + # We last saw this user over + # `LAZY_MEMBERS_UPDATE_INTERVAL_MS` ago, so we update the + # timestamp (c.f. comment above). + to_update.append((room_id, user_id)) + + if to_update: + # Upsert the new/updated entries. + # + # Ignore conflicts where the existing entry has a different + # connection position (i.e. from a forked connection position). 
This + # may mean that we lose some updates, but that's acceptable as this + # is a cache and it's fine for it to *not* include rows. (Downstream + # this will cause us to maybe send a few extra lazy members down + # sync, but we're allowed to send extra members). + sql = """ + INSERT INTO sliding_sync_connection_lazy_members + (connection_key, connection_position, room_id, user_id, last_seen_ts) + VALUES {value_placeholder} + ON CONFLICT (connection_key, room_id, user_id) + DO UPDATE SET last_seen_ts = EXCLUDED.last_seen_ts + WHERE sliding_sync_connection_lazy_members.connection_position IS NULL + OR sliding_sync_connection_lazy_members.connection_position = EXCLUDED.connection_position + """ + + args = [ + (connection_key, new_connection_position, room_id, user_id, now) + for room_id, user_id in to_update + ] + + if isinstance(self.database_engine, PostgresEngine): + sql = sql.format(value_placeholder="?") + txn.execute_values(sql, args, fetch=False) + else: + sql = sql.format(value_placeholder="(?, ?, ?, ?, ?)") + txn.execute_batch(sql, args) + + # Remove any invalidated entries. + to_remove: list[tuple[str, str]] = [] + for room_id, room_changes in all_changes.items(): + for user_id in room_changes.invalidated_user_ids: + to_remove.append((room_id, user_id)) + + if to_remove: + # We don't try and match on connection position here: it's fine to + # remove it from all forks. This is a cache so it's fine to expire + # arbitrary entries, the worst that happens is we send a few extra + # lazy members down sync. 
+ self.db_pool.simple_delete_many_batch_txn( + txn, + table="sliding_sync_connection_lazy_members", + keys=("connection_key", "room_id", "user_id"), + values=[ + (connection_key, room_id, user_id) for room_id, user_id in to_remove + ], + ) + @wrap_as_background_process("delete_old_sliding_sync_connections") async def delete_old_sliding_sync_connections(self) -> None: """Delete sliding sync connections that have not been used for a long time.""" @@ -556,6 +721,8 @@ class PerConnectionStateDB: serialized to strings. When persisting this *only* contains updates to the state. + + The `room_lazy_membership` field is only used when persisting. """ last_used_ts: int | None @@ -566,6 +733,8 @@ class PerConnectionStateDB: room_configs: Mapping[str, "RoomSyncConfig"] + room_lazy_membership: dict[str, RoomLazyMembershipChanges] + @staticmethod async def from_state( per_connection_state: "MutablePerConnectionState", store: "DataStore" @@ -620,6 +789,7 @@ async def from_state( receipts=RoomStatusMap(receipts), account_data=RoomStatusMap(account_data), room_configs=per_connection_state.room_configs.maps[0], + room_lazy_membership=per_connection_state.room_lazy_membership, ) async def to_state(self, store: "DataStore") -> "PerConnectionState": diff --git a/synapse/storage/schema/main/delta/93/02_sliding_sync_members.sql b/synapse/storage/schema/main/delta/93/02_sliding_sync_members.sql new file mode 100644 index 00000000000..6521c955ea9 --- /dev/null +++ b/synapse/storage/schema/main/delta/93/02_sliding_sync_members.sql @@ -0,0 +1,48 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 Element Creations Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. 
+-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + + +-- Tracks which member states have been sent to the client for lazy-loaded +-- members in sliding sync. This is a *cache* as it doesn't matter if we send +-- down members we've previously sent down, i.e. it's safe to delete any rows. +-- +-- We track a *rough* `last_seen_ts` for each user in each room which indicates +-- when we last would've sent their member state to the client. This is used so +-- that we can remove members which haven't been seen for a while to save space. +-- +-- Care must be taken when handling "forked" positions, i.e. we have responded +-- to a request with a position and then get another different request using the +-- previous position as a base. We track this by including a +-- `connection_position` for newly inserted rows. When we advance the position +-- we set this to NULL for all rows which were present at that position, and +-- delete all other rows. When reading rows we can then filter out any rows +-- which have a non-NULL `connection_position` which is not the current +-- position. +-- +-- I.e. `connection_position` is NULL for rows which are valid for *all* +-- positions on the connection, and is non-NULL for rows which are only valid +-- for a specific position. +-- +-- When invalidating rows, we can just delete them. Technically this could +-- invalidate for a forked position, but this is acceptable as equivalent to a +-- cache eviction. 
+CREATE TABLE sliding_sync_connection_lazy_members ( + connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE, + connection_position BIGINT REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE, + room_id TEXT NOT NULL, + user_id TEXT NOT NULL, + last_seen_ts BIGINT NOT NULL +); + +CREATE UNIQUE INDEX sliding_sync_connection_lazy_members_idx ON sliding_sync_connection_lazy_members (connection_key, room_id, user_id); +CREATE INDEX sliding_sync_connection_lazy_members_pos_idx ON sliding_sync_connection_lazy_members (connection_key, connection_position) WHERE connection_position IS NOT NULL; diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index 03b3bcb3caf..e66080baf09 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -891,6 +891,43 @@ def __len__(self) -> int: return len(self.rooms) + len(self.receipts) + len(self.room_configs) +@attr.s(auto_attribs=True) +class RoomLazyMembershipChanges: + """Changes to lazily-loaded room memberships for a given room. + + Attributes: + returned_user_id_to_last_seen_ts_map: Map from user ID to timestamp for users whose membership we + have lazily loaded. The timestamp indicates the time we previously + saw the membership if we have sent it down previously, or None if + we sent it down for the first time. + + Note: this will include users whose membership we would have sent + down but didn't due to us having previously sent them. + invalidated_user_ids: Set of user IDs whose latest membership we have *not* sent + down. + """ + + # A map from user ID -> timestamp. Indicates that those memberships have + # been lazily loaded. I.e. that either a) we sent those memberships down, or + # b) we did so previously. The timestamp indicates the time we previously + # saw the membership. 
+ # + # We track a *rough* `last_seen_ts` for each user in each room which + # indicates when we last would've sent their member state to the client. + # This is used so that we can remove members which haven't been seen for a + # while to save space. + returned_user_id_to_last_seen_ts_map: Mapping[str, int | None] = attr.Factory(dict) + + # A set of user IDs whose membership change we have *not* sent + # down + invalidated_user_ids: AbstractSet[str] = attr.Factory(set) + + def __bool__(self) -> bool: + return bool( + self.returned_user_id_to_last_seen_ts_map or self.invalidated_user_ids + ) + + @attr.s(auto_attribs=True) class MutablePerConnectionState(PerConnectionState): """A mutable version of `PerConnectionState`""" @@ -903,12 +940,19 @@ class MutablePerConnectionState(PerConnectionState): room_configs: typing.ChainMap[str, RoomSyncConfig] + # A map from room ID -> `RoomLazyMembershipChanges`, i.e. the lazily-loaded + # membership changes for that room: the users whose memberships we returned + # (mapped to the time we previously saw the membership, if any) and the + # users whose membership change we have *not* sent down. 
+ room_lazy_membership: dict[str, RoomLazyMembershipChanges] = attr.Factory(dict) + def has_updates(self) -> bool: return ( bool(self.rooms.get_updates()) or bool(self.receipts.get_updates()) or bool(self.account_data.get_updates()) or bool(self.get_room_config_updates()) + or bool(self.room_lazy_membership) ) def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]: diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 45829064415..bb76c4b2324 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -18,7 +18,7 @@ # # import logging -from typing import AbstractSet, Mapping +from typing import AbstractSet from unittest.mock import patch import attr @@ -38,13 +38,17 @@ RoomSyncConfig, StateValues, _required_state_changes, + _RequiredStateChangesReturn, ) from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import JsonDict, StateMap, StreamToken, UserID, create_requester -from synapse.types.handlers.sliding_sync import PerConnectionState, SlidingSyncConfig +from synapse.types.handlers.sliding_sync import ( + PerConnectionState, + SlidingSyncConfig, +) from synapse.types.state import StateFilter from synapse.util.clock import Clock @@ -3827,12 +3831,11 @@ class RequiredStateChangesTestParameters: previous_required_state_map: dict[str, set[str]] request_required_state_map: dict[str, set[str]] state_deltas: StateMap[str] - expected_with_state_deltas: tuple[ - Mapping[str, AbstractSet[str]] | None, StateFilter - ] - expected_without_state_deltas: tuple[ - Mapping[str, AbstractSet[str]] | None, StateFilter - ] + expected_with_state_deltas: _RequiredStateChangesReturn + expected_without_state_deltas: _RequiredStateChangesReturn + + previously_returned_lazy_user_ids: AbstractSet[str] = frozenset() + lazy_load_user_ids: AbstractSet[str] = 
frozenset() class RequiredStateChangesTestCase(unittest.TestCase): @@ -3848,8 +3851,12 @@ class RequiredStateChangesTestCase(unittest.TestCase): request_required_state_map={"type1": {"state_key"}}, state_deltas={("type1", "state_key"): "$event_id"}, # No changes - expected_with_state_deltas=(None, StateFilter.none()), - expected_without_state_deltas=(None, StateFilter.none()), + expected_with_state_deltas=_RequiredStateChangesReturn( + None, StateFilter.none() + ), + expected_without_state_deltas=_RequiredStateChangesReturn( + None, StateFilter.none() + ), ), ), ( @@ -3862,14 +3869,14 @@ class RequiredStateChangesTestCase(unittest.TestCase): "type2": {"state_key"}, }, state_deltas={("type2", "state_key"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # We've added a type so we should persist the changed required state # config. {"type1": {"state_key"}, "type2": {"state_key"}}, # We should see the new type added StateFilter.from_types([("type2", "state_key")]), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( {"type1": {"state_key"}, "type2": {"state_key"}}, StateFilter.from_types([("type2", "state_key")]), ), @@ -3885,7 +3892,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): "type2": {"state_key"}, }, state_deltas={("type2", "state_key"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # We've added a type so we should persist the changed required state # config. 
{"type1": {"state_key"}, "type2": {"state_key"}}, @@ -3894,7 +3901,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): [("type1", "state_key"), ("type2", "state_key")] ), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( {"type1": {"state_key"}, "type2": {"state_key"}}, StateFilter.from_types( [("type1", "state_key"), ("type2", "state_key")] @@ -3909,14 +3916,14 @@ class RequiredStateChangesTestCase(unittest.TestCase): previous_required_state_map={"type": {"state_key1"}}, request_required_state_map={"type": {"state_key1", "state_key2"}}, state_deltas={("type", "state_key2"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # We've added a key so we should persist the changed required state # config. {"type": {"state_key1", "state_key2"}}, # We should see the new state_keys added StateFilter.from_types([("type", "state_key2")]), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( {"type": {"state_key1", "state_key2"}}, StateFilter.from_types([("type", "state_key2")]), ), @@ -3929,7 +3936,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): previous_required_state_map={"type": {"state_key1"}}, request_required_state_map={"type": {"state_key2", "state_key3"}}, state_deltas={("type", "state_key2"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # We've added a key so we should persist the changed required state # config. 
# @@ -3940,7 +3947,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): [("type", "state_key2"), ("type", "state_key3")] ), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( {"type": {"state_key1", "state_key2", "state_key3"}}, StateFilter.from_types( [("type", "state_key2"), ("type", "state_key3")] @@ -3964,7 +3971,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): }, request_required_state_map={"type1": {"state_key"}}, state_deltas={("type2", "state_key"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # Remove `type2` since there's been a change to that state, # (persist the change to required state). That way next time, # they request `type2`, we see that we haven't sent it before @@ -3975,7 +3982,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): # less state now StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( # `type2` is no longer requested but since that state hasn't # changed, nothing should change (we should still keep track # that we've sent `type2` before). @@ -3998,7 +4005,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): }, request_required_state_map={}, state_deltas={("type2", "state_key"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # Remove `type2` since there's been a change to that state, # (persist the change to required state). That way next time, # they request `type2`, we see that we haven't sent it before @@ -4009,7 +4016,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): # less state now StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( # `type2` is no longer requested but since that state hasn't # changed, nothing should change (we should still keep track # that we've sent `type2` before). 
@@ -4029,7 +4036,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): previous_required_state_map={"type": {"state_key1", "state_key2"}}, request_required_state_map={"type": {"state_key1"}}, state_deltas={("type", "state_key2"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # Remove `(type, state_key2)` since there's been a change # to that state (persist the change to required state). # That way next time, they request `(type, state_key2)`, we see @@ -4041,7 +4048,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): # less state now StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( # `(type, state_key2)` is no longer requested but since that # state hasn't changed, nothing should change (we should still # keep track that we've sent `(type, state_key1)` and `(type, @@ -4073,11 +4080,11 @@ class RequiredStateChangesTestCase(unittest.TestCase): ("other_type", "state_key"): "$event_id", }, # We've added a wildcard, so we persist the change and request everything - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( {"type1": {"state_key2"}, StateValues.WILDCARD: {"state_key"}}, StateFilter.all(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( {"type1": {"state_key2"}, StateValues.WILDCARD: {"state_key"}}, StateFilter.all(), ), @@ -4103,13 +4110,13 @@ class RequiredStateChangesTestCase(unittest.TestCase): ("other_type", "state_key"): "$event_id", }, # We've removed a type wildcard, so we persist the change but don't request anything - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( {"type1": {"state_key2"}}, # We don't need to request anything more if they are requesting # less state now StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( 
{"type1": {"state_key2"}}, # We don't need to request anything more if they are requesting # less state now @@ -4129,11 +4136,11 @@ class RequiredStateChangesTestCase(unittest.TestCase): state_deltas={("type2", "state_key"): "$event_id"}, # We've added a wildcard state_key, so we persist the change and # request all of the state for that type - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( {"type1": {"state_key"}, "type2": {StateValues.WILDCARD}}, StateFilter.from_types([("type2", None)]), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( {"type1": {"state_key"}, "type2": {StateValues.WILDCARD}}, StateFilter.from_types([("type2", None)]), ), @@ -4151,7 +4158,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): state_deltas={("type2", "state_key"): "$event_id"}, # We've removed a state_key wildcard, so we persist the change and # request nothing - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( {"type1": {"state_key"}}, # We don't need to request anything more if they are requesting # less state now @@ -4160,7 +4167,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): # We've removed a state_key wildcard but there have been no matching # state changes, so no changes needed, just persist the # `request_required_state_map` as-is. - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( None, # We don't need to request anything more if they are requesting # less state now @@ -4180,7 +4187,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): }, request_required_state_map={"type1": {"state_key1"}}, state_deltas={("type1", "state_key3"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # We've removed some state keys from the type, but only state_key3 was # changed so only that one should be removed. 
{"type1": {"state_key1", "state_key2"}}, @@ -4188,7 +4195,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): # less state now StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( # No changes needed, just persist the # `request_required_state_map` as-is None, @@ -4207,14 +4214,14 @@ class RequiredStateChangesTestCase(unittest.TestCase): previous_required_state_map={}, request_required_state_map={"type1": {StateValues.ME}}, state_deltas={("type1", "@user:test"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # We've added a type so we should persist the changed required state # config. {"type1": {StateValues.ME}}, # We should see the new state_keys added StateFilter.from_types([("type1", "@user:test")]), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( {"type1": {StateValues.ME}}, StateFilter.from_types([("type1", "@user:test")]), ), @@ -4229,7 +4236,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): previous_required_state_map={"type1": {StateValues.ME}}, request_required_state_map={}, state_deltas={("type1", "@user:test"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # Remove `type1` since there's been a change to that state, # (persist the change to required state). That way next time, # they request `type1`, we see that we haven't sent it before @@ -4240,7 +4247,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): # less state now StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( # `type1` is no longer requested but since that state hasn't # changed, nothing should change (we should still keep track # that we've sent `type1` before). 
@@ -4260,14 +4267,14 @@ class RequiredStateChangesTestCase(unittest.TestCase): previous_required_state_map={}, request_required_state_map={"type1": {"@user:test"}}, state_deltas={("type1", "@user:test"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # We've added a type so we should persist the changed required state # config. {"type1": {"@user:test"}}, # We should see the new state_keys added StateFilter.from_types([("type1", "@user:test")]), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( {"type1": {"@user:test"}}, StateFilter.from_types([("type1", "@user:test")]), ), @@ -4282,7 +4289,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): previous_required_state_map={"type1": {"@user:test"}}, request_required_state_map={}, state_deltas={("type1", "@user:test"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # Remove `type1` since there's been a change to that state, # (persist the change to required state). That way next time, # they request `type1`, we see that we haven't sent it before @@ -4293,7 +4300,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): # less state now StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( # `type1` is no longer requested but since that state hasn't # changed, nothing should change (we should still keep track # that we've sent `type1` before). @@ -4313,13 +4320,13 @@ class RequiredStateChangesTestCase(unittest.TestCase): previous_required_state_map={}, request_required_state_map={EventTypes.Member: {StateValues.LAZY}}, state_deltas={(EventTypes.Member, "@user:test"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # If a "$LAZY" has been added or removed we always update the # required state to what was requested for simplicity. 
{EventTypes.Member: {StateValues.LAZY}}, StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( {EventTypes.Member: {StateValues.LAZY}}, StateFilter.none(), ), @@ -4334,7 +4341,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): previous_required_state_map={EventTypes.Member: {StateValues.LAZY}}, request_required_state_map={}, state_deltas={(EventTypes.Member, "@user:test"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # If a "$LAZY" has been added or removed we always update the # required state to what was requested for simplicity. {}, @@ -4342,7 +4349,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): # less state now StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( # `EventTypes.Member` is no longer requested but since that # state hasn't changed, nothing should change (we should still # keep track that we've sent `EventTypes.Member` before). @@ -4361,41 +4368,40 @@ class RequiredStateChangesTestCase(unittest.TestCase): we're sending down another response without any timeline events. 
""", RequiredStateChangesTestParameters( - previous_required_state_map={ - EventTypes.Member: { - StateValues.LAZY, - "@user2:test", - "@user3:test", - } - }, + previous_required_state_map={EventTypes.Member: {StateValues.LAZY}}, request_required_state_map={EventTypes.Member: {StateValues.LAZY}}, + previously_returned_lazy_user_ids={"@user2:test", "@user3:test"}, + lazy_load_user_ids=set(), state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( + # The `request_required_state_map` hasn't changed + None, + # We don't need to request anything more if they are requesting + # less state now + StateFilter.none(), + # Previous request did not include any explicit members, + # so nothing to store. + lazy_members_previously_returned=frozenset(), # Remove "@user2:test" since that state has changed and is no # longer being requested anymore. Since something was removed, # we should persist the changed to required state. That way next # time, they request "@user2:test", we see that we haven't sent # it before and send the new state. (we should still keep track # that we've sent specific `EventTypes.Member` before) - { - EventTypes.Member: { - StateValues.LAZY, - "@user3:test", - } - }, - # We don't need to request anything more if they are requesting - # less state now - StateFilter.none(), + lazy_members_invalidated={"@user2:test"}, ), - expected_without_state_deltas=( - # We're not requesting any specific `EventTypes.Member` now but - # since that state hasn't changed, nothing should change (we - # should still keep track that we've sent specific - # `EventTypes.Member` before). + expected_without_state_deltas=_RequiredStateChangesReturn( + # The `request_required_state_map` hasn't changed None, # We don't need to request anything more if they are requesting # less state now StateFilter.none(), + # Previous request did not include any explicit members, + # so nothing to store. 
+ lazy_members_previously_returned=frozenset(), + # Nothing should change (we should still keep track that + # we've sent specific `EventTypes.Member` before). + lazy_members_invalidated=frozenset(), ), ), ), @@ -4407,50 +4413,37 @@ class RequiredStateChangesTestCase(unittest.TestCase): we're sending down another response with a new event from user4. """, RequiredStateChangesTestParameters( - previous_required_state_map={ - EventTypes.Member: { - StateValues.LAZY, - "@user2:test", - "@user3:test", - } - }, - request_required_state_map={ - EventTypes.Member: {StateValues.LAZY, "@user4:test"} - }, + previous_required_state_map={EventTypes.Member: {StateValues.LAZY}}, + request_required_state_map={EventTypes.Member: {StateValues.LAZY}}, + previously_returned_lazy_user_ids={"@user2:test", "@user3:test"}, + lazy_load_user_ids={"@user4:test"}, state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"}, - expected_with_state_deltas=( - # Since "@user4:test" was added, we should persist the changed - # required state config. - # - # Also remove "@user2:test" since that state has changed and is no - # longer being requested anymore. Since something was removed, - # we also should persist the changed to required state. That way next - # time, they request "@user2:test", we see that we haven't sent - # it before and send the new state. (we should still keep track - # that we've sent specific `EventTypes.Member` before) - { - EventTypes.Member: { - StateValues.LAZY, - "@user3:test", - "@user4:test", - } - }, + expected_with_state_deltas=_RequiredStateChangesReturn( + # The `request_required_state_map` hasn't changed + None, # We should see the new state_keys added StateFilter.from_types([(EventTypes.Member, "@user4:test")]), + # Previous request did not include any explicit members, + # so nothing to store. + lazy_members_previously_returned=frozenset(), + # Remove "@user2:test" since that state has changed and + # is no longer being requested anymore. 
Since something + # was removed, we also should persist the changed to + # required state. That way next time, they request + # "@user2:test", we see that we haven't sent it before + # and send the new state. (we should still keep track + # that we've sent specific `EventTypes.Member` before) + lazy_members_invalidated={"@user2:test"}, ), - expected_without_state_deltas=( - # Since "@user4:test" was added, we should persist the changed - # required state config. - { - EventTypes.Member: { - StateValues.LAZY, - "@user2:test", - "@user3:test", - "@user4:test", - } - }, + expected_without_state_deltas=_RequiredStateChangesReturn( + # The `request_required_state_map` hasn't changed + None, # We should see the new state_keys added StateFilter.from_types([(EventTypes.Member, "@user4:test")]), + # Previous request did not include any explicit members, + # so nothing to store. + lazy_members_previously_returned=frozenset(), + lazy_members_invalidated=frozenset(), ), ), ), @@ -4464,40 +4457,39 @@ class RequiredStateChangesTestCase(unittest.TestCase): EventTypes.Member: {"@user2:test", "@user3:test"} }, request_required_state_map={EventTypes.Member: {StateValues.LAZY}}, + previously_returned_lazy_user_ids=frozenset(), + lazy_load_user_ids={"@user3:test"}, state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # Since `StateValues.LAZY` was added, we should persist the # changed required state config. + {EventTypes.Member: {StateValues.LAZY}}, + # We have already sent @user3 down before. # - # Also remove "@user2:test" since that state has changed and is no - # longer being requested anymore. Since something was removed, - # we also should persist the changed to required state. That way next - # time, they request "@user2:test", we see that we haven't sent - # it before and send the new state. 
(we should still keep track - # that we've sent specific `EventTypes.Member` before) - { - EventTypes.Member: { - StateValues.LAZY, - "@user3:test", - } - }, - # We don't need to request anything more if they are requesting - # less state now + # `@user3:test` is required for lazy loading, but we've + # already sent it down before, so we don't need to + # request it again. StateFilter.none(), + # Remember the fact that we've sent @user3 down before, + # but not @user2 as that has been invalidated. + lazy_members_previously_returned={"@user3:test"}, + # Nothing to invalidate as there are no existing lazy members. + lazy_members_invalidated=frozenset(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( # Since `StateValues.LAZY` was added, we should persist the # changed required state config. - { - EventTypes.Member: { - StateValues.LAZY, - "@user2:test", - "@user3:test", - } - }, - # We don't need to request anything more if they are requesting - # less state now + {EventTypes.Member: {StateValues.LAZY}}, + # We have already sent @user3 down before. + # + # `@user3:test` is required for lazy loading, but we've + # already sent it down before, so we don't need to + # request it again. StateFilter.none(), + # Remember the fact that we've sent the users down before. + lazy_members_previously_returned={"@user2:test", "@user3:test"}, + # Nothing to invalidate as there are no existing lazy members. + lazy_members_invalidated=frozenset(), ), ), ), @@ -4507,36 +4499,33 @@ class RequiredStateChangesTestCase(unittest.TestCase): Test retracting the `required_state` to no longer lazy-loading room members. 
""", RequiredStateChangesTestParameters( - previous_required_state_map={ - EventTypes.Member: { - StateValues.LAZY, - "@user2:test", - "@user3:test", - } - }, + previous_required_state_map={EventTypes.Member: {StateValues.LAZY}}, request_required_state_map={}, + previously_returned_lazy_user_ids={"@user2:test", "@user3:test"}, + lazy_load_user_ids=set(), state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # Remove `EventTypes.Member` since there's been a change to that - # state, (persist the change to required state). That way next - # time, they request `EventTypes.Member`, we see that we haven't - # sent it before and send the new state. (if we were tracking - # that we sent any other state, we should still keep track - # that). - # - # This acts the same as the `simple_remove_type` test. It's - # possible that we could remember the specific `state_keys` that - # we have sent down before but this currently just acts the same - # as if a whole `type` was removed. Perhaps it's good that we - # "garbage collect" and forget what we've sent before for a - # given `type` when the client stops caring about a certain - # `type`. + # state, (persist the change to required state). {}, # We don't need to request anything more if they are requesting # less state now StateFilter.none(), + # Previous request did not include any explicit members, + # so nothing to store. + lazy_members_previously_returned=frozenset(), + # Explicitly remove the now invalidated @user2:test + # membership. + # + # We don't invalidate @user3:test as that membership + # hasn't changed. We continue to store the existing lazy + # members since they might be useful for future + # requests. (Alternatively, we could invalidate all + # members in the room when the client stops lazy + # loading, but we opt to keep track of them). 
+ lazy_members_invalidated={"@user2:test"}, ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( # `EventTypes.Member` is no longer requested but since that # state hasn't changed, nothing should change (we should still # keep track that we've sent `EventTypes.Member` before). @@ -4544,6 +4533,11 @@ class RequiredStateChangesTestCase(unittest.TestCase): # We don't need to request anything more if they are requesting # less state now StateFilter.none(), + # Previous request did not include any explicit members, + # so nothing to store. + lazy_members_previously_returned=frozenset(), + # Nothing has been invalidated. + lazy_members_invalidated=frozenset(), ), ), ), @@ -4553,46 +4547,39 @@ class RequiredStateChangesTestCase(unittest.TestCase): Test retracting the `required_state` to no longer lazy-loading room members. """, RequiredStateChangesTestParameters( - previous_required_state_map={ - EventTypes.Member: { - StateValues.LAZY, - "@user2:test", - "@user3:test", - } - }, + previous_required_state_map={EventTypes.Member: {StateValues.LAZY}}, request_required_state_map={EventTypes.Member: {"@user4:test"}}, + previously_returned_lazy_user_ids={"@user2:test", "@user3:test"}, + lazy_load_user_ids={"@user4:test"}, state_deltas={(EventTypes.Member, "@user2:test"): "$event_id"}, - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( # Since "@user4:test" was added, we should persist the changed # required state config. - # + {EventTypes.Member: {"@user4:test"}}, + # We should see the new state_keys added + StateFilter.from_types([(EventTypes.Member, "@user4:test")]), + # Previous request did not include any explicit members, + # so nothing to store. + lazy_members_previously_returned=frozenset(), # Also remove "@user2:test" since that state has changed and is no # longer being requested anymore. Since something was removed, # we also should persist the changed to required state. 
That way next # time, they request "@user2:test", we see that we haven't sent # it before and send the new state. (we should still keep track # that we've sent specific `EventTypes.Member` before) - { - EventTypes.Member: { - "@user3:test", - "@user4:test", - } - }, - # We should see the new state_keys added - StateFilter.from_types([(EventTypes.Member, "@user4:test")]), + lazy_members_invalidated={"@user2:test"}, ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( # Since "@user4:test" was added, we should persist the changed # required state config. - { - EventTypes.Member: { - "@user2:test", - "@user3:test", - "@user4:test", - } - }, + {EventTypes.Member: {"@user4:test"}}, # We should see the new state_keys added StateFilter.from_types([(EventTypes.Member, "@user4:test")]), + # Previous request did not include any explicit members, + # so nothing to store. + lazy_members_previously_returned=frozenset(), + # We don't invalidate user2 as they haven't changed + lazy_members_invalidated=frozenset(), ), ), ), @@ -4613,7 +4600,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): # room required state config to match the request. And since we we're previously # already fetching everything, we don't have to fetch anything now that they've # narrowed. 
- expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( { StateValues.WILDCARD: { "state_key1", @@ -4623,7 +4610,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): }, StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( { StateValues.WILDCARD: { "state_key1", @@ -4649,11 +4636,11 @@ class RequiredStateChangesTestCase(unittest.TestCase): }, state_deltas={("type1", "state_key1"): "$event_id"}, # We've added a wildcard, so we persist the change and request everything - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( {StateValues.WILDCARD: {StateValues.WILDCARD}}, StateFilter.all(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( {StateValues.WILDCARD: {StateValues.WILDCARD}}, StateFilter.all(), ), @@ -4673,7 +4660,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): # request. And since we we're previously already fetching # everything, we don't have to fetch anything now that they've # narrowed. - expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( { "type1": { "state_key1", @@ -4683,7 +4670,7 @@ class RequiredStateChangesTestCase(unittest.TestCase): }, StateFilter.none(), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( { "type1": { "state_key1", @@ -4708,11 +4695,11 @@ class RequiredStateChangesTestCase(unittest.TestCase): # update the effective room required state config to match the # request. And we need to request all of the state for that type # because we previously, only sent down a few keys. 
- expected_with_state_deltas=( + expected_with_state_deltas=_RequiredStateChangesReturn( {"type1": {StateValues.WILDCARD, "state_key2", "state_key3"}}, StateFilter.from_types([("type1", None)]), ), - expected_without_state_deltas=( + expected_without_state_deltas=_RequiredStateChangesReturn( { "type1": { StateValues.WILDCARD, @@ -4734,42 +4721,66 @@ def test_xxx( test_parameters: RequiredStateChangesTestParameters, ) -> None: # Without `state_deltas` - changed_required_state_map, added_state_filter = _required_state_changes( + state_changes = _required_state_changes( user_id="@user:test", prev_required_state_map=test_parameters.previous_required_state_map, request_required_state_map=test_parameters.request_required_state_map, + previously_returned_lazy_user_ids=test_parameters.previously_returned_lazy_user_ids, + lazy_load_user_ids=test_parameters.lazy_load_user_ids, state_deltas={}, ) self.assertEqual( - changed_required_state_map, - test_parameters.expected_without_state_deltas[0], + state_changes.required_state_map_change, + test_parameters.expected_without_state_deltas.required_state_map_change, "changed_required_state_map does not match (without state_deltas)", ) self.assertEqual( - added_state_filter, - test_parameters.expected_without_state_deltas[1], + state_changes.added_state_filter, + test_parameters.expected_without_state_deltas.added_state_filter, "added_state_filter does not match (without state_deltas)", ) + self.assertEqual( + state_changes.lazy_members_invalidated, + test_parameters.expected_without_state_deltas.lazy_members_invalidated, + "lazy_members_invalidated does not match (without state_deltas)", + ) + self.assertEqual( + state_changes.lazy_members_previously_returned, + test_parameters.expected_without_state_deltas.lazy_members_previously_returned, + "lazy_members_previously_returned does not match (without state_deltas)", + ) # With `state_deltas` - changed_required_state_map, added_state_filter = _required_state_changes( + state_changes 
= _required_state_changes( user_id="@user:test", prev_required_state_map=test_parameters.previous_required_state_map, request_required_state_map=test_parameters.request_required_state_map, + previously_returned_lazy_user_ids=test_parameters.previously_returned_lazy_user_ids, + lazy_load_user_ids=test_parameters.lazy_load_user_ids, state_deltas=test_parameters.state_deltas, ) self.assertEqual( - changed_required_state_map, - test_parameters.expected_with_state_deltas[0], + state_changes.required_state_map_change, + test_parameters.expected_with_state_deltas.required_state_map_change, "changed_required_state_map does not match (with state_deltas)", ) self.assertEqual( - added_state_filter, - test_parameters.expected_with_state_deltas[1], + state_changes.added_state_filter, + test_parameters.expected_with_state_deltas.added_state_filter, "added_state_filter does not match (with state_deltas)", ) + self.assertEqual( + state_changes.lazy_members_invalidated, + test_parameters.expected_with_state_deltas.lazy_members_invalidated, + "lazy_members_invalidated does not match (with state_deltas)", + ) + self.assertEqual( + state_changes.lazy_members_previously_returned, + test_parameters.expected_with_state_deltas.lazy_members_previously_returned, + "lazy_members_previously_returned does not match (with state_deltas)", + ) @parameterized.expand( [ @@ -4805,12 +4816,16 @@ def test_limit_retained_previous_state_keys( } # (function under test) - changed_required_state_map, added_state_filter = _required_state_changes( + state_changes = _required_state_changes( user_id="@user:test", prev_required_state_map=previous_required_state_map, request_required_state_map=request_required_state_map, + previously_returned_lazy_user_ids=frozenset(), + lazy_load_user_ids=frozenset(), state_deltas={}, ) + changed_required_state_map = state_changes.required_state_map_change + assert changed_required_state_map is not None # We should only remember up to the maximum number of state keys @@ 
-4874,12 +4889,16 @@ def test_request_more_state_keys_than_remember_limit(self) -> None: ) # (function under test) - changed_required_state_map, added_state_filter = _required_state_changes( + state_changes = _required_state_changes( user_id="@user:test", prev_required_state_map=previous_required_state_map, request_required_state_map=request_required_state_map, + previously_returned_lazy_user_ids=frozenset(), + lazy_load_user_ids=frozenset(), state_deltas={}, ) + changed_required_state_map = state_changes.required_state_map_change + assert changed_required_state_map is not None # Should include all of the requested state diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index 210280bc488..aa13273e668 100644 --- a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -23,7 +23,11 @@ from synapse.handlers.sliding_sync import StateValues from synapse.rest.client import knock, login, room, sync from synapse.server import HomeServer +from synapse.storage.databases.main.events import DeltaState, SlidingSyncTableChanges +from synapse.storage.databases.main.sliding_sync import LAZY_MEMBERS_UPDATE_INTERVAL_MS +from synapse.types import SlidingSyncStreamToken from synapse.util.clock import Clock +from synapse.util.constants import MILLISECONDS_PER_SECOND from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase from tests.test_utils.event_injection import mark_event_as_partial_state @@ -642,11 +646,6 @@ def test_rooms_required_state_changed_membership_in_timeline_lazy_loading_room_m self._assertRequiredStateIncludes( response_body["rooms"][room_id1]["required_state"], { - # This appears because *some* membership in the room changed and the - # heroes are recalculated and is thrown in because we have it. 
But this - # is technically optional and not needed because we've already seen user2 - # in the last sync (and their membership hasn't changed). - state_map[(EventTypes.Member, user2_id)], # Appears because there is a message in the timeline from this user state_map[(EventTypes.Member, user4_id)], # Appears because there is a membership event in the timeline from this user @@ -841,6 +840,257 @@ def test_rooms_required_state_expand_retract_expand_lazy_loading_room_members_in exact=True, ) + def test_lazy_members_limited_sync(self) -> None: + """Test that when using lazy loading for room members and a limited sync + missing a membership change, we include the membership change next time + said user says something. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Send a message from each user to the room so that both memberships are sent down. 
+ self.helper.send(room_id1, "1", tok=user1_tok) + self.helper.send(room_id1, "2", tok=user2_tok) + + # Make a first sync with lazy loading for the room members to establish + # a position + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 2, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # We should see both membership events in required_state + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + state_map[(EventTypes.Member, user2_id)], + }, + exact=True, + ) + + # User2 changes their display name (causing a membership change) + self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user2_id, + body={ + EventContentFields.MEMBERSHIP: Membership.JOIN, + EventContentFields.MEMBERSHIP_DISPLAYNAME: "New Name", + }, + tok=user2_tok, + ) + + # Send a couple of messages to the room to push out the membership change + self.helper.send(room_id1, "3", tok=user1_tok) + self.helper.send(room_id1, "4", tok=user1_tok) + + # Make an incremental Sliding Sync request + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + # The membership change should *not* be included yet as user2 doesn't + # have any events in the timeline. + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1].get("required_state", []), + set(), + exact=True, + ) + + # Now user2 sends a message to the room + self.helper.send(room_id1, "5", tok=user2_tok) + + # Make another incremental Sliding Sync request + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + # The membership change should now be included as user2 has an event + # in the timeline. 
+ state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1].get("required_state", []), + { + state_map[(EventTypes.Member, user2_id)], + }, + exact=True, + ) + + def test_lazy_members_across_multiple_rooms(self) -> None: + """Test that lazy-loaded room members are tracked per-room correctly.""" + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Create two rooms with both users in them and send a message in each + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.send(room_id1, "room1-msg1", tok=user2_tok) + + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id2, user1_id, tok=user1_tok) + self.helper.send(room_id2, "room2-msg1", tok=user2_tok) + + # Make a sync with lazy loading for the room members to establish + # a position + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # We expect only user2's membership in both rooms (only room1 is asserted here) + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user2_id)], + }, + exact=True, + ) + + # Send a message in room1 from user1 + self.helper.send(room_id1, "room1-msg2", tok=user1_tok) + + # Make an incremental Sliding Sync request and check that we get user1's + # membership.
+ response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + }, + exact=True, + ) + + # Send a message in room2 from user1 + self.helper.send(room_id2, "room2-msg2", tok=user1_tok) + + # Make an incremental Sliding Sync request and check that we get user1's + # membership. + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id2) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id2]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + }, + exact=True, + ) + + def test_lazy_members_forked_position(self) -> None: + """Test that lazy loading room members are tracked correctly when a + connection position is reused""" + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + self.helper.send(room_id1, "1", tok=user2_tok) + + # Make a sync with lazy loading for the room members to establish + # a position + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # We expect to see only user2's membership in the room + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + 
state_map[(EventTypes.Member, user2_id)], + }, + exact=True, + ) + + # Send a message in room1 from user1 + self.helper.send(room_id1, "2", tok=user1_tok) + + # Make an incremental Sliding Sync request and check that we get user1's + # membership. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + }, + exact=True, + ) + + # Now, reuse the original position and check we still get user1's + # membership. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + }, + exact=True, + ) + def test_rooms_required_state_me(self) -> None: """ Test `rooms.required_state` correctly handles $ME. @@ -1686,3 +1936,254 @@ def test_rooms_required_state_expand_deduplicate(self) -> None: # We should not see the room name again, as we have already sent that # down. self.assertIsNone(response_body["rooms"][room_id1].get("required_state")) + + def test_lazy_loaded_last_seen_ts(self) -> None: + """Test that the `last_seen_ts` column in + `sliding_sync_connection_lazy_members` is correctly kept up to date""" + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + self.helper.join(room_id, user1_id, tok=user1_tok) + + # Send a message so that user1 comes down sync. 
+ self.helper.send(room_id, "msg", tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Check that user1 is returned + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + }, + exact=True, + ) + + # Check that we have an entry in sliding_sync_connection_lazy_members + connection_pos1 = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ).connection_position + lazy_member_entries = self.get_success( + self.store.get_sliding_sync_connection_lazy_members( + connection_pos1, room_id, {user1_id} + ) + ) + self.assertIn(user1_id, lazy_member_entries) + + prev_timestamp = lazy_member_entries[user1_id] + + # If user1 is sent down again, the last_seen_ts should NOT be updated as + # not enough time has passed. + self.helper.send(room_id, "msg2", tok=user1_tok) + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + # We expect the required_state map to be empty as nothing has changed. + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id].get("required_state", []), + {}, + exact=True, + ) + + connection_pos2 = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ).connection_position + + lazy_member_entries = self.get_success( + self.store.get_sliding_sync_connection_lazy_members( + connection_pos2, room_id, {user1_id} + ) + ) + + # The timestamp should be unchanged. 
+ self.assertEqual(lazy_member_entries[user1_id], prev_timestamp) + + # Now advance the time by `LAZY_MEMBERS_UPDATE_INTERVAL_MS` so that we + # would update the timestamp. + self.reactor.advance(LAZY_MEMBERS_UPDATE_INTERVAL_MS / MILLISECONDS_PER_SECOND) + + # Send a message from user2 + self.helper.send(room_id, "msg3", tok=user2_tok) + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + connection_pos3 = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ).connection_position + + lazy_member_entries = self.get_success( + self.store.get_sliding_sync_connection_lazy_members( + connection_pos3, room_id, {user1_id} + ) + ) + + # The timestamp for user1 should be unchanged, as they were not sent down. + self.assertEqual(lazy_member_entries[user1_id], prev_timestamp) + + # If user1 sends a message, then the timestamp should be updated. + self.helper.send(room_id, "msg4", tok=user1_tok) + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + connection_pos4 = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ).connection_position + + lazy_member_entries = self.get_success( + self.store.get_sliding_sync_connection_lazy_members( + connection_pos4, room_id, {user1_id} + ) + ) + # The timestamp for user1 should be updated. + self.assertGreater(lazy_member_entries[user1_id], prev_timestamp) + + def test_lazy_load_state_reset(self) -> None: + """Test that when using lazy-loaded members, if a membership state is + reset to a previous state and the sync is not limited, then we send down + the state reset. 
+ """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + content = self.helper.join(room_id, user1_id, tok=user1_tok) + first_event_id = content["event_id"] + + # Send a message so that user1 comes down sync. + self.helper.send(room_id, "msg", tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Check that user1 is returned + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + }, + exact=True, + ) + + # user1 changes their display name + content = self.helper.send_state( + room_id, + EventTypes.Member, + body={"membership": "join", "displayname": "New display name"}, + state_key=user1_id, + tok=user1_tok, + ) + second_event_id = content["event_id"] + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + # We should see the updated membership state + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + }, + exact=True, + ) + self.assertEqual( + response_body["rooms"][room_id]["required_state"][0]["event_id"], + second_event_id, + ) + + # Now, fake a reset of the membership state to the first event + persist_event_store = self.hs.get_datastores().persist_events + assert persist_event_store is not None + + self.get_success( +
persist_event_store.update_current_state( + room_id, + DeltaState( + to_insert={(EventTypes.Member, user1_id): first_event_id}, + to_delete=[], + ), + # We don't need to worry about sliding sync changes for this test + SlidingSyncTableChanges( + room_id=room_id, + joined_room_bump_stamp_to_fully_insert=None, + joined_room_updates={}, + membership_snapshot_shared_insert_values={}, + to_insert_membership_snapshots=[], + to_delete_membership_snapshots=[], + ), + ) + ) + + # Send a message from *user2* so that user1 wouldn't normally get + # synced. + self.helper.send(room_id, "msg2", tok=user2_tok) + + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok + ) + + # This should be a non-limited sync + self.assertFalse( + response_body["rooms"][room_id].get("limited", False), + "Expected a non-limited timeline", + ) + + # We should see the reset membership state of user1 + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id]["required_state"], + { + state_map[(EventTypes.Member, user1_id)], + }, + ) + self.assertEqual( + response_body["rooms"][room_id]["required_state"][0]["event_id"], + first_event_id, + )