#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 Element Creations Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
#

import collections
import logging
from typing import (
    TYPE_CHECKING,
    Awaitable,
    Callable,
    Generic,
    TypeVar,
)

from twisted.internet import defer

from synapse.util.async_helpers import DeferredEvent
from synapse.util.duration import Duration

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)


T = TypeVar("T")


class BackgroundQueue(Generic[T]):
    """A single-producer single-consumer async queue processing items in the
    background.

    This is optimised for the case where we receive many items, but processing
    each one takes a short amount of time. In this case we don't want to pay
    the overhead of a new background process each time. Instead, we spawn a
    background process that will wait for new items to arrive. If the
    background process has been idle for a while, it will exit, and a new
    background process will be spawned when new items arrive.

    Args:
        hs: The homeserver.
        name: The name of the background process.
        callback: The async callback to process each item.
        timeout_ms: The time in milliseconds to wait for new items before
            exiting the background process.
    """

    def __init__(
        self,
        hs: "HomeServer",
        name: str,
        callback: Callable[[T], Awaitable[None]],
        timeout_ms: int = 1000,
    ) -> None:
        self._hs = hs
        self._name = name
        self._callback = callback
        self._timeout_ms = Duration(milliseconds=timeout_ms)

        # The queue of items to process.
        self._queue: collections.deque[T] = collections.deque()

        # Indicates if a background process is running, and if so whether there
        # is new data in the queue. Used to signal to an existing background
        # process that there is new data added to the queue.
        self._wakeup_event: DeferredEvent | None = None

    def add(self, item: T) -> None:
        """Add an item into the queue."""
        self._queue.append(item)

        if self._wakeup_event is None:
            self._hs.run_as_background_process(self._name, self._process_queue)
        else:
            self._wakeup_event.set()

    async def _process_queue(self) -> None:
        """Process items in the queue until it is empty."""

        # Make sure we're the only background process.
        if self._wakeup_event is not None:
            # If there is already a background process then we signal it to
            # wake up and exit. We do not want multiple background processes
            # running at a time.
            self._wakeup_event.set()
            return

        self._wakeup_event = DeferredEvent(self._hs.get_clock())
        try:
            while True:
                # Clear the event before checking the queue. If we cleared it
                # afterwards, we'd run the risk of the wakeup signal racing
                # with us checking the queue. (This can't really happen in
                # Python due to the single-threaded nature, but let's be a bit
                # defensive anyway.)
                self._wakeup_event.clear()

                while self._queue:
                    item = self._queue.popleft()
                    try:
                        await self._callback(item)
                    except defer.CancelledError:
                        raise
                    except Exception:
                        logger.exception("Error processing background queue item")

                # Wait for new data to arrive, timing out after a while to
                # avoid keeping the background process alive forever.
                #
                # New data may have arrived and been processed while we were
                # pulling from the queue, so this may report that there is new
                # data even though the queue is now empty. That's fine, we'll
                # just loop round, clear the event, recheck the queue, and then
                # wait here again.
                new_data = await self._wakeup_event.wait(
                    timeout_seconds=self._timeout_ms.as_secs()
                )
                if not new_data:
                    # Timed out waiting for new data, so exit the loop.
                    break
        finally:
            # This background process is exiting, so clear the wakeup event to
            # indicate that a new one should be started when new data arrives.
            self._wakeup_event = None

        # The queue must be empty here.
        assert not self._queue

    def __len__(self) -> int:
        return len(self._queue)
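

# A minimal usage sketch (illustrative only, not part of the module above):
# `hs` is assumed to be an already-constructed HomeServer, and `_handle_item`
# is a hypothetical async callback.
#
#     async def _handle_item(item: str) -> None:
#         ...  # e.g. persist the item to the database
#
#     queue: BackgroundQueue[str] = BackgroundQueue(
#         hs,
#         name="example_background_queue",
#         callback=_handle_item,
#         timeout_ms=1000,
#     )
#
#     # Producers only call `add()`: the first call spawns the background
#     # process, and subsequent calls made while it is running just set the
#     # wakeup event so the existing process picks up the new item.
#     queue.add("some item")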