Skip to content
1 change: 1 addition & 0 deletions changelog.d/19226.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add log to determine whether clients are using `/messages` as expected.
116 changes: 79 additions & 37 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,25 @@
import logging
from typing import TYPE_CHECKING, cast

import attr

from twisted.python.failure import Failure

from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.opentracing import trace
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams.config import PaginationConfig
from synapse.types import (
JsonDict,
JsonMapping,
Requester,
ScheduledTask,
StreamKeyType,
StreamToken,
TaskStatus,
)
from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse
Expand Down Expand Up @@ -70,6 +73,58 @@
SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class GetMessagesResult:
"""
Everything needed to serialize a `/messages` response.
"""

messages_chunk: list[EventBase]
"""
A list of room events.

- When the request is `Direction.FORWARDS`, events will be in the range:
`start_token` < x <= `end_token`, (ascending topological_order)
- When the request is `Direction.BACKWARDS`, events will be in the range:
`start_token` >= x > `end_token`, (descending topological_order)

Note that an empty chunk does not necessarily imply that no more events are
available. Clients should continue to paginate until no `end_token` property is returned.
"""

bundled_aggregations: dict[str, BundledAggregations]
"""
A map of event ID to the bundled aggregations for the events in the chunk.

If an event doesn't have any bundled aggregations, it may not appear in the map.
"""

state: list[EventBase] | None
"""
A list of state events relevant to showing the chunk. For example, if
lazy_load_members is enabled in the filter then this may contain the membership
events for the senders of events in the chunk.

Omitted from the response when `None`.
"""

start_token: StreamToken
"""
Token corresponding to the start of chunk. This will be the same as the value given
in `from` query parameter of the `/messages` request.
"""

end_token: StreamToken | None
"""
A token corresponding to the end of chunk. This token can be passed back to this
endpoint to request further events.

If no further events are available (either because we have reached the start of the
timeline, or because the user does not have permission to see any more events), this
property is omitted from the response.
"""


class PaginationHandler:
"""Handles pagination and purge history requests.

Expand Down Expand Up @@ -418,7 +473,7 @@ async def get_messages(
as_client_event: bool = True,
event_filter: Filter | None = None,
use_admin_priviledge: bool = False,
) -> JsonDict:
) -> GetMessagesResult:
"""Get messages in a room.

Args:
Expand Down Expand Up @@ -617,10 +672,13 @@ async def get_messages(
# In that case we do not return end, to tell the client
# there is no need for further queries.
if not events:
return {
"chunk": [],
"start": await from_token.to_string(self.store),
}
return GetMessagesResult(
messages_chunk=[],
bundled_aggregations={},
state=None,
start_token=from_token,
end_token=None,
)

if event_filter:
events = await event_filter.filter(events)
Expand All @@ -636,11 +694,13 @@ async def get_messages(
# if after the filter applied there are no more events
# return immediately - but there might be more in next_token batch
if not events:
return {
"chunk": [],
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}
return GetMessagesResult(
messages_chunk=[],
bundled_aggregations={},
state=None,
start_token=from_token,
end_token=next_token,
)

state = None
if event_filter and event_filter.lazy_load_members and len(events) > 0:
Expand All @@ -657,38 +717,20 @@ async def get_messages(

if state_ids:
state_dict = await self.store.get_events(list(state_ids.values()))
state = state_dict.values()
state = list(state_dict.values())

aggregations = await self._relations_handler.get_bundled_aggregations(
events, user_id
)

time_now = self.clock.time_msec()

serialize_options = SerializeEventConfig(
as_client_event=as_client_event, requester=requester
return GetMessagesResult(
messages_chunk=events,
bundled_aggregations=aggregations,
state=state,
start_token=from_token,
end_token=next_token,
)

chunk = {
"chunk": (
await self._event_serializer.serialize_events(
events,
time_now,
config=serialize_options,
bundle_aggregations=aggregations,
)
),
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}

if state:
chunk["state"] = await self._event_serializer.serialize_events(
state, time_now, config=serialize_options
)

return chunk
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better separation of concerns. We now do the serializing/encoding at the REST layer instead of in the handler (see encode_messages_response)


async def _shutdown_and_purge_room(
self,
task: ScheduledTask,
Expand Down
35 changes: 33 additions & 2 deletions synapse/rest/admin/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
from synapse.api.constants import Direction, EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import (
SerializeEventConfig,
)
from synapse.handlers.pagination import (
PURGE_ROOM_ACTION_NAME,
SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME,
GetMessagesResult,
)
from synapse.http.servlet import (
ResolveRoomIdMixin,
Expand All @@ -44,11 +48,13 @@
parse_string,
)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace
from synapse.rest.admin._base import (
admin_patterns,
assert_requester_is_admin,
assert_user_is_admin,
)
from synapse.rest.client.room import SerializeMessagesDeps, encode_messages_response
from synapse.storage.databases.main.room import RoomSortOrder
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, RoomID, ScheduledTask, UserID, create_requester
Expand Down Expand Up @@ -976,6 +982,7 @@ def __init__(self, hs: "HomeServer"):
self._pagination_handler = hs.get_pagination_handler()
self._auth = hs.get_auth()
self._store = hs.get_datastores().main
self._event_serializer = hs.get_event_client_serializer()

async def on_GET(
self, request: SynapseRequest, room_id: str
Expand All @@ -999,7 +1006,11 @@ async def on_GET(
):
as_client_event = False

msgs = await self._pagination_handler.get_messages(
serialize_options = SerializeEventConfig(
as_client_event=as_client_event, requester=requester
)

get_messages_result = await self._pagination_handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
Expand All @@ -1008,7 +1019,27 @@ async def on_GET(
use_admin_priviledge=True,
)

return HTTPStatus.OK, msgs
response_content = await self.encode_response(
get_messages_result, serialize_options
)

return HTTPStatus.OK, response_content

@trace
async def encode_response(
self,
get_messages_result: GetMessagesResult,
serialize_options: SerializeEventConfig,
) -> JsonDict:
return await encode_messages_response(
get_messages_result=get_messages_result,
serialize_options=serialize_options,
serialize_deps=SerializeMessagesDeps(
clock=self._clock,
event_serializer=self._event_serializer,
store=self._store,
),
)


class RoomTimestampToEventRestServlet(RestServlet):
Expand Down
Loading
Loading