Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19229.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move towards using a dedicated `Duration` type.
9 changes: 7 additions & 2 deletions rust/src/rendezvous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use mime::Mime;
use pyo3::{
exceptions::PyValueError,
pyclass, pymethods,
types::{PyAnyMethods, PyModule, PyModuleMethods},
types::{IntoPyDict, PyAnyMethods, PyModule, PyModuleMethods},
Bound, IntoPyObject, Py, PyAny, PyResult, Python,
};
use ulid::Ulid;
Expand Down Expand Up @@ -132,6 +132,11 @@ impl RendezvousHandler {
.unwrap_infallible()
.unbind();

let duration_module = py.import("synapse.util.duration")?;

let kwargs = [("milliseconds", eviction_interval)].into_py_dict(py)?;
let eviction_duration = duration_module.call_method("Duration", (), Some(&kwargs))?;
Comment on lines +135 to +138
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this is a little horrible. Is it possible to make a Rust struct that wraps synapse.util.duration that we can easily work with?


// Construct a Python object so that we can get a reference to the
// evict method and schedule it to run.
let self_ = Py::new(
Expand All @@ -149,7 +154,7 @@ impl RendezvousHandler {
let evict = self_.getattr(py, "_evict")?;
homeserver.call_method0("get_clock")?.call_method(
"looping_call",
(evict, eviction_interval),
(evict, eviction_duration),
None,
)?;

Expand Down
3 changes: 2 additions & 1 deletion synapse/api/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from synapse.storage.databases.main import DataStore
from synapse.types import Requester
from synapse.util.clock import Clock
from synapse.util.duration import Duration
from synapse.util.wheel_timer import WheelTimer

if TYPE_CHECKING:
Expand Down Expand Up @@ -100,7 +101,7 @@ def __init__(
# and doesn't affect correctness.
self._timer: WheelTimer[Hashable] = WheelTimer()

self.clock.looping_call(self._prune_message_counts, 15 * 1000)
self.clock.looping_call(self._prune_message_counts, Duration(seconds=15))

def _get_key(self, requester: Requester | None, key: Hashable | None) -> Hashable:
"""Use the requester's MXID as a fallback key if no key is provided."""
Expand Down
12 changes: 6 additions & 6 deletions synapse/app/phone_stats_home.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ def performance_stats_init() -> None:
# table will decrease
clock.looping_call(
hs.get_datastores().main.generate_user_daily_visits,
Duration(minutes=5).as_millis(),
Duration(minutes=5),
)

# monthly active user limiting functionality
clock.looping_call(
hs.get_datastores().main.reap_monthly_active_users,
Duration(hours=1).as_millis(),
Duration(hours=1),
)
hs.get_datastores().main.reap_monthly_active_users()

Expand Down Expand Up @@ -263,29 +263,29 @@ async def _generate_monthly_active_users() -> None:

if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
generate_monthly_active_users()
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
clock.looping_call(generate_monthly_active_users, Duration(minutes=5))
# End of monthly active user settings

if hs.config.metrics.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(
phone_stats_home,
PHONE_HOME_INTERVAL.as_millis(),
PHONE_HOME_INTERVAL,
hs,
stats,
)

# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
clock.call_later(
0,
Duration(seconds=0),
performance_stats_init,
)

# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME.as_secs(),
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME,
phone_stats_home,
hs,
stats,
Expand Down
3 changes: 2 additions & 1 deletion synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
from synapse.storage.databases.main import DataStore
from synapse.types import DeviceListUpdates, JsonMapping
from synapse.util.clock import Clock, DelayedCallWrapper
from synapse.util.duration import Duration

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -504,7 +505,7 @@ def __init__(
self.scheduled_recovery: DelayedCallWrapper | None = None

def recover(self) -> None:
delay = 2**self.backoff_counter
delay = Duration(seconds=2**self.backoff_counter)
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
self.scheduled_recovery = self.clock.call_later(
delay,
Expand Down
3 changes: 2 additions & 1 deletion synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.duration import Duration
from synapse.util.retryutils import NotRetryingDestination

if TYPE_CHECKING:
Expand Down Expand Up @@ -132,7 +133,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.pdu_destination_tried: dict[str, dict[str, int]] = {}
self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
self._clock.looping_call(self._clear_tried_cache, Duration(minutes=1))
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()

Expand Down
7 changes: 5 additions & 2 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.duration import Duration
from synapse.util.stringutils import parse_server_name

if TYPE_CHECKING:
Expand Down Expand Up @@ -226,7 +227,7 @@ async def _handle_old_staged_events(self) -> None:
)

# We pause a bit so that we don't start handling all rooms at once.
await self._clock.sleep(random.uniform(0, 0.1))
await self._clock.sleep(Duration(seconds=random.uniform(0, 0.1)))

async def on_backfill_request(
self, origin: str, room_id: str, versions: list[str], limit: int
Expand Down Expand Up @@ -301,7 +302,9 @@ async def on_incoming_transaction(
# Start a periodic check for old staged events. This is to handle
# the case where locks time out, e.g. if another process gets killed
# without dropping its locks.
self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
self._clock.looping_call(
self._handle_old_staged_events, Duration(minutes=1)
)

# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
Expand Down
3 changes: 2 additions & 1 deletion synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from synapse.metrics import SERVER_NAME_LABEL, LaterGauge
from synapse.replication.tcp.streams.federation import FederationStream
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util.duration import Duration
from synapse.util.metrics import Measure

from .units import Edu
Expand Down Expand Up @@ -137,7 +138,7 @@ def register(queue_name: QueueNames, queue: Sized) -> None:
assert isinstance(queue, Sized)
register(queue_name, queue=queue)

self.clock.looping_call(self._clear_queue, 30 * 1000)
self.clock.looping_call(self._clear_queue, Duration(seconds=30))

def shutdown(self) -> None:
"""Stops this federation sender instance from sending further transactions."""
Expand Down
13 changes: 7 additions & 6 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
get_domain_from_id,
)
from synapse.util.clock import Clock
from synapse.util.duration import Duration
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter

Expand Down Expand Up @@ -218,12 +219,12 @@
# Please note that rate limiting still applies, so while the loop is
# executed every X seconds the destinations may not be woken up because
# they are being rate limited following previous attempt failures.
WAKEUP_RETRY_PERIOD_SEC = 60
WAKEUP_RETRY_PERIOD = Duration(minutes=1)

# Time (in s) to wait in between waking up each destination, i.e. one destination
# Time to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds until we have woken every destination
# has outstanding catch-up.
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS = Duration(seconds=5)


class AbstractFederationSender(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -379,7 +380,7 @@ async def _handle(self) -> None:

queue.attempt_new_transaction()

await self.clock.sleep(current_sleep_seconds)
await self.clock.sleep(Duration(seconds=current_sleep_seconds))

if not self.queue:
break
Expand Down Expand Up @@ -468,7 +469,7 @@ def __init__(self, hs: "HomeServer"):
# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call_now(
self.hs.run_as_background_process,
WAKEUP_RETRY_PERIOD_SEC * 1000.0,
WAKEUP_RETRY_PERIOD,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)
Expand Down Expand Up @@ -1161,4 +1162,4 @@ async def _wake_destinations_needing_catchup(self) -> None:
last_processed,
)
self.wake_destination(destination)
await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS)
3 changes: 2 additions & 1 deletion synapse/handlers/account_validity.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from synapse.types import UserID
from synapse.util import stringutils
from synapse.util.async_helpers import delay_cancellation
from synapse.util.duration import Duration

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -73,7 +74,7 @@ def __init__(self, hs: "HomeServer"):

# Check the renewal emails to send and send them every 30min.
if hs.config.worker.run_background_tasks:
self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000)
self.clock.looping_call(self._send_renewal_emails, Duration(minutes=30))

async def is_user_expired(self, user_id: str) -> bool:
"""Checks if a user has expired against third-party modules.
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from synapse.types import JsonDict, Requester, StrCollection, UserID
from synapse.util import stringutils as stringutils
from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
from synapse.util.duration import Duration
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.stringutils import base62_encode
from synapse.util.threepids import canonicalise_email
Expand Down Expand Up @@ -242,7 +243,7 @@ def __init__(self, hs: "HomeServer"):
if hs.config.worker.run_background_tasks:
self._clock.looping_call(
run_as_background_process,
5 * 60 * 1000,
Duration(minutes=5),
"expire_old_sessions",
self.server_name,
self._expire_old_sessions,
Expand Down
9 changes: 5 additions & 4 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
UserID,
create_requester,
)
from synapse.util.duration import Duration
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel
Expand Down Expand Up @@ -92,7 +93,7 @@ async def _schedule_db_events() -> None:
# Kick off again (without blocking) to catch any missed notifications
# that may have fired before the callback was added.
self._clock.call_later(
0,
Duration(seconds=0),
self.notify_new_event,
)

Expand Down Expand Up @@ -501,17 +502,17 @@ def _schedule_next_at_or_none(self, next_send_ts: Timestamp | None) -> None:

def _schedule_next_at(self, next_send_ts: Timestamp) -> None:
delay = next_send_ts - self._get_current_ts()
delay_sec = delay / 1000 if delay > 0 else 0
delay_duration = Duration(milliseconds=max(delay, 0))

if self._next_delayed_event_call is None:
self._next_delayed_event_call = self._clock.call_later(
delay_sec,
delay_duration,
self.hs.run_as_background_process,
"_send_on_timeout",
self._send_on_timeout,
)
else:
self._next_delayed_event_call.reset(delay_sec)
self._next_delayed_event_call.reset(delay_duration.as_secs())

async def get_all_for_user(self, requester: Requester) -> list[JsonDict]:
"""Return all pending delayed events requested by the given user."""
Expand Down
13 changes: 6 additions & 7 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.cancellation import cancellable
from synapse.util.duration import Duration
from synapse.util.metrics import measure_func
from synapse.util.retryutils import (
NotRetryingDestination,
Expand All @@ -85,7 +86,7 @@

DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages"
MAX_DEVICE_DISPLAY_NAME_LEN = 100
DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000
DELETE_STALE_DEVICES_INTERVAL = Duration(days=1)


def _check_device_name_length(name: str | None) -> None:
Expand Down Expand Up @@ -186,7 +187,7 @@ def __init__(self, hs: "HomeServer"):
):
self.clock.looping_call(
self.hs.run_as_background_process,
DELETE_STALE_DEVICES_INTERVAL_MS,
DELETE_STALE_DEVICES_INTERVAL,
desc="delete_stale_devices",
func=self._delete_stale_devices,
)
Expand Down Expand Up @@ -915,7 +916,7 @@ async def handle_new_device_update(self) -> None:
)

DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
DEVICE_MSGS_DELETE_SLEEP_MS = 100
DEVICE_MSGS_DELETE_SLEEP = Duration(milliseconds=100)

async def _delete_device_messages(
self,
Expand All @@ -941,9 +942,7 @@ async def _delete_device_messages(
if from_stream_id is None:
return TaskStatus.COMPLETE, None, None

await self.clock.sleep(
DeviceWriterHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0
)
await self.clock.sleep(DeviceWriterHandler.DEVICE_MSGS_DELETE_SLEEP)


class DeviceWriterHandler(DeviceHandler):
Expand Down Expand Up @@ -1469,7 +1468,7 @@ def __init__(self, hs: "HomeServer", device_handler: DeviceWriterHandler):
self._resync_retry_lock = Lock()
self.clock.looping_call(
self.hs.run_as_background_process,
30 * 1000,
Duration(seconds=30),
func=self._maybe_retry_device_resync,
desc="_maybe_retry_device_resync",
)
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.cancellation import cancellable
from synapse.util.duration import Duration
from synapse.util.json import json_decoder
from synapse.util.retryutils import (
NotRetryingDestination,
Expand Down Expand Up @@ -1634,7 +1635,7 @@ async def _delete_old_one_time_keys_task(
# matrix.org has about 15M users in the e2e_one_time_keys_json table
# (comprising 20M devices). We want this to take about a week, so we need
# to do about one batch of 100 users every 4 seconds.
await self.clock.sleep(4)
await self.clock.sleep(Duration(seconds=4))


def _check_cross_signing_key(
Expand Down
5 changes: 4 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
from synapse.types import JsonDict, StrCollection, get_domain_from_id
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.duration import Duration
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server

Expand Down Expand Up @@ -1972,7 +1973,9 @@ async def _sync_partial_state_room(
logger.warning(
"%s; waiting for %d ms...", e, e.retry_after_ms
)
await self.clock.sleep(e.retry_after_ms / 1000)
await self.clock.sleep(
Duration(milliseconds=e.retry_after_ms)
)

# Success, no need to try the rest of the destinations.
break
Expand Down
Loading
Loading