diff --git a/changelog.d/18724.misc b/changelog.d/18724.misc new file mode 100644 index 0000000000..21465b19fe --- /dev/null +++ b/changelog.d/18724.misc @@ -0,0 +1 @@ +Refactor `Histogram` metrics to be homeserver-scoped. diff --git a/scripts-dev/mypy_synapse_plugin.py b/scripts-dev/mypy_synapse_plugin.py index 3eab2b3b73..2a7f7ace10 100644 --- a/scripts-dev/mypy_synapse_plugin.py +++ b/scripts-dev/mypy_synapse_plugin.py @@ -61,6 +61,7 @@ class SynapsePlugin(Plugin): ) -> Optional[Callable[[FunctionSigContext], FunctionLike]]: if fullname in ( "prometheus_client.metrics.Counter", + "prometheus_client.metrics.Histogram", "prometheus_client.metrics.Gauge", # TODO: Add other prometheus_client metrics that need checking as we # refactor, see https://github.com/element-hq/synapse/issues/18592 diff --git a/synapse/_scripts/review_recent_signups.py b/synapse/_scripts/review_recent_signups.py index 62723c539d..0ff7fae567 100644 --- a/synapse/_scripts/review_recent_signups.py +++ b/synapse/_scripts/review_recent_signups.py @@ -29,19 +29,21 @@ import attr from synapse.config._base import ( Config, + ConfigError, RootConfig, find_config_files, read_config_files, ) from synapse.config.database import DatabaseConfig +from synapse.config.server import ServerConfig from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn from synapse.storage.engines import create_engine class ReviewConfig(RootConfig): - "A config class that just pulls out the database config" + "A config class that just pulls out the server and database config" - config_classes = [DatabaseConfig] + config_classes = [ServerConfig, DatabaseConfig] @attr.s(auto_attribs=True) @@ -148,6 +150,10 @@ def main() -> None: config_dict = read_config_files(config_files) config.parse_config_dict(config_dict, "", "") + server_name = config.server.server_name + if not isinstance(server_name, str): + raise ConfigError("Must be a string", ("server_name",)) + since_ms = time.time() * 1000 - Config.parse_duration(config_args.since) exclude_users_with_email = config_args.exclude_emails exclude_users_with_appservice = config_args.exclude_app_service @@ -159,7 +165,12 @@ def main() -> None: engine = create_engine(database_config.config) - with make_conn(database_config, engine, "review_recent_signups") as db_conn: + with make_conn( + db_config=database_config, + engine=engine, + default_txn_name="review_recent_signups", + server_name=server_name, + ) as db_conn: # This generates a type of Cursor, not LoggingTransaction. 
user_infos = get_recent_users( db_conn.cursor(), diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 9a0b459e65..0f54cfc64a 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -672,8 +672,14 @@ class Porter: engine = create_engine(db_config.config) hs = MockHomeserver(self.hs_config) + server_name = hs.hostname - with make_conn(db_config, engine, "portdb") as db_conn: + with make_conn( + db_config=db_config, + engine=engine, + default_txn_name="portdb", + server_name=server_name, + ) as db_conn: engine.check_database( db_conn, allow_outdated_version=allow_outdated_version ) diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py index 567f2e834c..6f0e505fba 100644 --- a/synapse/api/auth/msc3861_delegated.py +++ b/synapse/api/auth/msc3861_delegated.py @@ -47,6 +47,7 @@ from synapse.logging.opentracing import ( inject_request_headers, start_active_span, ) +from synapse.metrics import SERVER_NAME_LABEL from synapse.synapse_rust.http_client import HttpClient from synapse.types import Requester, UserID, create_requester from synapse.util import json_decoder @@ -62,7 +63,7 @@ logger = logging.getLogger(__name__) introspection_response_timer = Histogram( "synapse_api_auth_delegated_introspection_response", "Time taken to get a response for an introspection request", - ["code"], + labelnames=["code", SERVER_NAME_LABEL], ) @@ -341,17 +342,23 @@ class MSC3861DelegatedAuth(BaseAuth): ) except HttpResponseException as e: end_time = self._clock.time() - introspection_response_timer.labels(e.code).observe(end_time - start_time) + introspection_response_timer.labels( + code=e.code, **{SERVER_NAME_LABEL: self.server_name} + ).observe(end_time - start_time) raise except Exception: end_time = self._clock.time() - introspection_response_timer.labels("ERR").observe(end_time - start_time) + introspection_response_timer.labels( + code="ERR", **{SERVER_NAME_LABEL: self.server_name} + ).observe(end_time - start_time) raise logger.debug("Fetched token from MAS") end_time = self._clock.time() - introspection_response_timer.labels(200).observe(end_time - start_time) + introspection_response_timer.labels( + code=200, **{SERVER_NAME_LABEL: self.server_name} + ).observe(end_time - start_time) resp = json_decoder.decode(resp_body.decode("utf-8")) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 7e1e05580d..127518e1f7 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -122,6 +122,7 @@ received_queries_counter = Counter( pdu_process_time = Histogram( "synapse_federation_server_pdu_process_time", "Time taken to process an event", + labelnames=[SERVER_NAME_LABEL], ) last_pdu_ts_metric = Gauge( @@ -1324,9 +1325,9 @@ class FederationServer(FederationBase): origin, event.event_id ) if received_ts is not None: - pdu_process_time.observe( - (self._clock.time_msec() - received_ts) / 1000 - ) + pdu_process_time.labels( + **{SERVER_NAME_LABEL: self.server_name} + ).observe((self._clock.time_msec() - received_ts) / 1000) next = await self._get_next_nonspam_staged_event_for_room( room_id, room_version diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 1d8bea9943..8befbe3722 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -667,7 +667,8 @@ class FederationSender(AbstractFederationSender): ts = 
event_to_received_ts[event.event_id] assert ts is not None synapse.metrics.event_processing_lag_by_event.labels( - "federation_sender" + name="federation_sender", + **{SERVER_NAME_LABEL: self.server_name}, ).observe((now - ts) / 1000) async def handle_room_events(events: List[EventBase]) -> None: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 9397e2cc8c..5bd239e5fe 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -187,7 +187,8 @@ class ApplicationServicesHandler: assert ts is not None synapse.metrics.event_processing_lag_by_event.labels( - "appservice_sender" + name="appservice_sender", + **{SERVER_NAME_LABEL: self.server_name}, ).observe((now - ts) / 1000) async def handle_room_events(events: Iterable[EventBase]) -> None: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b66215caea..34aae7ef3c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -71,6 +71,7 @@ from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace +from synapse.metrics import SERVER_NAME_LABEL from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.storage.databases.main.events_worker import EventRedactBehaviour @@ -90,7 +91,7 @@ logger = logging.getLogger(__name__) backfill_processing_before_timer = Histogram( "synapse_federation_backfill_processing_before_time_seconds", "sec", - [], + labelnames=[SERVER_NAME_LABEL], buckets=( 0.1, 0.5, @@ -533,9 +534,9 @@ class FederationHandler: # backfill points regardless of `current_depth`. if processing_start_time is not None: processing_end_time = self.clock.time_msec() - backfill_processing_before_timer.observe( - (processing_end_time - processing_start_time) / 1000 - ) + backfill_processing_before_timer.labels( + **{SERVER_NAME_LABEL: self.server_name} + ).observe((processing_end_time - processing_start_time) / 1000) success = await try_backfill(likely_domains) if success: diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 63679638c0..2ef7e77b1d 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -113,7 +113,7 @@ soft_failed_event_counter = Counter( backfill_processing_after_timer = Histogram( "synapse_federation_backfill_processing_after_time_seconds", "sec", - [], + labelnames=[SERVER_NAME_LABEL], buckets=( 0.1, 0.25, @@ -692,7 +692,9 @@ class FederationEventHandler: if not events: return - with backfill_processing_after_timer.time(): + with backfill_processing_after_timer.labels( + **{SERVER_NAME_LABEL: self.server_name} + ).time(): # if there are any events in the wrong room, the remote server is buggy and # should not be trusted. 
for ev in events: diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index cb56eb53fc..a9573ba0f1 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -38,6 +38,7 @@ from synapse.logging.opentracing import ( tag_args, trace, ) +from synapse.metrics import SERVER_NAME_LABEL from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.databases.main.state_deltas import StateDelta from synapse.storage.databases.main.stream import PaginateFunction @@ -79,7 +80,7 @@ logger = logging.getLogger(__name__) sync_processing_time = Histogram( "synapse_sliding_sync_processing_time", "Time taken to generate a sliding sync response, ignoring wait times.", - ["initial"], + labelnames=["initial", SERVER_NAME_LABEL], ) # Limit the number of state_keys we should remember sending down the connection for each @@ -94,6 +95,7 @@ MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER = 100 class SlidingSyncHandler: def __init__(self, hs: "HomeServer"): + self.server_name = hs.hostname self.clock = hs.get_clock() self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() @@ -368,9 +370,9 @@ class SlidingSyncHandler: set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id) end_time_s = self.clock.time() - sync_processing_time.labels(from_token is not None).observe( - end_time_s - start_time_s - ) + sync_processing_time.labels( + initial=from_token is not None, **{SERVER_NAME_LABEL: self.server_name} + ).observe(end_time_s - start_time_s) return sliding_sync_result diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 21f520e772..bea0316e6c 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -240,9 +240,11 @@ class RequestMetrics: response_count.labels(**response_base_labels).inc() - response_timer.labels(code=response_code_str, **response_base_labels).observe( - time_sec - self.start_ts - ) + response_timer.labels( + code=response_code_str, + **response_base_labels, + **{SERVER_NAME_LABEL: self.our_server_name}, + ).observe(time_sec - self.start_ts) resource_usage = context.get_resource_usage() diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index c042dd27d2..120e151b7f 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -583,7 +583,7 @@ event_processing_lag = Gauge( event_processing_lag_by_event = Histogram( "synapse_event_processing_lag_by_event", "Time between an event being persisted and it being queued up to be sent to the relevant remote servers", - ["name"], + labelnames=["name", SERVER_NAME_LABEL], ) # Build info of the running server. @@ -607,7 +607,7 @@ threepid_send_requests = Histogram( " there is a request with try count of 4, then there would have been one" " each for 1, 2 and 3", buckets=(1, 2, 3, 4, 5, 10), - labelnames=("type", "reason"), + labelnames=("type", "reason", SERVER_NAME_LABEL), ) threadpool_total_threads = Gauge( diff --git a/synapse/metrics/_gc.py b/synapse/metrics/_gc.py index ee86e27479..053e2da48e 100644 --- a/synapse/metrics/_gc.py +++ b/synapse/metrics/_gc.py @@ -54,9 +54,9 @@ running_on_pypy = platform.python_implementation() == "PyPy" # Python GC metrics # -# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`. +# These are process-level metrics, so they do not have the `SERVER_NAME_LABEL`. 
gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) # type: ignore[missing-server-name-label] -gc_time = Histogram( +gc_time = Histogram( # type: ignore[missing-server-name-label] "python_gc_time", "Time taken to GC (sec)", ["gen"], diff --git a/synapse/metrics/_reactor_metrics.py b/synapse/metrics/_reactor_metrics.py index fda0cd018b..e1f8570f3d 100644 --- a/synapse/metrics/_reactor_metrics.py +++ b/synapse/metrics/_reactor_metrics.py @@ -62,7 +62,8 @@ logger = logging.getLogger(__name__) # Twisted reactor metrics # -tick_time = Histogram( +# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`. +tick_time = Histogram( # type: ignore[missing-server-name-label] "python_twisted_reactor_tick_time", "Tick time of the Twisted reactor (sec)", buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5], diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py index 2c531ae40c..497b26fcaf 100644 --- a/synapse/replication/tcp/external_cache.py +++ b/synapse/replication/tcp/external_cache.py @@ -49,7 +49,7 @@ get_counter = Counter( response_timer = Histogram( "synapse_external_cache_response_time_seconds", "Time taken to get a response from Redis for a cache get/set request", - labelnames=["method"], + labelnames=["method", SERVER_NAME_LABEL], buckets=( 0.001, 0.002, @@ -110,7 +110,9 @@ class ExternalCache: "ExternalCache.set", tags={opentracing.SynapseTags.CACHE_NAME: cache_name}, ): - with response_timer.labels("set").time(): + with response_timer.labels( + method="set", **{SERVER_NAME_LABEL: self.server_name} + ).time(): return await make_deferred_yieldable( self._redis_connection.set( self._get_redis_key(cache_name, key), @@ -129,7 +131,9 @@ class ExternalCache: "ExternalCache.get", tags={opentracing.SynapseTags.CACHE_NAME: cache_name}, ): - with response_timer.labels("get").time(): + with response_timer.labels( + method="get", **{SERVER_NAME_LABEL: self.server_name} + ).time(): result = await make_deferred_yieldable( self._redis_connection.get(self._get_redis_key(cache_name, key)) ) diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py index 7d6c0afd9a..667e79abdf 100644 --- a/synapse/rest/client/account.py +++ b/synapse/rest/client/account.py @@ -47,7 +47,7 @@ from synapse.http.servlet import ( parse_string, ) from synapse.http.site import SynapseRequest -from synapse.metrics import threepid_send_requests +from synapse.metrics import SERVER_NAME_LABEL, threepid_send_requests from synapse.push.mailer import Mailer from synapse.types import JsonDict from synapse.types.rest import RequestBodyModel @@ -76,6 +76,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self.hs = hs + self.server_name = hs.hostname self.datastore = hs.get_datastores().main self.config = hs.config self.identity_handler = hs.get_identity_handler() @@ -136,9 +137,11 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): self.mailer.send_password_reset_mail, body.next_link, ) - threepid_send_requests.labels(type="email", reason="password_reset").observe( - body.send_attempt - ) + threepid_send_requests.labels( + type="email", + reason="password_reset", + **{SERVER_NAME_LABEL: self.server_name}, + ).observe(body.send_attempt) # Wrap the session id in a JSON object return 200, {"sid": sid} @@ -325,6 +328,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() 
self.hs = hs + self.server_name = hs.hostname self.config = hs.config self.identity_handler = hs.get_identity_handler() self.store = self.hs.get_datastores().main @@ -394,9 +398,11 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): body.next_link, ) - threepid_send_requests.labels(type="email", reason="add_threepid").observe( - body.send_attempt - ) + threepid_send_requests.labels( + type="email", + reason="add_threepid", + **{SERVER_NAME_LABEL: self.server_name}, + ).observe(body.send_attempt) # Wrap the session id in a JSON object return 200, {"sid": sid} @@ -407,6 +413,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): self.hs = hs + self.server_name = hs.hostname super().__init__() self.store = self.hs.get_datastores().main self.identity_handler = hs.get_identity_handler() @@ -469,9 +476,11 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): body.next_link, ) - threepid_send_requests.labels(type="msisdn", reason="add_threepid").observe( - body.send_attempt - ) + threepid_send_requests.labels( + type="msisdn", + reason="add_threepid", + **{SERVER_NAME_LABEL: self.server_name}, + ).observe(body.send_attempt) logger.info("MSISDN %s: got response from identity server: %s", msisdn, ret) return 200, ret diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py index 13148072d7..6930a5fd84 100644 --- a/synapse/rest/client/register.py +++ b/synapse/rest/client/register.py @@ -56,7 +56,7 @@ from synapse.http.servlet import ( parse_string, ) from synapse.http.site import SynapseRequest -from synapse.metrics import threepid_send_requests +from synapse.metrics import SERVER_NAME_LABEL, threepid_send_requests from synapse.push.mailer import Mailer from synapse.types import JsonDict from synapse.util.msisdn import phone_number_to_msisdn @@ -82,6 +82,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self.hs = hs + self.server_name = hs.hostname self.identity_handler = hs.get_identity_handler() self.config = hs.config @@ -163,9 +164,11 @@ class EmailRegisterRequestTokenRestServlet(RestServlet): next_link, ) - threepid_send_requests.labels(type="email", reason="register").observe( - send_attempt - ) + threepid_send_requests.labels( + type="email", + reason="register", + **{SERVER_NAME_LABEL: self.server_name}, + ).observe(send_attempt) # Wrap the session id in a JSON object return 200, {"sid": sid} @@ -177,6 +180,7 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self.hs = hs + self.server_name = hs.hostname self.identity_handler = hs.get_identity_handler() async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: @@ -240,9 +244,11 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet): next_link, ) - threepid_send_requests.labels(type="msisdn", reason="register").observe( - send_attempt - ) + threepid_send_requests.labels( + type="msisdn", + reason="register", + **{SERVER_NAME_LABEL: self.server_name}, + ).observe(send_attempt) return 200, ret diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 1d1901e617..64deae7650 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -65,6 +65,7 @@ from synapse.http.servlet import ( from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import set_tag +from synapse.metrics 
import SERVER_NAME_LABEL from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.client._base import client_patterns from synapse.rest.client.transactions import HttpTransactionCache @@ -120,7 +121,7 @@ messsages_response_timer = Histogram( # picture of /messages response time for bigger rooms. We don't want the # tiny rooms that can always respond fast skewing our results when we're trying # to optimize the bigger cases. - ["room_size"], + labelnames=["room_size", SERVER_NAME_LABEL], buckets=( 0.005, 0.01, @@ -801,6 +802,7 @@ class RoomMessageListRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self._hs = hs + self.server_name = hs.hostname self.clock = hs.get_clock() self.pagination_handler = hs.get_pagination_handler() self.auth = hs.get_auth() @@ -849,7 +851,8 @@ class RoomMessageListRestServlet(RestServlet): processing_end_time = self.clock.time_msec() room_member_count = await make_deferred_yieldable(room_member_count_deferred) messsages_response_timer.labels( - room_size=_RoomSize.from_member_count(room_member_count) + room_size=_RoomSize.from_member_count(room_member_count), + **{SERVER_NAME_LABEL: self.server_name}, ).observe((processing_end_time - processing_start_time) / 1000) return 200, msgs diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index ac7399df06..c9f952b817 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -75,6 +75,7 @@ metrics_logger = logging.getLogger("synapse.state.metrics") state_groups_histogram = Histogram( "synapse_state_number_state_groups_in_resolution", "Number of state groups used when performing a state resolution", + labelnames=[SERVER_NAME_LABEL], buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), ) @@ -620,10 +621,12 @@ _biggest_room_by_db_counter = Counter( _cpu_times = Histogram( "synapse_state_res_cpu_for_all_rooms_seconds", "CPU time (utime+stime) spent computing a single state resolution", + labelnames=[SERVER_NAME_LABEL], ) _db_times = Histogram( "synapse_state_res_db_for_all_rooms_seconds", "Database time spent computing a single state resolution", + labelnames=[SERVER_NAME_LABEL], ) @@ -738,7 +741,9 @@ class StateResolutionHandler: f"State groups have been deleted: {shortstr(missing_state_groups)}" ) - state_groups_histogram.observe(len(state_groups_ids)) + state_groups_histogram.labels( + **{SERVER_NAME_LABEL: self.server_name} + ).observe(len(state_groups_ids)) new_state = await self.resolve_events_with_store( room_id, @@ -825,8 +830,12 @@ class StateResolutionHandler: room_metrics.db_time += rusage.db_txn_duration_sec room_metrics.db_events += rusage.evt_db_fetch_count - _cpu_times.observe(rusage.ru_utime + rusage.ru_stime) - _db_times.observe(rusage.db_txn_duration_sec) + _cpu_times.labels(**{SERVER_NAME_LABEL: self.server_name}).observe( + rusage.ru_utime + rusage.ru_stime + ) + _db_times.labels(**{SERVER_NAME_LABEL: self.server_name}).observe( + rusage.db_txn_duration_sec + ) def _report_metrics(self) -> None: if not self._state_res_metrics: diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index f6c1079ceb..95a34f7be1 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -106,6 +106,7 @@ state_delta_reuse_delta_counter = Counter( forward_extremities_counter = Histogram( "synapse_storage_events_forward_extremities_persisted", "Number of forward extremities for each new event", + 
labelnames=[SERVER_NAME_LABEL], buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), ) @@ -114,6 +115,7 @@ forward_extremities_counter = Histogram( stale_forward_extremities_counter = Histogram( "synapse_storage_events_stale_forward_extremities_persisted", "Number of unchanged forward extremities for each new event", + labelnames=[SERVER_NAME_LABEL], buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), ) @@ -847,9 +849,13 @@ class EventsPersistenceStorageController: # We only update metrics for events that change forward extremities # (e.g. we ignore backfill/outliers/etc) if result != latest_event_ids: - forward_extremities_counter.observe(len(result)) + forward_extremities_counter.labels( + **{SERVER_NAME_LABEL: self.server_name} + ).observe(len(result)) stale = latest_event_ids & result - stale_forward_extremities_counter.observe(len(stale)) + stale_forward_extremities_counter.labels( + **{SERVER_NAME_LABEL: self.server_name} + ).observe(len(stale)) return result diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 6946abd021..f7aec16c96 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -82,9 +82,13 @@ sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") perf_logger = logging.getLogger("synapse.storage.TIME") -sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec") +sql_scheduling_timer = Histogram( + "synapse_storage_schedule_time", "sec", labelnames=[SERVER_NAME_LABEL] +) -sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"]) +sql_query_timer = Histogram( + "synapse_storage_query_time", "sec", labelnames=["verb", SERVER_NAME_LABEL] +) sql_txn_count = Counter( "synapse_storage_transaction_time_count", "sec", @@ -144,7 +148,12 @@ def make_pool( # etc. with LoggingContext("db.on_new_connection"): engine.on_new_connection( - LoggingDatabaseConnection(conn, engine, "on_new_connection") + LoggingDatabaseConnection( + conn=conn, + engine=engine, + default_txn_name="on_new_connection", + server_name=server_name, + ) ) connection_pool = adbapi.ConnectionPool( @@ -164,9 +173,11 @@ def make_pool( def make_conn( + *, db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine, default_txn_name: str, + server_name: str, ) -> "LoggingDatabaseConnection": """Make a new connection to the database and return it. @@ -180,13 +191,18 @@ def make_conn( if not k.startswith("cp_") } native_db_conn = engine.module.connect(**db_params) - db_conn = LoggingDatabaseConnection(native_db_conn, engine, default_txn_name) + db_conn = LoggingDatabaseConnection( + conn=native_db_conn, + engine=engine, + default_txn_name=default_txn_name, + server_name=server_name, + ) engine.on_new_connection(db_conn) return db_conn -@attr.s(slots=True, auto_attribs=True) +@attr.s(slots=True, auto_attribs=True, kw_only=True) class LoggingDatabaseConnection: """A wrapper around a database connection that returns `LoggingTransaction` as its cursor class. 
@@ -197,6 +213,7 @@ class LoggingDatabaseConnection: conn: Connection engine: BaseDatabaseEngine default_txn_name: str + server_name: str def cursor( self, @@ -210,8 +227,9 @@ class LoggingDatabaseConnection: txn_name = self.default_txn_name return LoggingTransaction( - self.conn.cursor(), + txn=self.conn.cursor(), name=txn_name, + server_name=self.server_name, database_engine=self.engine, after_callbacks=after_callbacks, async_after_callbacks=async_after_callbacks, @@ -280,6 +298,7 @@ class LoggingTransaction: __slots__ = [ "txn", "name", + "server_name", "database_engine", "after_callbacks", "async_after_callbacks", @@ -288,8 +307,10 @@ class LoggingTransaction: def __init__( self, + *, txn: Cursor, name: str, + server_name: str, database_engine: BaseDatabaseEngine, after_callbacks: Optional[List[_CallbackListEntry]] = None, async_after_callbacks: Optional[List[_AsyncCallbackListEntry]] = None, @@ -297,6 +318,7 @@ class LoggingTransaction: ): self.txn = txn self.name = name + self.server_name = server_name self.database_engine = database_engine self.after_callbacks = after_callbacks self.async_after_callbacks = async_after_callbacks @@ -507,7 +529,9 @@ class LoggingTransaction: finally: secs = time.time() - start sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs) - sql_query_timer.labels(sql.split()[0]).observe(secs) + sql_query_timer.labels( + verb=sql.split()[0], **{SERVER_NAME_LABEL: self.server_name} + ).observe(secs) def close(self) -> None: self.txn.close() @@ -1031,7 +1055,9 @@ class DatabasePool: operation_name="db.connection", ): sched_duration_sec = monotonic_time() - start_time - sql_scheduling_timer.observe(sched_duration_sec) + sql_scheduling_timer.labels( + **{SERVER_NAME_LABEL: self.server_name} + ).observe(sched_duration_sec) context.add_database_scheduled(sched_duration_sec) if self._txn_limit > 0: @@ -1064,7 +1090,10 @@ class DatabasePool: ) db_conn = LoggingDatabaseConnection( - conn, self.engine, "runWithConnection" + conn=conn, + engine=self.engine, + default_txn_name="runWithConnection", + server_name=self.server_name, ) return func(db_conn, *args, **kwargs) finally: diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index 81886ff765..6442ab6c7a 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -69,11 +69,18 @@ class Databases(Generic[DataStoreT]): state_deletion: Optional[StateDeletionDataStore] = None persist_events: Optional[PersistEventsStore] = None + server_name = hs.hostname + for database_config in hs.config.database.databases: db_name = database_config.name engine = create_engine(database_config.config) - with make_conn(database_config, engine, "startup") as db_conn: + with make_conn( + db_config=database_config, + engine=engine, + default_txn_name="startup", + server_name=server_name, + ) as db_conn: logger.info("[database config %r]: Checking database server", db_name) engine.check_database(db_conn) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index f22e5710f8..f5e592d80e 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -75,7 +75,7 @@ rate_limit_reject_counter = Counter( queue_wait_timer = Histogram( "synapse_rate_limit_queue_wait_time_seconds", "Amount of time spent waiting for the rate limiter to let our request through.", - ["rate_limiter_name"], + labelnames=["rate_limiter_name", SERVER_NAME_LABEL], buckets=( 0.005, 0.01, @@ -289,7 +289,10 @@ class _PerHostRatelimiter: async def 
_on_enter_with_tracing(self, request_id: object) -> None: maybe_metrics_cm: ContextManager = contextlib.nullcontext() if self.metrics_name: - maybe_metrics_cm = queue_wait_timer.labels(self.metrics_name).time() + maybe_metrics_cm = queue_wait_timer.labels( + rate_limiter_name=self.metrics_name, + **{SERVER_NAME_LABEL: self.our_server_name}, + ).time() with start_active_span("ratelimit wait"), maybe_metrics_cm: await self._on_enter(request_id) diff --git a/tests/server.py b/tests/server.py index 9e8ca9ccfb..6c4e521850 100644 --- a/tests/server.py +++ b/tests/server.py @@ -97,6 +97,7 @@ from synapse.module_api.callbacks.third_party_event_rules_callbacks import ( load_legacy_third_party_event_rules, ) from synapse.server import HomeServer +from synapse.server_notices.consent_server_notices import ConfigError from synapse.storage import DataStore from synapse.storage.database import LoggingDatabaseConnection, make_pool from synapse.storage.engines import BaseDatabaseEngine, create_engine @@ -1087,12 +1088,19 @@ def setup_test_homeserver( "args": {"database": test_db_location, "cp_min": 1, "cp_max": 1}, } + server_name = config.server.server_name + if not isinstance(server_name, str): + raise ConfigError("Must be a string", ("server_name",)) + # Check if we have set up a DB that we can use as a template. global PREPPED_SQLITE_DB_CONN if PREPPED_SQLITE_DB_CONN is None: temp_engine = create_engine(database_config) PREPPED_SQLITE_DB_CONN = LoggingDatabaseConnection( - sqlite3.connect(":memory:"), temp_engine, "PREPPED_CONN" + conn=sqlite3.connect(":memory:"), + engine=temp_engine, + default_txn_name="PREPPED_CONN", + server_name=server_name, ) database = DatabaseConnectionConfig("master", database_config) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 10533f45d7..b16b77057a 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -63,9 +63,15 @@ class ApplicationServiceStoreTestCase(unittest.HomeserverTestCase): self._add_appservice("token3", "as3", "some_url", "some_hs_token", "bob") # must be done after inserts database = self.hs.get_datastores().databases[0] + self.server_name = self.hs.hostname self.store = ApplicationServiceStore( database, - make_conn(database._database_config, database.engine, "test"), + make_conn( + db_config=database._database_config, + engine=database.engine, + default_txn_name="test", + server_name=self.server_name, + ), self.hs, ) @@ -138,9 +144,17 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase): self.db_pool = database._db_pool self.engine = database.engine + server_name = self.hs.hostname db_config = self.hs.config.database.get_single_database() self.store = TestTransactionStore( - database, make_conn(db_config, self.engine, "test"), self.hs + database, + make_conn( + db_config=db_config, + engine=self.engine, + default_txn_name="test", + server_name=server_name, + ), + self.hs, ) def _add_service(self, url: str, as_token: str, id: str) -> None: @@ -488,10 +502,16 @@ class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase): self.hs.config.appservice.app_service_config_files = [f1, f2] self.hs.config.caches.event_cache_size = 1 + server_name = self.hs.hostname database = self.hs.get_datastores().databases[0] ApplicationServiceStore( database, - make_conn(database._database_config, database.engine, "test"), + make_conn( + db_config=database._database_config, + engine=database.engine, + default_txn_name="test", + server_name=server_name, + ), self.hs, 
) @@ -503,10 +523,16 @@ class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase): self.hs.config.caches.event_cache_size = 1 with self.assertRaises(ConfigError) as cm: + server_name = self.hs.hostname database = self.hs.get_datastores().databases[0] ApplicationServiceStore( database, - make_conn(database._database_config, database.engine, "test"), + make_conn( + db_config=database._database_config, + engine=database.engine, + default_txn_name="test", + server_name=server_name, + ), self.hs, ) @@ -523,10 +549,16 @@ class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase): self.hs.config.caches.event_cache_size = 1 with self.assertRaises(ConfigError) as cm: + server_name = self.hs.hostname database = self.hs.get_datastores().databases[0] ApplicationServiceStore( database, - make_conn(database._database_config, database.engine, "test"), + make_conn( + db_config=database._database_config, + engine=database.engine, + default_txn_name="test", + server_name=server_name, + ), self.hs, ) diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py index 909aee043e..52c84ce5f4 100644 --- a/tests/storage/test_rollback_worker.py +++ b/tests/storage/test_rollback_worker.py @@ -69,9 +69,10 @@ class WorkerSchemaTests(HomeserverTestCase): db_pool = self.hs.get_datastores().main.db_pool db_conn = LoggingDatabaseConnection( - db_pool._db_pool.connect(), - db_pool.engine, - "tests", + conn=db_pool._db_pool.connect(), + engine=db_pool.engine, + default_txn_name="tests", + server_name="test_server", ) cur = db_conn.cursor() @@ -85,9 +86,10 @@ class WorkerSchemaTests(HomeserverTestCase): """Test that workers don't start if the DB has an older schema version""" db_pool = self.hs.get_datastores().main.db_pool db_conn = LoggingDatabaseConnection( - db_pool._db_pool.connect(), - db_pool.engine, - "tests", + conn=db_pool._db_pool.connect(), + engine=db_pool.engine, + default_txn_name="tests", + server_name="test_server", ) cur = db_conn.cursor() @@ -105,9 +107,10 @@ class WorkerSchemaTests(HomeserverTestCase): """ db_pool = self.hs.get_datastores().main.db_pool db_conn = LoggingDatabaseConnection( - db_pool._db_pool.connect(), - db_pool.engine, - "tests", + conn=db_pool._db_pool.connect(), + engine=db_pool.engine, + default_txn_name="tests", + server_name="test_server", ) # Set the schema version of the database to the current version diff --git a/tests/storage/test_unsafe_locale.py b/tests/storage/test_unsafe_locale.py index 4f652fc179..3c012642aa 100644 --- a/tests/storage/test_unsafe_locale.py +++ b/tests/storage/test_unsafe_locale.py @@ -36,8 +36,14 @@ class UnsafeLocaleTest(HomeserverTestCase): def test_unsafe_locale(self, mock_db_locale: MagicMock) -> None: mock_db_locale.return_value = ("B", "B") database = self.hs.get_datastores().databases[0] + server_name = self.hs.hostname - db_conn = make_conn(database._database_config, database.engine, "test_unsafe") + db_conn = make_conn( + db_config=database._database_config, + engine=database.engine, + default_txn_name="test_unsafe", + server_name=server_name, + ) with self.assertRaises(IncorrectDatabaseSetup): database.engine.check_database(db_conn) with self.assertRaises(IncorrectDatabaseSetup): @@ -47,8 +53,14 @@ class UnsafeLocaleTest(HomeserverTestCase): def test_safe_locale(self) -> None: database = self.hs.get_datastores().databases[0] assert isinstance(database.engine, PostgresEngine) + server_name = self.hs.hostname - db_conn = make_conn(database._database_config, database.engine, "test_unsafe") + 
db_conn = make_conn( + db_config=database._database_config, + engine=database.engine, + default_txn_name="test_unsafe", + server_name=server_name, + ) with db_conn.cursor() as txn: res = database.engine.get_db_locale(txn) self.assertEqual(res, ("C", "C")) diff --git a/tests/utils.py b/tests/utils.py index 0006bd7a8d..d1b66d4159 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -113,7 +113,12 @@ def setupdb() -> None: port=POSTGRES_PORT, password=POSTGRES_PASSWORD, ) - logging_conn = LoggingDatabaseConnection(db_conn, db_engine, "tests") + logging_conn = LoggingDatabaseConnection( + conn=db_conn, + engine=db_engine, + default_txn_name="tests", + server_name="test_server", + ) prepare_database(logging_conn, db_engine, None) logging_conn.close()
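
Reviewer note (not part of the patch): the diff above applies one pattern many times, so here is a minimal, hypothetical sketch of a homeserver-scoped `Histogram` for reference. The metric name, the `outcome` label, and `ExampleHandler` are invented for illustration; `SERVER_NAME_LABEL` and the `labels(**{...})` call shape mirror the real call sites changed above.

```python
# Hypothetical sketch of the homeserver-scoped Histogram pattern used in this diff.
from prometheus_client import Histogram

from synapse.metrics import SERVER_NAME_LABEL

example_processing_timer = Histogram(
    "synapse_example_processing_time_seconds",  # invented metric name
    "Time taken to process an example request",
    labelnames=["outcome", SERVER_NAME_LABEL],
)


class ExampleHandler:
    def __init__(self, server_name: str) -> None:
        # Call sites now keep the homeserver's name (usually `hs.hostname`)
        # so they can fill in the server_name label on every observation.
        self.server_name = server_name

    def record(self, outcome: str, duration_sec: float) -> None:
        # SERVER_NAME_LABEL is a constant, not a literal keyword, so it is
        # supplied via dict unpacking rather than as a named argument.
        example_processing_timer.labels(
            outcome=outcome, **{SERVER_NAME_LABEL: self.server_name}
        ).observe(duration_sec)
```

The mypy plugin change at the top of the diff is what enforces this: `prometheus_client.metrics.Histogram` joins `Counter` and `Gauge` in the list of constructors whose `labelnames` must include `SERVER_NAME_LABEL`, with process-level metrics opting out via `# type: ignore[missing-server-name-label]` as shown in `synapse/metrics/_gc.py` and `synapse/metrics/_reactor_metrics.py`.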
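
Relatedly, `make_conn`, `LoggingDatabaseConnection`, and `LoggingTransaction` become keyword-only and gain a required `server_name` argument so that `sql_query_timer` and `sql_scheduling_timer` observations carry the label. A small, hypothetical example of the new constructor shape, modeled on the `tests/server.py` change above (the in-memory SQLite database and `"example.com"` server name are arbitrary placeholders):

```python
# Hypothetical example of the keyword-only connection constructors after this change.
import sqlite3

from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.engines import create_engine

engine = create_engine({"name": "sqlite3", "args": {"database": ":memory:"}})

db_conn = LoggingDatabaseConnection(
    conn=sqlite3.connect(":memory:"),
    engine=engine,
    default_txn_name="example_txn",
    # Now required: queries issued through this connection are attributed to
    # this homeserver in the SQL timing histograms.
    server_name="example.com",
)

with db_conn.cursor() as txn:
    txn.execute("SELECT 1")

db_conn.close()
```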