From b7b924c370fd9123086257e4f735b7ad0d0c7033 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 25 Nov 2025 14:10:43 -0600 Subject: [PATCH 1/9] Better typing and separation of concerns for the `/messages` endpoint --- synapse/handlers/pagination.py | 113 ++++++++++++++++++++++----------- synapse/rest/client/room.py | 48 +++++++++++++- 2 files changed, 121 insertions(+), 40 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index a90ed3193cd..07f8d9a5880 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -21,22 +21,25 @@ import logging from typing import TYPE_CHECKING, cast +import attr + from twisted.python.failure import Failure from synapse.api.constants import Direction, EventTypes, Membership from synapse.api.errors import SynapseError from synapse.api.filtering import Filter -from synapse.events.utils import SerializeEventConfig +from synapse.events import EventBase +from synapse.handlers.relations import BundledAggregations from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging.opentracing import trace from synapse.rest.admin._base import assert_user_is_admin from synapse.streams.config import PaginationConfig from synapse.types import ( - JsonDict, JsonMapping, Requester, ScheduledTask, StreamKeyType, + StreamToken, TaskStatus, ) from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse @@ -69,6 +72,55 @@ SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room" +@attr.s(slots=True, frozen=True, auto_attribs=True) +class GetMessagesResult: + """ + Everything needed to serialize a `/messages` response. + """ + + messages_chunk: list[EventBase] + """ + A list of room events. + + When the request was `Direction.FORWARDS`, the events are in chronological order. + + When the request was `Direction.BACKWARDS`, the events are in reverse chronological order. + + Note that an empty chunk does not necessarily imply that no more events are + available. Clients should continue to paginate until no `end_token` property is returned. + """ + + bundled_aggregations: dict[str, BundledAggregations] + """ + A map of event ID to the bundled aggregations for the events in the chunk. + + If an event doesn't have any bundled aggregations, it may not appear in the map. + """ + + state: list[EventBase] | None + """ + A list of state events relevant to showing the chunk. For example, if + lazy_load_members is enabled in the filter then this may contain the membership + events for the senders of events in the chunk. + """ + + start_token: StreamToken + """ + Token corresponding to the start of chunk. This will be the same as the value given + in `from` query parameter of the `/messages` request. + """ + + end_token: StreamToken | None + """ + A token corresponding to the end of chunk. This token can be passed back to this + endpoint to request further events. + + If no further events are available (either because we have reached the start of the + timeline, or because the user does not have permission to see any more events), this + property is omitted from the response. + """ + + class PaginationHandler: """Handles pagination and purge history requests. @@ -417,7 +469,7 @@ async def get_messages( as_client_event: bool = True, event_filter: Filter | None = None, use_admin_priviledge: bool = False, - ) -> JsonDict: + ) -> GetMessagesResult: """Get messages in a room. Args: @@ -616,10 +668,13 @@ async def get_messages( # In that case we do not return end, to tell the client # there is no need for further queries. if not events: - return { - "chunk": [], - "start": await from_token.to_string(self.store), - } + return GetMessagesResult( + messages_chunk=[], + bundled_aggregations={}, + state=None, + start_token=from_token, + end_token=None, + ) if event_filter: events = await event_filter.filter(events) @@ -635,11 +690,13 @@ async def get_messages( # if after the filter applied there are no more events # return immediately - but there might be more in next_token batch if not events: - return { - "chunk": [], - "start": await from_token.to_string(self.store), - "end": await next_token.to_string(self.store), - } + return GetMessagesResult( + messages_chunk=[], + bundled_aggregations={}, + state=None, + start_token=from_token, + end_token=next_token, + ) state = None if event_filter and event_filter.lazy_load_members and len(events) > 0: @@ -656,38 +713,20 @@ async def get_messages( if state_ids: state_dict = await self.store.get_events(list(state_ids.values())) - state = state_dict.values() + state = list(state_dict.values()) aggregations = await self._relations_handler.get_bundled_aggregations( events, user_id ) - time_now = self.clock.time_msec() - - serialize_options = SerializeEventConfig( - as_client_event=as_client_event, requester=requester + return GetMessagesResult( + messages_chunk=events, + bundled_aggregations=aggregations, + state=state, + start_token=from_token, + end_token=next_token, ) - chunk = { - "chunk": ( - await self._event_serializer.serialize_events( - events, - time_now, - config=serialize_options, - bundle_aggregations=aggregations, - ) - ), - "start": await from_token.to_string(self.store), - "end": await next_token.to_string(self.store), - } - - if state: - chunk["state"] = await self._event_serializer.serialize_events( - state, time_now, config=serialize_options - ) - - return chunk - async def _shutdown_and_purge_room( self, task: ScheduledTask, diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 81a6bd57fc3..775eff635e0 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -49,6 +49,7 @@ format_event_for_client_v2, serialize_event, ) +from synapse.handlers.pagination import GetMessagesResult from synapse.http.server import HttpServer from synapse.http.servlet import ( ResolveRoomIdMixin, @@ -64,7 +65,7 @@ ) from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.logging.opentracing import set_tag +from synapse.logging.opentracing import set_tag, trace from synapse.metrics import SERVER_NAME_LABEL from synapse.rest.client._base import client_patterns from synapse.rest.client.transactions import HttpTransactionCache @@ -806,6 +807,7 @@ def __init__(self, hs: "HomeServer"): self.pagination_handler = hs.get_pagination_handler() self.auth = hs.get_auth() self.store = hs.get_datastores().main + self.event_serializer = hs.get_event_client_serializer() async def on_GET( self, request: SynapseRequest, room_id: str @@ -839,7 +841,11 @@ async def on_GET( ): as_client_event = False - msgs = await self.pagination_handler.get_messages( + serialize_options = SerializeEventConfig( + as_client_event=as_client_event, requester=requester + ) + + get_messages_result = await self.pagination_handler.get_messages( room_id=room_id, requester=requester, pagin_config=pagination_config, @@ -847,6 +853,10 @@ async def on_GET( event_filter=event_filter, ) + response_content = await self.encode_response( + get_messages_result, serialize_options + ) + processing_end_time = self.clock.time_msec() room_member_count = await make_deferred_yieldable(room_member_count_deferred) messsages_response_timer.labels( @@ -854,7 +864,39 @@ async def on_GET( **{SERVER_NAME_LABEL: self.server_name}, ).observe((processing_end_time - processing_start_time) / 1000) - return 200, msgs + return 200, response_content + + @trace + async def encode_response( + self, + get_messages_result: GetMessagesResult, + serialize_options: SerializeEventConfig, + ) -> JsonDict: + time_now = self.clock.time_msec() + + serialized_result = { + "chunk": ( + await self.event_serializer.serialize_events( + get_messages_result.messages_chunk, + time_now, + config=serialize_options, + bundle_aggregations=get_messages_result.bundled_aggregations, + ) + ), + "start": await get_messages_result.start_token.to_string(self.store), + } + + if get_messages_result.end_token: + serialized_result["end"] = await get_messages_result.end_token.to_string( + self.store + ) + + if get_messages_result.state: + serialized_result["state"] = await self.event_serializer.serialize_events( + get_messages_result.state, time_now, config=serialize_options + ) + + return serialized_result # TODO: Needs unit testing From 61ed7e6720ea080b8a399d5e0027061a9f746686 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 25 Nov 2025 14:23:54 -0600 Subject: [PATCH 2/9] Update admin endpoint --- synapse/rest/admin/rooms.py | 35 ++++++++++++++- synapse/rest/client/room.py | 86 ++++++++++++++++++++++++++----------- 2 files changed, 95 insertions(+), 26 deletions(-) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index cf24bc628ab..a886859ffab 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -28,9 +28,13 @@ from synapse.api.constants import Direction, EventTypes, JoinRules, Membership from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.api.filtering import Filter +from synapse.events.utils import ( + SerializeEventConfig, +) from synapse.handlers.pagination import ( PURGE_ROOM_ACTION_NAME, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME, + GetMessagesResult, ) from synapse.http.servlet import ( ResolveRoomIdMixin, @@ -44,11 +48,13 @@ parse_string, ) from synapse.http.site import SynapseRequest +from synapse.logging.opentracing import trace from synapse.rest.admin._base import ( admin_patterns, assert_requester_is_admin, assert_user_is_admin, ) +from synapse.rest.client.room import SerializeMessagesDeps, encode_messages_response from synapse.storage.databases.main.room import RoomSortOrder from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, RoomID, ScheduledTask, UserID, create_requester @@ -976,6 +982,7 @@ def __init__(self, hs: "HomeServer"): self._pagination_handler = hs.get_pagination_handler() self._auth = hs.get_auth() self._store = hs.get_datastores().main + self._event_serializer = hs.get_event_client_serializer() async def on_GET( self, request: SynapseRequest, room_id: str @@ -999,7 +1006,11 @@ async def on_GET( ): as_client_event = False - msgs = await self._pagination_handler.get_messages( + serialize_options = SerializeEventConfig( + as_client_event=as_client_event, requester=requester + ) + + get_messages_result = await self._pagination_handler.get_messages( room_id=room_id, requester=requester, pagin_config=pagination_config, @@ -1008,7 +1019,27 @@ async def on_GET( use_admin_priviledge=True, ) - return HTTPStatus.OK, msgs + response_content = await self.encode_response( + get_messages_result, serialize_options + ) + + return HTTPStatus.OK, response_content + + @trace + async def encode_response( + self, + get_messages_result: GetMessagesResult, + serialize_options: SerializeEventConfig, + ) -> JsonDict: + return await encode_messages_response( + get_messages_result=get_messages_result, + serialize_options=serialize_options, + serialize_deps=SerializeMessagesDeps( + clock=self._clock, + event_serializer=self._event_serializer, + store=self._store, + ), + ) class RoomTimestampToEventRestServlet(RestServlet): diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 775eff635e0..1cd0535b6d1 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -28,6 +28,7 @@ from typing import TYPE_CHECKING, Awaitable from urllib import parse as urlparse +import attr from prometheus_client.core import Histogram from twisted.web.server import Request @@ -45,6 +46,7 @@ ) from synapse.api.filtering import Filter from synapse.events.utils import ( + EventClientSerializer, SerializeEventConfig, format_event_for_client_v2, serialize_event, @@ -70,10 +72,12 @@ from synapse.rest.client._base import client_patterns from synapse.rest.client.transactions import HttpTransactionCache from synapse.state import CREATE_KEY, POWER_KEY +from synapse.storage.databases.main import DataStore from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, Requester, StreamToken, ThirdPartyInstanceID, UserID from synapse.types.state import StateFilter from synapse.util.cancellation import cancellable +from synapse.util.clock import Clock from synapse.util.events import generate_fake_event_id from synapse.util.stringutils import parse_and_validate_server_name @@ -791,6 +795,56 @@ async def on_GET( return 200, {"joined": users_with_profile} +@attr.s(slots=True, frozen=True, auto_attribs=True) +class SerializeMessagesDeps: + clock: Clock + event_serializer: EventClientSerializer + store: DataStore + + +@trace +async def encode_messages_response( + *, + get_messages_result: GetMessagesResult, + serialize_options: SerializeEventConfig, + serialize_deps: SerializeMessagesDeps, +) -> JsonDict: + """ + Serialize a `GetMessagesResult` into the JSON response format for the `/messages` + endpoint. + + This logic is shared between the client API and Synapse admin API. + """ + + time_now = serialize_deps.clock.time_msec() + + serialized_result = { + "chunk": ( + await serialize_deps.event_serializer.serialize_events( + get_messages_result.messages_chunk, + time_now, + config=serialize_options, + bundle_aggregations=get_messages_result.bundled_aggregations, + ) + ), + "start": await get_messages_result.start_token.to_string(serialize_deps.store), + } + + if get_messages_result.end_token: + serialized_result["end"] = await get_messages_result.end_token.to_string( + serialize_deps.store + ) + + if get_messages_result.state: + serialized_result[ + "state" + ] = await serialize_deps.event_serializer.serialize_events( + get_messages_result.state, time_now, config=serialize_options + ) + + return serialized_result + + # TODO: Needs better unit testing class RoomMessageListRestServlet(RestServlet): PATTERNS = client_patterns("/rooms/(?P[^/]*)/messages$", v1=True) @@ -872,31 +926,15 @@ async def encode_response( get_messages_result: GetMessagesResult, serialize_options: SerializeEventConfig, ) -> JsonDict: - time_now = self.clock.time_msec() - - serialized_result = { - "chunk": ( - await self.event_serializer.serialize_events( - get_messages_result.messages_chunk, - time_now, - config=serialize_options, - bundle_aggregations=get_messages_result.bundled_aggregations, - ) + return await encode_messages_response( + get_messages_result=get_messages_result, + serialize_options=serialize_options, + serialize_deps=SerializeMessagesDeps( + clock=self.clock, + event_serializer=self.event_serializer, + store=self.store, ), - "start": await get_messages_result.start_token.to_string(self.store), - } - - if get_messages_result.end_token: - serialized_result["end"] = await get_messages_result.end_token.to_string( - self.store - ) - - if get_messages_result.state: - serialized_result["state"] = await self.event_serializer.serialize_events( - get_messages_result.state, time_now, config=serialize_options - ) - - return serialized_result + ) # TODO: Needs unit testing From 1b50622b8132a4428d88f4bd11aa6b503e723dae Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 25 Nov 2025 14:55:20 -0600 Subject: [PATCH 3/9] Add log for `/messages` responses --- synapse/rest/client/room.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 1cd0535b6d1..39cfe8bc31a 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -907,6 +907,18 @@ async def on_GET( event_filter=event_filter, ) + # Useful for debugging timeline/pagination issues. For example, if a client + # isn't seeing the full history, we can check the homeserver logs to see if the + # client just never made the next request with the given `end` token. + logger.info( + "Responding to `/messages` request: {%s} %s -> %d messages with end_token=%s", + requester.user.to_string(), + request.get_method(), + request.get_redacted_uri(), + len(get_messages_result.messages_chunk), + get_messages_result.end_token, + ) + response_content = await self.encode_response( get_messages_result, serialize_options ) From 085ad0ada64a6b0ec1988eea53e03112a5c97453 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 25 Nov 2025 15:03:09 -0600 Subject: [PATCH 4/9] Add changelog --- changelog.d/19226.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/19226.misc diff --git a/changelog.d/19226.misc b/changelog.d/19226.misc new file mode 100644 index 00000000000..c38d1d3ef69 --- /dev/null +++ b/changelog.d/19226.misc @@ -0,0 +1 @@ +Add log to determine whether clients are using `/messages` as expected. From 643aaf536729081cd060683c360d8c4f526b1054 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 25 Nov 2025 15:08:36 -0600 Subject: [PATCH 5/9] Fix format string --- synapse/rest/client/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 39cfe8bc31a..faf11a22bc9 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -911,7 +911,7 @@ async def on_GET( # isn't seeing the full history, we can check the homeserver logs to see if the # client just never made the next request with the given `end` token. logger.info( - "Responding to `/messages` request: {%s} %s -> %d messages with end_token=%s", + "Responding to `/messages` request: {%s} %s %s -> %d messages with end_token=%s", requester.user.to_string(), request.get_method(), request.get_redacted_uri(), From 1401a7d9e0fb5f0df887da0dfaeb3a7b897f0c3e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 25 Nov 2025 15:08:48 -0600 Subject: [PATCH 6/9] Serialize token for easy ctrl+f in the next request --- synapse/rest/client/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index faf11a22bc9..6a1d3a6a703 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -916,7 +916,7 @@ async def on_GET( request.get_method(), request.get_redacted_uri(), len(get_messages_result.messages_chunk), - get_messages_result.end_token, + await get_messages_result.end_token.to_string(self.store), ) response_content = await self.encode_response( From 958647b5cf9bc8817f7ba2f131fa4d2491b908a6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 25 Nov 2025 15:11:00 -0600 Subject: [PATCH 7/9] Fix lint --- synapse/rest/client/room.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 6a1d3a6a703..e972d4c559f 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -916,7 +916,9 @@ async def on_GET( request.get_method(), request.get_redacted_uri(), len(get_messages_result.messages_chunk), - await get_messages_result.end_token.to_string(self.store), + (await get_messages_result.end_token.to_string(self.store)) + if get_messages_result.end_token + else None, ) response_content = await self.encode_response( From b9e2a4694d249ad04295385ac3d00c2f356b372f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 25 Nov 2025 16:32:10 -0600 Subject: [PATCH 8/9] Better explanation of order --- synapse/handlers/pagination.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 07f8d9a5880..e26894470a5 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -82,9 +82,10 @@ class GetMessagesResult: """ A list of room events. - When the request was `Direction.FORWARDS`, the events are in chronological order. - - When the request was `Direction.BACKWARDS`, the events are in reverse chronological order. + - When the request is `Direction.FORWARDS`, events will be in the range: + `start_token` < x <= `end_token`, (ascending topological_order) + - When the request is `Direction.BACKWARDS`, events will be in the range: + `start_token` >= x > `end_token`, (descending topological_order) Note that an empty chunk does not necessarily imply that no more events are available. Clients should continue to paginate until no `end_token` property is returned. From cee53441c767efbcd7db551bcb71e41a2eb6027a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 1 Dec 2025 15:05:07 -0600 Subject: [PATCH 9/9] Omit when `None` --- synapse/handlers/pagination.py | 2 ++ synapse/rest/client/room.py | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 1eae1ce5d23..63e5dfa70c5 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -104,6 +104,8 @@ class GetMessagesResult: A list of state events relevant to showing the chunk. For example, if lazy_load_members is enabled in the filter then this may contain the membership events for the senders of events in the chunk. + + Omitted from the response when `None`. """ start_token: StreamToken diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index e972d4c559f..5e7dcb01911 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -830,12 +830,12 @@ async def encode_messages_response( "start": await get_messages_result.start_token.to_string(serialize_deps.store), } - if get_messages_result.end_token: + if get_messages_result.end_token is not None: serialized_result["end"] = await get_messages_result.end_token.to_string( serialize_deps.store ) - if get_messages_result.state: + if get_messages_result.state is not None: serialized_result[ "state" ] = await serialize_deps.event_serializer.serialize_events(