Merge branch 'develop' into madlittlemods/hs-specific-metrics

Conflicts:
	synapse/metrics/__init__.py
This commit is contained in:
Eric Eastwood 2025-06-20 11:24:57 -05:00
commit 1c51b74e2d
102 changed files with 3077 additions and 527 deletions

View file

@ -14,7 +14,7 @@ jobs:
# There's a 'download artifact' action, but it hasn't been updated for the workflow_run action
# (https://github.com/actions/download-artifact/issues/60) so instead we get this mess:
- name: 📥 Download artifact
uses: dawidd6/action-download-artifact@07ab29fd4a977ae4d2b275087cf67563dfdf0295 # v9
uses: dawidd6/action-download-artifact@ac66b43f0e6a346234dd65d4d0c8fbb31cb316e5 # v11
with:
workflow: docs-pr.yaml
run_id: ${{ github.event.workflow_run.id }}

View file

@ -5,6 +5,9 @@ on:
paths:
- schema/**
- docs/usage/configuration/config_documentation.md
push:
branches: ["develop", "release-*"]
workflow_dispatch:
jobs:
validate-schema:
@ -12,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- name: Install check-jsonschema
@ -38,7 +41,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: "3.x"
- name: Install PyYAML

View file

@ -85,7 +85,7 @@ jobs:
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Install Rust
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
- uses: matrix-org/setup-python-poetry@5bbf6603c5c930615ec8a29f1b5d7d258d905aa4 # v2.0.0
with:
@ -149,7 +149,7 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Install Rust
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
- name: Setup Poetry
@ -210,7 +210,7 @@ jobs:
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Install Rust
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
- uses: matrix-org/setup-python-poetry@5bbf6603c5c930615ec8a29f1b5d7d258d905aa4 # v2.0.0
with:
@ -227,7 +227,7 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Install Rust
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
uses: dtolnay/rust-toolchain@0d72692bcfbf448b1e2afa01a67f71b455a9dcec # 1.86.0
with:
components: clippy
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
@ -247,7 +247,7 @@ jobs:
- name: Install Rust
uses: dtolnay/rust-toolchain@56f84321dbccf38fb67ce29ab63e4754056677e0 # master (rust 1.85.1)
with:
toolchain: nightly-2022-12-01
toolchain: nightly-2025-04-23
components: clippy
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
@ -265,7 +265,7 @@ jobs:
uses: dtolnay/rust-toolchain@56f84321dbccf38fb67ce29ab63e4754056677e0 # master (rust 1.85.1)
with:
# We use nightly so that it correctly groups together imports
toolchain: nightly-2022-12-01
toolchain: nightly-2025-04-23
components: rustfmt
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
@ -362,7 +362,7 @@ jobs:
postgres:${{ matrix.job.postgres-version }}
- name: Install Rust
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
- uses: matrix-org/setup-python-poetry@5bbf6603c5c930615ec8a29f1b5d7d258d905aa4 # v2.0.0
@ -404,7 +404,7 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Install Rust
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
# There aren't wheels for some of the older deps, so we need to install
@ -519,7 +519,7 @@ jobs:
run: cat sytest-blacklist .ci/worker-blacklist > synapse-blacklist-with-workers
- name: Install Rust
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
- name: Run SyTest
@ -663,7 +663,7 @@ jobs:
path: synapse
- name: Install Rust
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
- name: Prepare Complement's Prerequisites
@ -695,7 +695,7 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Install Rust
uses: dtolnay/rust-toolchain@e05ebb0e73db581a4877c6ce762e29fe1e0b5073 # 1.66.0
uses: dtolnay/rust-toolchain@c1678930c21fb233e4987c4ae12158f9125e5762 # 1.81.0
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6 # v2.7.8
- run: cargo test

View file

@ -1,3 +1,12 @@
# Synapse 1.132.0 (2025-06-17)
### Improved Documentation
- Improvements to generate config documentation from JSON Schema file. ([\#18522](https://github.com/element-hq/synapse/issues/18522))
# Synapse 1.132.0rc1 (2025-06-10)
### Features

1323
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1 @@
Add support for the [MSC4260 user report API](https://github.com/matrix-org/matrix-spec-proposals/pull/4260).

1
changelog.d/18357.misc Normal file
View file

@ -0,0 +1 @@
Increase performance of introspecting access tokens when using delegated auth.

View file

@ -1 +0,0 @@
Generate config documentation from JSON Schema file.

1
changelog.d/18535.bugfix Normal file
View file

@ -0,0 +1 @@
Fix long-standing bug where sliding sync did not honour the `room_id_to_include` config option.

1
changelog.d/18541.misc Normal file
View file

@ -0,0 +1 @@
Log user deactivations.

1
changelog.d/18542.misc Normal file
View file

@ -0,0 +1 @@
Enable [`flake8-logging`](https://docs.astral.sh/ruff/rules/#flake8-logging-log) and [`flake8-logging-format`](https://docs.astral.sh/ruff/rules/#flake8-logging-format-g) rules in Ruff and fix related issues throughout the codebase.

1
changelog.d/18543.bugfix Normal file
View file

@ -0,0 +1 @@
Fix an issue where "Lock timeout is getting excessive" warnings would be logged even when the lock timeout was <10 minutes.

1
changelog.d/18545.bugfix Normal file
View file

@ -0,0 +1 @@
Fix an issue where Synapse could calculate the wrong power level for the creator of the room if there was no power levels event.

1
changelog.d/18546.misc Normal file
View file

@ -0,0 +1 @@
Clean up old, unused rows from the `device_federation_inbox` table.

1
changelog.d/18547.bugfix Normal file
View file

@ -0,0 +1 @@
Fix an issue where during state resolution for v11 rooms Synapse would incorrectly calculate the power level of the creator when there was no power levels event in the room.

1
changelog.d/18551.misc Normal file
View file

@ -0,0 +1 @@
Run config schema CI on develop and release branches.

1
changelog.d/18561.misc Normal file
View file

@ -0,0 +1 @@
Increase performance of introspecting access tokens when using delegated auth.

1
changelog.d/18568.doc Normal file
View file

@ -0,0 +1 @@
Fix typo in user type documentation.

6
debian/changelog vendored
View file

@ -1,3 +1,9 @@
matrix-synapse-py3 (1.132.0) stable; urgency=medium
* New Synapse release 1.132.0.
-- Synapse Packaging team <packages@matrix.org> Tue, 17 Jun 2025 13:16:20 +0100
matrix-synapse-py3 (1.132.0~rc1) stable; urgency=medium
* New Synapse release 1.132.0rc1.

View file

@ -770,7 +770,7 @@ This setting has the following sub-options:
* `default_user_type` (string|null): The default user type to use for registering new users when no value has been specified. Defaults to none. Defaults to `null`.
* `extra_user_types` (list): Array of additional user types to allow. These are treated as real users. Defaults to `[]`.
* `extra_user_types` (array): Array of additional user types to allow. These are treated as real users. Defaults to `[]`.
Example configuration:
```yaml
@ -1937,6 +1937,33 @@ rc_delayed_event_mgmt:
burst_count: 20.0
```
---
### `rc_reports`
*(object)* Ratelimiting settings for reporting content.
This is a ratelimiting option that ratelimits reports made by users about content they see.
Setting this to a high value allows users to report content quickly, possibly in duplicate. This can result in higher database usage.
This setting has the following sub-options:
* `per_second` (number): Maximum number of requests a client can send per second.
* `burst_count` (number): Maximum number of requests a client can send before being throttled.
Default configuration:
```yaml
rc_reports:
per_user:
per_second: 1.0
burst_count: 5.0
```
Example configuration:
```yaml
rc_reports:
per_second: 2.0
burst_count: 20.0
```
---
### `federation_rr_transactions_per_room_per_second`
*(integer)* Sets outgoing federation transaction frequency for sending read-receipts, per-room.

14
poetry.lock generated
View file

@ -2256,19 +2256,19 @@ rpds-py = ">=0.7.0"
[[package]]
name = "requests"
version = "2.32.2"
version = "2.32.4"
description = "Python HTTP for Humans."
optional = false
python-versions = ">=3.8"
groups = ["main", "dev"]
files = [
{file = "requests-2.32.2-py3-none-any.whl", hash = "sha256:fc06670dd0ed212426dfeb94fc1b983d917c4f9847c863f313c9dfaaffb7c23c"},
{file = "requests-2.32.2.tar.gz", hash = "sha256:dd951ff5ecf3e3b3aa26b40703ba77495dab41da839ae72ef3c8e5d8e2433289"},
{file = "requests-2.32.4-py3-none-any.whl", hash = "sha256:27babd3cda2a6d50b30443204ee89830707d396671944c998b5975b031ac2b2c"},
{file = "requests-2.32.4.tar.gz", hash = "sha256:27d0316682c8a29834d3264820024b62a36942083d52caf2f14c0591336d3422"},
]
[package.dependencies]
certifi = ">=2017.4.17"
charset-normalizer = ">=2,<4"
charset_normalizer = ">=2,<4"
idna = ">=2.5,<4"
urllib3 = ">=1.21.1,<3"
@ -3058,14 +3058,14 @@ files = [
[[package]]
name = "types-requests"
version = "2.32.0.20250328"
version = "2.32.4.20250611"
description = "Typing stubs for requests"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
files = [
{file = "types_requests-2.32.0.20250328-py3-none-any.whl", hash = "sha256:72ff80f84b15eb3aa7a8e2625fffb6a93f2ad5a0c20215fc1dcfa61117bcb2a2"},
{file = "types_requests-2.32.0.20250328.tar.gz", hash = "sha256:c9e67228ea103bd811c96984fac36ed2ae8da87a36a633964a21f199d60baf32"},
{file = "types_requests-2.32.4.20250611-py3-none-any.whl", hash = "sha256:ad2fe5d3b0cb3c2c902c8815a70e7fb2302c4b8c1f77bdcd738192cdb3878072"},
{file = "types_requests-2.32.4.20250611.tar.gz", hash = "sha256:741c8777ed6425830bf51e54d6abe245f79b4dcb9019f1622b773463946bf826"},
]
[package.dependencies]

View file

@ -74,6 +74,10 @@ select = [
"PIE",
# flake8-executable
"EXE",
# flake8-logging
"LOG",
# flake8-logging-format
"G",
]
[tool.ruff.lint.isort]
@ -97,7 +101,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.132.0rc1"
version = "1.132.0"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"

View file

@ -7,7 +7,7 @@ name = "synapse"
version = "0.1.0"
edition = "2021"
rust-version = "1.66.0"
rust-version = "1.81.0"
[lib]
name = "synapse"
@ -36,13 +36,21 @@ pyo3 = { version = "0.24.2", features = [
"abi3",
"abi3-py39",
] }
pyo3-log = "0.12.0"
pyo3-log = "0.12.3"
pythonize = "0.24.0"
regex = "1.6.0"
sha2 = "0.10.8"
serde = { version = "1.0.144", features = ["derive"] }
serde_json = "1.0.85"
ulid = "1.1.2"
reqwest = { version = "0.12.15", default-features = false, features = [
"http2",
"stream",
"rustls-tls-native-roots",
] }
http-body-util = "0.1.3"
futures = "0.3.31"
tokio = { version = "1.44.2", features = ["rt", "rt-multi-thread"] }
[features]
extension-module = ["pyo3/extension-module"]

View file

@ -58,3 +58,15 @@ impl NotFoundError {
NotFoundError::new_err(())
}
}
import_exception!(synapse.api.errors, HttpResponseException);
impl HttpResponseException {
pub fn new(status: StatusCode, bytes: Vec<u8>) -> pyo3::PyErr {
HttpResponseException::new_err((
status.as_u16(),
status.canonical_reason().unwrap_or_default(),
bytes,
))
}
}

218
rust/src/http_client.rs Normal file
View file

@ -0,0 +1,218 @@
/*
* This file is licensed under the Affero General Public License (AGPL) version 3.
*
* Copyright (C) 2025 New Vector, Ltd
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* See the GNU Affero General Public License for more details:
* <https://www.gnu.org/licenses/agpl-3.0.html>.
*/
use std::{collections::HashMap, future::Future, panic::AssertUnwindSafe, sync::LazyLock};
use anyhow::Context;
use futures::{FutureExt, TryStreamExt};
use pyo3::{exceptions::PyException, prelude::*, types::PyString};
use reqwest::RequestBuilder;
use tokio::runtime::Runtime;
use crate::errors::HttpResponseException;
/// The tokio runtime that we're using to run async Rust libs.
static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap()
});
/// A reference to the `Deferred` python class.
static DEFERRED_CLASS: LazyLock<PyObject> = LazyLock::new(|| {
Python::with_gil(|py| {
py.import("twisted.internet.defer")
.expect("module 'twisted.internet.defer' should be importable")
.getattr("Deferred")
.expect("module 'twisted.internet.defer' should have a 'Deferred' class")
.unbind()
})
});
/// A reference to the twisted `reactor`.
static TWISTED_REACTOR: LazyLock<Py<PyModule>> = LazyLock::new(|| {
Python::with_gil(|py| {
py.import("twisted.internet.reactor")
.expect("module 'twisted.internet.reactor' should be importable")
.unbind()
})
});
/// Called when registering modules with python.
pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
let child_module: Bound<'_, PyModule> = PyModule::new(py, "http_client")?;
child_module.add_class::<HttpClient>()?;
// Make sure we fail early if we can't build the lazy statics.
LazyLock::force(&RUNTIME);
LazyLock::force(&DEFERRED_CLASS);
m.add_submodule(&child_module)?;
// We need to manually add the module to sys.modules to make `from
// synapse.synapse_rust import acl` work.
py.import("sys")?
.getattr("modules")?
.set_item("synapse.synapse_rust.http_client", child_module)?;
Ok(())
}
#[pyclass]
#[derive(Clone)]
struct HttpClient {
client: reqwest::Client,
}
#[pymethods]
impl HttpClient {
#[new]
pub fn py_new(user_agent: &str) -> PyResult<HttpClient> {
// The twisted reactor can only be imported after Synapse has been
// imported, to allow Synapse to change the twisted reactor. If we try
// and import the reactor too early twisted installs a default reactor,
// which can't be replaced.
LazyLock::force(&TWISTED_REACTOR);
Ok(HttpClient {
client: reqwest::Client::builder()
.user_agent(user_agent)
.build()
.context("building reqwest client")?,
})
}
pub fn get<'a>(
&self,
py: Python<'a>,
url: String,
response_limit: usize,
) -> PyResult<Bound<'a, PyAny>> {
self.send_request(py, self.client.get(url), response_limit)
}
pub fn post<'a>(
&self,
py: Python<'a>,
url: String,
response_limit: usize,
headers: HashMap<String, String>,
request_body: String,
) -> PyResult<Bound<'a, PyAny>> {
let mut builder = self.client.post(url);
for (name, value) in headers {
builder = builder.header(name, value);
}
builder = builder.body(request_body);
self.send_request(py, builder, response_limit)
}
}
impl HttpClient {
fn send_request<'a>(
&self,
py: Python<'a>,
builder: RequestBuilder,
response_limit: usize,
) -> PyResult<Bound<'a, PyAny>> {
create_deferred(py, async move {
let response = builder.send().await.context("sending request")?;
let status = response.status();
let mut stream = response.bytes_stream();
let mut buffer = Vec::new();
while let Some(chunk) = stream.try_next().await.context("reading body")? {
if buffer.len() + chunk.len() > response_limit {
Err(anyhow::anyhow!("Response size too large"))?;
}
buffer.extend_from_slice(&chunk);
}
if !status.is_success() {
return Err(HttpResponseException::new(status, buffer));
}
let r = Python::with_gil(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?;
Ok(r)
})
}
}
/// Creates a twisted deferred from the given future, spawning the task on the
/// tokio runtime.
///
/// Does not handle deferred cancellation or contextvars.
fn create_deferred<F, O>(py: Python, fut: F) -> PyResult<Bound<'_, PyAny>>
where
F: Future<Output = PyResult<O>> + Send + 'static,
for<'a> O: IntoPyObject<'a>,
{
let deferred = DEFERRED_CLASS.bind(py).call0()?;
let deferred_callback = deferred.getattr("callback")?.unbind();
let deferred_errback = deferred.getattr("errback")?.unbind();
RUNTIME.spawn(async move {
// TODO: Is it safe to assert unwind safety here? I think so, as we
// don't use anything that could be tainted by the panic afterwards.
// Note that `.spawn(..)` asserts unwind safety on the future too.
let res = AssertUnwindSafe(fut).catch_unwind().await;
Python::with_gil(move |py| {
// Flatten the panic into standard python error
let res = match res {
Ok(r) => r,
Err(panic_err) => {
let panic_message = get_panic_message(&panic_err);
Err(PyException::new_err(
PyString::new(py, panic_message).unbind(),
))
}
};
// Send the result to the deferred, via `.callback(..)` or `.errback(..)`
match res {
Ok(obj) => {
TWISTED_REACTOR
.call_method(py, "callFromThread", (deferred_callback, obj), None)
.expect("callFromThread should not fail"); // There's nothing we can really do with errors here
}
Err(err) => {
TWISTED_REACTOR
.call_method(py, "callFromThread", (deferred_errback, err), None)
.expect("callFromThread should not fail"); // There's nothing we can really do with errors here
}
}
});
});
Ok(deferred)
}
/// Try and get the panic message out of the panic
fn get_panic_message<'a>(panic_err: &'a (dyn std::any::Any + Send + 'static)) -> &'a str {
// Apparently this is how you extract the panic message from a panic
if let Some(str_slice) = panic_err.downcast_ref::<&str>() {
str_slice
} else if let Some(string) = panic_err.downcast_ref::<String>() {
string
} else {
"unknown error"
}
}

View file

@ -8,6 +8,7 @@ pub mod acl;
pub mod errors;
pub mod events;
pub mod http;
pub mod http_client;
pub mod identifier;
pub mod matrix_const;
pub mod push;
@ -50,6 +51,7 @@ fn synapse_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
acl::register_module(py, m)?;
push::register_module(py, m)?;
events::register_module(py, m)?;
http_client::register_module(py, m)?;
rendezvous::register_module(py, m)?;
Ok(())

View file

@ -2184,6 +2184,23 @@ properties:
examples:
- per_second: 2.0
burst_count: 20.0
rc_reports:
$ref: "#/$defs/rc"
description: >-
Ratelimiting settings for reporting content.
This is a ratelimiting option that ratelimits reports made by users
about content they see.
Setting this to a high value allows users to report content quickly, possibly in
duplicate. This can result in higher database usage.
default:
per_user:
per_second: 1.0
burst_count: 5.0
examples:
- per_second: 2.0
burst_count: 20.0
federation_rr_transactions_per_room_per_second:
type: integer
description: >-

View file

@ -243,7 +243,7 @@ def do_lint() -> Set[str]:
importlib.import_module(module_info.name)
except ModelCheckerException as e:
logger.warning(
f"Bad annotation found when importing {module_info.name}"
"Bad annotation found when importing %s", module_info.name
)
failures.add(format_model_checker_exception(e))

View file

@ -37,7 +37,9 @@ from synapse.appservice import ApplicationService
from synapse.http import get_request_user_agent
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace
from synapse.state import CREATE_KEY, POWER_KEY
from synapse.types import Requester, create_requester
from synapse.types.state import StateFilter
from synapse.util.cancellation import cancellable
if TYPE_CHECKING:
@ -216,18 +218,20 @@ class BaseAuth:
# by checking if they would (theoretically) be able to change the
# m.room.canonical_alias events
power_level_event = (
await self._storage_controllers.state.get_current_state_event(
room_id, EventTypes.PowerLevels, ""
)
auth_events = await self._storage_controllers.state.get_current_state(
room_id,
StateFilter.from_types(
[
POWER_KEY,
CREATE_KEY,
]
),
)
auth_events = {}
if power_level_event:
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
send_level = event_auth.get_send_level(
EventTypes.CanonicalAlias, "", power_level_event
EventTypes.CanonicalAlias,
"",
auth_events.get(POWER_KEY),
)
user_level = event_auth.get_user_power_level(
requester.user.to_string(), auth_events

View file

@ -30,9 +30,6 @@ from authlib.oauth2.rfc7662 import IntrospectionToken
from authlib.oidc.discovery import OpenIDProviderMetadata, get_well_known_url
from prometheus_client import Histogram
from twisted.web.client import readBody
from twisted.web.http_headers import Headers
from synapse.api.auth.base import BaseAuth
from synapse.api.errors import (
AuthError,
@ -43,8 +40,14 @@ from synapse.api.errors import (
UnrecognizedRequestError,
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import active_span, force_tracing, start_active_span
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import (
active_span,
force_tracing,
inject_request_headers,
start_active_span,
)
from synapse.synapse_rust.http_client import HttpClient
from synapse.types import Requester, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
@ -179,6 +182,10 @@ class MSC3861DelegatedAuth(BaseAuth):
self._admin_token: Callable[[], Optional[str]] = self._config.admin_token
self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
self._rust_http_client = HttpClient(
user_agent=self._http_client.user_agent.decode("utf8")
)
# # Token Introspection Cache
# This remembers what users/devices are represented by which access tokens,
# in order to reduce overall system load:
@ -302,7 +309,6 @@ class MSC3861DelegatedAuth(BaseAuth):
introspection_endpoint = await self._introspection_endpoint()
raw_headers: Dict[str, str] = {
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": str(self._http_client.user_agent, "utf-8"),
"Accept": "application/json",
# Tell MAS that we support reading the device ID as an explicit
# value, not encoded in the scope. This is supported by MAS 0.15+
@ -316,38 +322,34 @@ class MSC3861DelegatedAuth(BaseAuth):
uri, raw_headers, body = self._client_auth.prepare(
method="POST", uri=introspection_endpoint, headers=raw_headers, body=body
)
headers = Headers({k: [v] for (k, v) in raw_headers.items()})
# Do the actual request
# We're not using the SimpleHttpClient util methods as we don't want to
# check the HTTP status code, and we do the body encoding ourselves.
logger.debug("Fetching token from MAS")
start_time = self._clock.time()
try:
response = await self._http_client.request(
method="POST",
uri=uri,
data=body.encode("utf-8"),
headers=headers,
)
resp_body = await make_deferred_yieldable(readBody(response))
with start_active_span("mas-introspect-token"):
inject_request_headers(raw_headers)
with PreserveLoggingContext():
resp_body = await self._rust_http_client.post(
url=uri,
response_limit=1 * 1024 * 1024,
headers=raw_headers,
request_body=body,
)
except HttpResponseException as e:
end_time = self._clock.time()
introspection_response_timer.labels(e.code).observe(end_time - start_time)
raise
except Exception:
end_time = self._clock.time()
introspection_response_timer.labels("ERR").observe(end_time - start_time)
raise
end_time = self._clock.time()
introspection_response_timer.labels(response.code).observe(
end_time - start_time
)
logger.debug("Fetched token from MAS")
if response.code < 200 or response.code >= 300:
raise HttpResponseException(
response.code,
response.phrase.decode("ascii", errors="replace"),
resp_body,
)
end_time = self._clock.time()
introspection_response_timer.labels(200).observe(end_time - start_time)
resp = json_decoder.decode(resp_body.decode("utf-8"))
@ -476,7 +478,7 @@ class MSC3861DelegatedAuth(BaseAuth):
# XXX: This is a temporary solution so that the admin API can be called by
# the OIDC provider. This will be removed once we have OIDC client
# credentials grant support in matrix-authentication-service.
logging.info("Admin toked used")
logger.info("Admin toked used")
# XXX: that user doesn't exist and won't be provisioned.
# This is mostly fine for admin calls, but we should also think about doing
# requesters without a user_id.

View file

@ -434,8 +434,8 @@ def listen_http(
# getHost() returns a UNIXAddress which contains an instance variable of 'name'
# encoded as a byte string. Decode as utf-8 so pretty.
logger.info(
"Synapse now listening on Unix Socket at: "
f"{ports[0].getHost().name.decode('utf-8')}"
"Synapse now listening on Unix Socket at: %s",
ports[0].getHost().name.decode("utf-8"),
)
return ports

View file

@ -28,15 +28,13 @@ from prometheus_client import Gauge
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.types import JsonDict
from synapse.util.constants import ONE_HOUR_SECONDS, ONE_MINUTE_SECONDS
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger("synapse.app.homeserver")
ONE_MINUTE_SECONDS = 60
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
MILLISECONDS_PER_SECOND = 1000
INITIAL_DELAY_BEFORE_FIRST_PHONE_HOME_SECONDS = 5 * ONE_MINUTE_SECONDS
@ -173,7 +171,7 @@ async def phone_stats_home(
stats["log_level"] = logging.getLevelName(log_level)
logger.info(
"Reporting stats to %s: %s" % (hs.config.metrics.report_stats_endpoint, stats)
"Reporting stats to %s: %s", hs.config.metrics.report_stats_endpoint, stats
)
try:
await hs.get_proxied_http_client().put_json(

View file

@ -461,7 +461,7 @@ class _TransactionController:
recoverer = self.recoverers.get(service.id)
if not recoverer:
# No need to force a retry on a happy AS.
logger.info(f"{service.id} is not in recovery, not forcing retry")
logger.info("%s is not in recovery, not forcing retry", service.id)
return
recoverer.force_retry()

View file

@ -51,6 +51,8 @@ if TYPE_CHECKING:
from synapse.config.homeserver import HomeServerConfig
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
DEFAULT_LOG_CONFIG = Template(
"""\
# Log configuration for Synapse.
@ -291,7 +293,7 @@ def _load_logging_config(log_config_path: str) -> None:
log_config = yaml.safe_load(f.read())
if not log_config:
logging.warning("Loaded a blank logging config?")
logger.warning("Loaded a blank logging config?")
# If the old structured logging configuration is being used, raise an error.
if "structured" in log_config and log_config.get("structured"):
@ -312,7 +314,7 @@ def _reload_logging_config(log_config_path: Optional[str]) -> None:
return
_load_logging_config(log_config_path)
logging.info("Reloaded log config from %s due to SIGHUP", log_config_path)
logger.info("Reloaded log config from %s due to SIGHUP", log_config_path)
def setup_logging(
@ -349,17 +351,17 @@ def setup_logging(
appbase.register_sighup(_reload_logging_config, log_config_path)
# Log immediately so we can grep backwards.
logging.warning("***** STARTING SERVER *****")
logging.warning(
logger.warning("***** STARTING SERVER *****")
logger.warning(
"Server %s version %s",
sys.argv[0],
SYNAPSE_VERSION,
)
logging.warning("Copyright (c) 2023 New Vector, Inc")
logging.warning(
logger.warning("Copyright (c) 2023 New Vector, Inc")
logger.warning(
"Licensed under the AGPL 3.0 license. Website: https://github.com/element-hq/synapse"
)
logging.info("Server hostname: %s", config.server.server_name)
logging.info("Public Base URL: %s", config.server.public_baseurl)
logging.info("Instance name: %s", hs.get_instance_name())
logging.info("Twisted reactor: %s", type(reactor).__name__)
logger.info("Server hostname: %s", config.server.server_name)
logger.info("Public Base URL: %s", config.server.public_baseurl)
logger.info("Instance name: %s", hs.get_instance_name())
logger.info("Twisted reactor: %s", type(reactor).__name__)

View file

@ -240,3 +240,9 @@ class RatelimitConfig(Config):
"rc_delayed_event_mgmt",
defaults={"per_second": 1, "burst_count": 5},
)
self.rc_reports = RatelimitSettings.parse(
config,
"rc_reports",
defaults={"per_second": 1, "burst_count": 5},
)

View file

@ -27,7 +27,7 @@ from synapse.types import JsonDict
from ._base import Config, ConfigError
logger = logging.Logger(__name__)
logger = logging.getLogger(__name__)
class RoomDefaultEncryptionTypes:

View file

@ -41,7 +41,7 @@ from synapse.util.stringutils import parse_and_validate_server_name
from ._base import Config, ConfigError
from ._util import validate_config
logger = logging.Logger(__name__)
logger = logging.getLogger(__name__)
DIRECT_TCP_ERROR = """
Using direct TCP replication for workers is no longer supported.

View file

@ -64,6 +64,7 @@ from synapse.api.room_versions import (
RoomVersion,
RoomVersions,
)
from synapse.state import CREATE_KEY
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
MutableStateMap,
@ -308,6 +309,13 @@ def check_state_dependent_auth_rules(
auth_dict = {(e.type, e.state_key): e for e in auth_events}
# Later code relies on there being a create event e.g _can_federate, _is_membership_change_allowed
# so produce a more intelligible error if we don't have one.
if auth_dict.get(CREATE_KEY) is None:
raise AuthError(
403, f"Event {event.event_id} is missing a create event in auth_events."
)
# additional check for m.federate
creating_domain = get_domain_from_id(event.room_id)
originating_domain = get_domain_from_id(event.sender)
@ -1010,11 +1018,16 @@ def get_user_power_level(user_id: str, auth_events: StateMap["EventBase"]) -> in
user_id: user's id to look up in power_levels
auth_events:
state in force at this point in the room (or rather, a subset of
it including at least the create event and power levels event.
it including at least the create event, and possibly a power levels event).
Returns:
the user's power level in this room.
"""
create_event = auth_events.get(CREATE_KEY)
assert create_event is not None, (
"A create event in the auth events chain is required to calculate user power level correctly,"
" but was not found. This indicates a bug"
)
power_level_event = get_power_level_event(auth_events)
if power_level_event:
level = power_level_event.content.get("users", {}).get(user_id)
@ -1028,18 +1041,12 @@ def get_user_power_level(user_id: str, auth_events: StateMap["EventBase"]) -> in
else:
# if there is no power levels event, the creator gets 100 and everyone
# else gets 0.
# some things which call this don't pass the create event: hack around
# that.
key = (EventTypes.Create, "")
create_event = auth_events.get(key)
if create_event is not None:
if create_event.room_version.implicit_room_creator:
creator = create_event.sender
else:
creator = create_event.content[EventContentFields.ROOM_CREATOR]
if creator == user_id:
return 100
if create_event.room_version.implicit_room_creator:
creator = create_event.sender
else:
creator = create_event.content[EventContentFields.ROOM_CREATOR]
if creator == user_id:
return 100
return 0

View file

@ -195,15 +195,18 @@ class InviteAutoAccepter:
except SynapseError as e:
if e.code == HTTPStatus.FORBIDDEN:
logger.debug(
f"Update_room_membership was forbidden. This can sometimes be expected for remote invites. Exception: {e}"
"Update_room_membership was forbidden. This can sometimes be expected for remote invites. Exception: %s",
e,
)
else:
logger.warn(
f"Update_room_membership raised the following unexpected (SynapseError) exception: {e}"
logger.warning(
"Update_room_membership raised the following unexpected (SynapseError) exception: %s",
e,
)
except Exception as e:
logger.warn(
f"Update_room_membership raised the following unexpected exception: {e}"
logger.warning(
"Update_room_membership raised the following unexpected exception: %s",
e,
)
sleep = 2**retries

View file

@ -1825,7 +1825,7 @@ class FederationClient(FederationBase):
)
return timestamp_to_event_response
except SynapseError as e:
logger.warn(
logger.warning(
"timestamp_to_event(room_id=%s, timestamp=%s, direction=%s): encountered error when trying to fetch from destinations: %s",
room_id,
timestamp,

View file

@ -952,7 +952,8 @@ class FederationServer(FederationBase):
# joins) or the full state (for full joins).
# Return a 404 as we would if we weren't in the room at all.
logger.info(
f"Rejecting /send_{membership_type} to %s because it's a partial state room",
"Rejecting /send_%s to %s because it's a partial state room",
membership_type,
room_id,
)
raise SynapseError(

View file

@ -495,7 +495,7 @@ class AdminHandler:
)
except Exception as ex:
logger.info(
f"Redaction of event {event.event_id} failed due to: {ex}"
"Redaction of event %s failed due to: %s", event.event_id, ex
)
result["failed_redactions"][event.event_id] = str(ex)
await self._task_scheduler.update_task(task.id, result=result)

View file

@ -476,9 +476,7 @@ class ApplicationServicesHandler:
service, "read_receipt"
)
if new_token is not None and new_token.stream <= from_key:
logger.debug(
"Rejecting token lower than or equal to stored: %s" % (new_token,)
)
logger.debug("Rejecting token lower than or equal to stored: %s", new_token)
return []
from_token = MultiWriterStreamToken(stream=from_key)
@ -520,9 +518,7 @@ class ApplicationServicesHandler:
service, "presence"
)
if new_token is not None and new_token <= from_key:
logger.debug(
"Rejecting token lower than or equal to stored: %s" % (new_token,)
)
logger.debug("Rejecting token lower than or equal to stored: %s", new_token)
return []
for user in users:

View file

@ -1896,7 +1896,7 @@ def load_single_legacy_password_auth_provider(
try:
provider = module(config=config, account_handler=api)
except Exception as e:
logger.error("Error while initializing %r: %s", module, e)
logger.exception("Error while initializing %r: %s", module, e)
raise
# All methods that the module provides should be async, but this wasn't enforced
@ -2429,7 +2429,7 @@ class PasswordAuthProvider:
except CancelledError:
raise
except Exception as e:
logger.error("Module raised an exception in is_3pid_allowed: %s", e)
logger.exception("Module raised an exception in is_3pid_allowed: %s", e)
raise SynapseError(code=500, msg="Internal Server Error")
return True

View file

@ -96,6 +96,14 @@ class DeactivateAccountHandler:
403, "Deactivation of this user is forbidden", Codes.FORBIDDEN
)
logger.info(
"%s requested deactivation of %s erase_data=%s id_server=%s",
requester.user,
user_id,
erase_data,
id_server,
)
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.

View file

@ -1604,7 +1604,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
if prev_stream_id is not None and cached_devices == {
d["device_id"]: d for d in devices
}:
logging.info(
logger.info(
"Skipping device list resync for %s, as our cache matches already",
user_id,
)

View file

@ -282,7 +282,7 @@ class DirectoryHandler:
except RequestSendFailed:
raise SynapseError(502, "Failed to fetch alias")
except CodeMessageException as e:
logging.warning(
logger.warning(
"Error retrieving alias %s -> %s %s", room_alias, e.code, e.msg
)
if e.code == 404:

View file

@ -1095,7 +1095,9 @@ class FederationHandler:
rule = invite_config.get_invite_rule(event.sender)
if rule == InviteRule.BLOCK:
logger.info(
f"Automatically rejecting invite from {event.sender} due to the invite filtering rules of {event.state_key}"
"Automatically rejecting invite from %s due to the invite filtering rules of %s",
event.sender,
event.state_key,
)
raise SynapseError(
403,

View file

@ -218,7 +218,7 @@ class IdentityHandler:
return data
except HttpResponseException as e:
logger.error("3PID bind failed with Matrix error: %r", e)
logger.exception("3PID bind failed with Matrix error: %r", e)
raise e.to_synapse_error()
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
@ -323,7 +323,7 @@ class IdentityHandler:
# The remote server probably doesn't support unbinding (yet)
logger.warning("Received %d response while unbinding threepid", e.code)
else:
logger.error("Failed to unbind threepid on identity server: %s", e)
logger.exception("Failed to unbind threepid on identity server: %s", e)
raise SynapseError(500, "Failed to contact identity server")
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")

View file

@ -460,7 +460,7 @@ class MessageHandler:
# date from the database in the same database transaction.
await self.store.expire_event(event_id)
except Exception as e:
logger.error("Could not expire event %s: %r", event_id, e)
logger.exception("Could not expire event %s: %r", event_id, e)
# Schedule the expiry of the next event to expire.
await self._schedule_next_expiry()
@ -2065,7 +2065,8 @@ class EventCreationHandler:
# dependent on _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
logger.info(
"Failed to send dummy event into room %s. Will exclude it from "
"future attempts until cache expires" % (room_id,)
"future attempts until cache expires",
room_id,
)
now = self.clock.time_msec()
self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
@ -2124,7 +2125,9 @@ class EventCreationHandler:
except AuthError:
logger.info(
"Failed to send dummy event into room %s for user %s due to "
"lack of power. Will try another user" % (room_id, user_id)
"lack of power. Will try another user",
room_id,
user_id,
)
return False

View file

@ -563,12 +563,13 @@ class OidcProvider:
raise ValueError("Unexpected subject")
except Exception:
logger.warning(
f"OIDC Back-Channel Logout is enabled for issuer {self.issuer!r} "
"OIDC Back-Channel Logout is enabled for issuer %r "
"but it looks like the configured `user_mapping_provider` "
"does not use the `sub` claim as subject. If it is the case, "
"and you want Synapse to ignore the `sub` claim in OIDC "
"Back-Channel Logouts, set `backchannel_logout_ignore_sub` "
"to `true` in the issuer config."
"to `true` in the issuer config.",
self.issuer,
)
@property
@ -826,10 +827,10 @@ class OidcProvider:
if response.code < 400:
logger.debug(
"Invalid response from the authorization server: "
'responded with a "{status}" '
"but body has an error field: {error!r}".format(
status=status, error=resp["error"]
)
'responded with a "%s" '
"but body has an error field: %r",
status,
resp["error"],
)
description = resp.get("error_description", error)
@ -1385,7 +1386,8 @@ class OidcProvider:
# support dynamic registration in Synapse at some point.
if not self._config.backchannel_logout_enabled:
logger.warning(
f"Received an OIDC Back-Channel Logout request from issuer {self.issuer!r} but it is disabled in config"
"Received an OIDC Back-Channel Logout request from issuer %r but it is disabled in config",
self.issuer,
)
# TODO: this responds with a 400 status code, which is what the OIDC
@ -1797,5 +1799,5 @@ class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
extras[key] = template.render(user=userinfo).strip()
except Exception as e:
# Log an error and skip this value (don't break login for this).
logger.error("Failed to render OIDC extra attribute %s: %s" % (key, e))
logger.exception("Failed to render OIDC extra attribute %s: %s", key, e)
return extras

View file

@ -506,7 +506,7 @@ class RegistrationHandler:
ratelimit=False,
)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
logger.exception("Failed to join new user to %r: %r", r, e)
async def _join_rooms(self, user_id: str) -> None:
"""
@ -596,7 +596,7 @@ class RegistrationHandler:
# moving away from bare excepts is a good thing to do.
logger.error("Failed to join new user to %r: %r", r, e)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e, exc_info=True)
logger.exception("Failed to join new user to %r: %r", r, e)
async def _auto_join_rooms(self, user_id: str) -> None:
"""Automatically joins users to auto join rooms - creating the room in the first place

View file

@ -0,0 +1,98 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
#
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING
from synapse.api.errors import Codes, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.types import (
Requester,
)
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ReportsHandler:
def __init__(self, hs: "HomeServer"):
self._hs = hs
self._store = hs.get_datastores().main
self._clock = hs.get_clock()
# Ratelimiter for management of existing delayed events,
# keyed by the requesting user ID.
self._reports_ratelimiter = Ratelimiter(
store=self._store,
clock=self._clock,
cfg=hs.config.ratelimiting.rc_reports,
)
async def report_user(
self, requester: Requester, target_user_id: str, reason: str
) -> None:
"""Files a report against a user from a user.
Rate and size limits are applied to the report. If the user being reported
does not belong to this server, the report is ignored. This check is done
after the limits to reduce DoS potential.
If the user being reported belongs to this server, but doesn't exist, we
similarly ignore the report. The spec allows us to return an error if we
want to, but we choose to hide that user's existence instead.
If the report is otherwise valid (for a user which exists on our server),
we append it to the database for later processing.
Args:
requester - The user filing the report.
target_user_id - The user being reported.
reason - The user-supplied reason the user is being reported.
Raises:
SynapseError for BAD_REQUEST/BAD_JSON if the reason is too long.
"""
await self._check_limits(requester)
if len(reason) > 1000:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Reason must be less than 1000 characters",
Codes.BAD_JSON,
)
if not self._hs.is_mine_id(target_user_id):
return # hide that they're not ours/that we can't do anything about them
user = await self._store.get_user_by_id(target_user_id)
if user is None:
return # hide that they don't exist
await self._store.add_user_report(
target_user_id=target_user_id,
user_id=requester.user.to_string(),
reason=reason,
received_ts=self._clock.time_msec(),
)
async def _check_limits(self, requester: Requester) -> None:
await self._reports_ratelimiter.ratelimit(
requester,
requester.user.to_string(),
)

View file

@ -701,7 +701,7 @@ class RoomCreationHandler:
except SynapseError as e:
# again I'm not really expecting this to fail, but if it does, I'd rather
# we returned the new room to the client at this point.
logger.error("Unable to send updated alias events in old room: %s", e)
logger.exception("Unable to send updated alias events in old room: %s", e)
try:
await self.event_creation_handler.create_and_send_nonmember_event(
@ -718,7 +718,7 @@ class RoomCreationHandler:
except SynapseError as e:
# again I'm not really expecting this to fail, but if it does, I'd rather
# we returned the new room to the client at this point.
logger.error("Unable to send updated alias events in new room: %s", e)
logger.exception("Unable to send updated alias events in new room: %s", e)
async def create_room(
self,

View file

@ -922,7 +922,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
rule = invite_config.get_invite_rule(requester.user.to_string())
if rule == InviteRule.BLOCK:
logger.info(
f"Automatically rejecting invite from {target_id} due to the the invite filtering rules of {requester.user}"
"Automatically rejecting invite from %s due to the the invite filtering rules of %s",
target_id,
requester.user,
)
raise SynapseError(
403,
@ -1570,7 +1572,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
require_consent=False,
)
except Exception as e:
logger.exception("Error kicking guest user: %s" % (e,))
logger.exception("Error kicking guest user: %s", e)
async def lookup_room_alias(
self, room_alias: RoomAlias

View file

@ -124,7 +124,7 @@ class SamlHandler:
)
# Since SAML sessions timeout it is useful to log when they were created.
logger.info("Initiating a new SAML session: %s" % (reqid,))
logger.info("Initiating a new SAML session: %s", reqid)
now = self.clock.time_msec()
self._outstanding_requests_dict[reqid] = Saml2SessionData(

View file

@ -238,7 +238,7 @@ class SendEmailHandler:
multipart_msg.attach(text_part)
multipart_msg.attach(html_part)
logger.info("Sending email to %s" % email_address)
logger.info("Sending email to %s", email_address)
await self._sendmail(
self._reactor,

View file

@ -23,6 +23,7 @@ from typing import (
List,
Literal,
Mapping,
MutableMapping,
Optional,
Set,
Tuple,
@ -73,6 +74,7 @@ from synapse.types.handlers.sliding_sync import (
SlidingSyncResult,
)
from synapse.types.state import StateFilter
from synapse.util import MutableOverlayMapping
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -245,9 +247,11 @@ class SlidingSyncRoomLists:
# Note: this won't include rooms the user has left themselves. We add back
# `newly_left` rooms below. This is more efficient than fetching all rooms and
# then filtering out the old left rooms.
room_membership_for_user_map = (
await self.store.get_sliding_sync_rooms_for_user_from_membership_snapshots(
user_id
room_membership_for_user_map: MutableMapping[str, RoomsForUserSlidingSync] = (
MutableOverlayMapping(
await self.store.get_sliding_sync_rooms_for_user_from_membership_snapshots(
user_id
)
)
)
# To play nice with the rewind logic below, we need to go fetch the rooms the
@ -268,26 +272,12 @@ class SlidingSyncRoomLists:
)
)
if self_leave_room_membership_for_user_map:
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
room_membership_for_user_map.update(self_leave_room_membership_for_user_map)
# Remove invites from ignored users
ignored_users = await self.store.ignored_users(user_id)
invite_config = await self.store.get_invite_config_for_user(user_id)
if ignored_users:
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
# Make a copy so we don't run into an error: `dictionary changed size during
# iteration`, when we remove items
for room_id in list(room_membership_for_user_map.keys()):
@ -316,13 +306,6 @@ class SlidingSyncRoomLists:
sync_config.user, room_membership_for_user_map, to_token=to_token
)
if changes:
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
for room_id, change in changes.items():
if change is None:
# Remove rooms that the user joined after the `to_token`
@ -364,13 +347,6 @@ class SlidingSyncRoomLists:
newly_left_room_map.keys() - room_membership_for_user_map.keys()
)
if missing_newly_left_rooms:
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
for room_id in missing_newly_left_rooms:
newly_left_room_for_user = newly_left_room_map[room_id]
# This should be a given
@ -461,6 +437,10 @@ class SlidingSyncRoomLists:
else:
room_membership_for_user_map.pop(room_id, None)
# Remove any rooms that we globally exclude from sync.
for room_id in self.rooms_to_exclude_globally:
room_membership_for_user_map.pop(room_id, None)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
if sync_config.lists:
@ -577,14 +557,6 @@ class SlidingSyncRoomLists:
if sync_config.room_subscriptions:
with start_active_span("assemble_room_subscriptions"):
# FIXME: It would be nice to avoid this copy but since
# `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it
# can't return a mutable value like a `dict`. We make the copy to get a
# mutable dict that we can change. We try to only make a copy when necessary
# (if we actually need to change something) as in most cases, the logic
# doesn't need to run.
room_membership_for_user_map = dict(room_membership_for_user_map)
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_rooms = await self.store.get_partial_rooms()

View file

@ -1230,12 +1230,16 @@ class SsoHandler:
if expected_user_id is not None and user_id != expected_user_id:
logger.error(
"Received a logout notification from SSO provider "
f"{auth_provider_id!r} for the user {expected_user_id!r}, but with "
f"a session ID ({auth_provider_session_id!r}) which belongs to "
f"{user_id!r}. This may happen when the SSO provider user mapper "
"%r for the user %r, but with "
"a session ID (%r) which belongs to "
"%r. This may happen when the SSO provider user mapper "
"uses something else than the standard attribute as mapping ID. "
"For OIDC providers, set `backchannel_logout_ignore_sub` to `true` "
"in the provider config if that is the case."
"in the provider config if that is the case.",
auth_provider_id,
expected_user_id,
auth_provider_session_id,
user_id,
)
continue

View file

@ -3083,8 +3083,10 @@ class SyncHandler:
if batch.limited and since_token:
user_id = sync_result_builder.sync_config.user.to_string()
logger.debug(
"Incremental gappy sync of %s for user %s with %d state events"
% (room_id, user_id, len(state))
"Incremental gappy sync of %s for user %s with %d state events",
room_id,
user_id,
len(state),
)
elif room_builder.rtype == "archived":
archived_room_sync = ArchivedSyncResult(

View file

@ -750,10 +750,9 @@ class UserDirectoryHandler(StateDeltasHandler):
)
continue
except Exception:
logger.error(
logger.exception(
"Failed to refresh profile for %r due to unhandled exception",
user_id,
exc_info=True,
)
await self.store.set_remote_user_profile_in_user_dir_stale(
user_id,

View file

@ -44,12 +44,15 @@ from synapse.logging.opentracing import start_active_span
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage.databases.main.lock import Lock, LockStore
from synapse.util.async_helpers import timeout_deferred
from synapse.util.constants import ONE_MINUTE_SECONDS
if TYPE_CHECKING:
from synapse.logging.opentracing import opentracing
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
# This lock is used to avoid creating an event while we are purging the room.
# We take a read lock when creating an event, and a write one when purging a room.
# This is because it is fine to create several events concurrently, since referenced events
@ -270,9 +273,10 @@ class WaitingLock:
def _get_next_retry_interval(self) -> float:
next = self._retry_interval
self._retry_interval = max(5, next * 2)
if self._retry_interval > 5 * 2 ^ 7: # ~10 minutes
logging.warning(
f"Lock timeout is getting excessive: {self._retry_interval}s. There may be a deadlock."
if self._retry_interval > 10 * ONE_MINUTE_SECONDS: # >7 iterations
logger.warning(
"Lock timeout is getting excessive: %ss. There may be a deadlock.",
self._retry_interval,
)
return next * random.uniform(0.9, 1.1)
@ -349,8 +353,9 @@ class WaitingMultiLock:
def _get_next_retry_interval(self) -> float:
next = self._retry_interval
self._retry_interval = max(5, next * 2)
if self._retry_interval > 5 * 2 ^ 7: # ~10 minutes
logging.warning(
f"Lock timeout is getting excessive: {self._retry_interval}s. There may be a deadlock."
if self._retry_interval > 10 * ONE_MINUTE_SECONDS: # >7 iterations
logger.warning(
"Lock timeout is getting excessive: %ss. There may be a deadlock.",
self._retry_interval,
)
return next * random.uniform(0.9, 1.1)

View file

@ -213,7 +213,7 @@ class _IPBlockingResolver:
if _is_ip_blocked(ip_address, self._ip_allowlist, self._ip_blocklist):
logger.info(
"Blocked %s from DNS resolution to %s" % (ip_address, hostname)
"Blocked %s from DNS resolution to %s", ip_address, hostname
)
has_bad_ip = True
@ -318,7 +318,7 @@ class BlocklistingAgentWrapper(Agent):
pass
else:
if _is_ip_blocked(ip_address, self._ip_allowlist, self._ip_blocklist):
logger.info("Blocking access to %s" % (ip_address,))
logger.info("Blocking access to %s", ip_address)
e = SynapseError(HTTPStatus.FORBIDDEN, "IP address blocked")
return defer.fail(Failure(e))
@ -723,7 +723,7 @@ class BaseHttpClient:
resp_headers = dict(response.headers.getAllRawHeaders())
if response.code > 299:
logger.warning("Got %d when downloading %s" % (response.code, url))
logger.warning("Got %d when downloading %s", response.code, url)
raise SynapseError(
HTTPStatus.BAD_GATEWAY, "Got error %d" % (response.code,), Codes.UNKNOWN
)
@ -1106,7 +1106,7 @@ class _MultipartParserProtocol(protocol.Protocol):
self.stream.write(data[start:end])
except Exception as e:
logger.warning(
f"Exception encountered writing file data to stream: {e}"
"Exception encountered writing file data to stream: %s", e
)
self.deferred.errback()
self.file_length += end - start
@ -1129,7 +1129,7 @@ class _MultipartParserProtocol(protocol.Protocol):
try:
self.parser.write(incoming_data)
except Exception as e:
logger.warning(f"Exception writing to multipart parser: {e}")
logger.warning("Exception writing to multipart parser: %s", e)
self.deferred.errback()
return

View file

@ -605,7 +605,7 @@ class MatrixFederationHttpClient:
try:
parse_and_validate_server_name(request.destination)
except ValueError:
logger.exception(f"Invalid destination: {request.destination}.")
logger.exception("Invalid destination: %s.", request.destination)
raise FederationDeniedError(request.destination)
if timeout is not None:

View file

@ -796,6 +796,13 @@ def inject_response_headers(response_headers: Headers) -> None:
response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}")
@ensure_active_span("inject the span into a header dict")
def inject_request_headers(headers: Dict[str, str]) -> None:
span = opentracing.tracer.active_span
assert span is not None
opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, headers)
@ensure_active_span(
"get the active span context as a dict", ret=cast(Dict[str, str], {})
)

View file

@ -313,7 +313,7 @@ class MediaRepository:
logger.info("Stored local media in file %r", fname)
if should_quarantine:
logger.warn(
logger.warning(
"Media has been automatically quarantined as it matched existing quarantined media"
)
@ -366,7 +366,7 @@ class MediaRepository:
logger.info("Stored local media in file %r", fname)
if should_quarantine:
logger.warn(
logger.warning(
"Media has been automatically quarantined as it matched existing quarantined media"
)
@ -1393,8 +1393,8 @@ class MediaRepository:
)
logger.info(
"Purging remote media last accessed before"
f" {remote_media_threshold_timestamp_ms}"
"Purging remote media last accessed before %s",
remote_media_threshold_timestamp_ms,
)
await self.delete_old_remote_media(
@ -1409,8 +1409,8 @@ class MediaRepository:
)
logger.info(
"Purging local media last accessed before"
f" {local_media_threshold_timestamp_ms}"
"Purging local media last accessed before %s",
local_media_threshold_timestamp_ms,
)
await self.delete_old_local_media(

View file

@ -288,7 +288,7 @@ class UrlPreviewer:
og["og:image:width"] = dims["width"]
og["og:image:height"] = dims["height"]
else:
logger.warning("Couldn't get dims for %s" % url)
logger.warning("Couldn't get dims for %s", url)
# define our OG response for this media
elif _is_html(media_info.media_type):
@ -610,7 +610,7 @@ class UrlPreviewer:
should_quarantine = await self.store.get_is_hash_quarantined(sha256)
if should_quarantine:
logger.warn(
logger.warning(
"Media has been automatically quarantined as it matched existing quarantined media"
)

View file

@ -50,7 +50,7 @@ from synapse.event_auth import auth_types_for_event, get_user_power_level
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.state import POWER_KEY
from synapse.state import CREATE_KEY, POWER_KEY
from synapse.storage.databases.main.roommember import EventIdMembership
from synapse.storage.invite_rule import InviteRule
from synapse.storage.roommember import ProfileInfo
@ -248,6 +248,7 @@ class BulkPushRuleEvaluator:
StateFilter.from_types(event_types)
)
pl_event_id = prev_state_ids.get(POWER_KEY)
create_event_id = prev_state_ids.get(CREATE_KEY)
# fastpath: if there's a power level event, that's all we need, and
# not having a power level event is an extreme edge case
@ -270,6 +271,26 @@ class BulkPushRuleEvaluator:
if auth_event:
auth_events_dict[auth_event_id] = auth_event
auth_events = {(e.type, e.state_key): e for e in auth_events_dict.values()}
if auth_events.get(CREATE_KEY) is None:
# if the event being checked is the create event, use its own permissions
if event.type == EventTypes.Create and event.get_state_key() == "":
auth_events[CREATE_KEY] = event
else:
auth_events[
CREATE_KEY
] = await self.store.get_create_event_for_room(event.room_id)
# if we are evaluating the create event, then use itself to determine power levels.
if event.type == EventTypes.Create and event.get_state_key() == "":
auth_events[CREATE_KEY] = event
else:
# if we aren't processing the create event, create_event_id should always be set
assert create_event_id is not None
create_event = event_id_to_event.get(create_event_id)
if create_event:
auth_events[CREATE_KEY] = create_event
else:
auth_events[CREATE_KEY] = await self.store.get_event(create_event_id)
sender_level = get_user_power_level(event.sender, auth_events)

View file

@ -135,7 +135,7 @@ class Mailer:
self.app_name = app_name
self.email_subjects: EmailSubjectConfig = hs.config.email.email_subjects
logger.info("Created Mailer for app_name %s" % app_name)
logger.info("Created Mailer for app_name %s", app_name)
emails_sent_counter.labels("password_reset")

View file

@ -165,7 +165,7 @@ class ClientRestResource(JsonResource):
# Fail on unknown servlet groups.
if servlet_group not in SERVLET_GROUPS:
if servlet_group == "media":
logger.warn(
logger.warning(
"media.can_load_media_repo needs to be configured for the media servlet to be available"
)
raise RuntimeError(

View file

@ -71,7 +71,7 @@ class QuarantineMediaInRoom(RestServlet):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester)
logging.info("Quarantining room: %s", room_id)
logger.info("Quarantining room: %s", room_id)
# Quarantine all media in this room
num_quarantined = await self.store.quarantine_media_ids_in_room(
@ -98,7 +98,7 @@ class QuarantineMediaByUser(RestServlet):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester)
logging.info("Quarantining media by user: %s", user_id)
logger.info("Quarantining media by user: %s", user_id)
# Quarantine all media this user has uploaded
num_quarantined = await self.store.quarantine_media_ids_by_user(
@ -127,7 +127,7 @@ class QuarantineMediaByID(RestServlet):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester)
logging.info("Quarantining media by ID: %s/%s", server_name, media_id)
logger.info("Quarantining media by ID: %s/%s", server_name, media_id)
# Quarantine this media id
await self.store.quarantine_media_by_id(
@ -155,7 +155,7 @@ class UnquarantineMediaByID(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
logging.info("Remove from quarantine media by ID: %s/%s", server_name, media_id)
logger.info("Remove from quarantine media by ID: %s/%s", server_name, media_id)
# Remove from quarantine this media id
await self.store.quarantine_media_by_id(server_name, media_id, None)
@ -177,7 +177,7 @@ class ProtectMediaByID(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
logging.info("Protecting local media by ID: %s", media_id)
logger.info("Protecting local media by ID: %s", media_id)
# Protect this media id
await self.store.mark_local_media_as_safe(media_id, safe=True)
@ -199,7 +199,7 @@ class UnprotectMediaByID(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
logging.info("Unprotecting local media by ID: %s", media_id)
logger.info("Unprotecting local media by ID: %s", media_id)
# Unprotect this media id
await self.store.mark_local_media_as_safe(media_id, safe=False)
@ -280,7 +280,7 @@ class DeleteMediaByID(RestServlet):
if await self.store.get_local_media(media_id) is None:
raise NotFoundError("Unknown media")
logging.info("Deleting local media by ID: %s", media_id)
logger.info("Deleting local media by ID: %s", media_id)
deleted_media, total = await self.media_repository.delete_local_media_ids(
[media_id]
@ -327,9 +327,11 @@ class DeleteMediaByDateSize(RestServlet):
if server_name is not None and self.server_name != server_name:
raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only delete local media")
logging.info(
"Deleting local media by timestamp: %s, size larger than: %s, keep profile media: %s"
% (before_ts, size_gt, keep_profiles)
logger.info(
"Deleting local media by timestamp: %s, size larger than: %s, keep profile media: %s",
before_ts,
size_gt,
keep_profiles,
)
deleted_media, total = await self.media_repository.delete_old_local_media(

View file

@ -150,6 +150,44 @@ class ReportRoomRestServlet(RestServlet):
return 200, {}
class ReportUserRestServlet(RestServlet):
"""This endpoint lets clients report a user for abuse.
Introduced by MSC4260: https://github.com/matrix-org/matrix-spec-proposals/pull/4260
"""
PATTERNS = list(
client_patterns(
"/users/(?P<target_user_id>[^/]*)/report$",
releases=("v3",),
unstable=False,
v1=False,
)
)
def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self.handler = hs.get_reports_handler()
class PostBody(RequestBodyModel):
reason: StrictStr
async def on_POST(
self, request: SynapseRequest, target_user_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
body = parse_and_validate_json_object_from_request(request, self.PostBody)
await self.handler.report_user(requester, target_user_id, body.reason)
return 200, {}
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReportEventRestServlet(hs).register(http_server)
ReportRoomRestServlet(hs).register(http_server)
ReportUserRestServlet(hs).register(http_server)

View file

@ -64,6 +64,7 @@ from synapse.logging.opentracing import set_tag
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.state import CREATE_KEY, POWER_KEY
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamToken, ThirdPartyInstanceID, UserID
from synapse.types.state import StateFilter
@ -924,16 +925,16 @@ class RoomEventServlet(RestServlet):
if include_unredacted_content and not await self.auth.is_server_admin(
requester
):
power_level_event = (
await self._storage_controllers.state.get_current_state_event(
room_id, EventTypes.PowerLevels, ""
)
auth_events = await self._storage_controllers.state.get_current_state(
room_id,
StateFilter.from_types(
[
POWER_KEY,
CREATE_KEY,
]
),
)
auth_events = {}
if power_level_event:
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
redact_level = event_auth.get_named_level(auth_events, "redact", 50)
user_level = event_auth.get_user_power_level(
requester.user.to_string(), auth_events

View file

@ -95,6 +95,7 @@ from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.register import RegistrationHandler
from synapse.handlers.relations import RelationsHandler
from synapse.handlers.reports import ReportsHandler
from synapse.handlers.room import (
RoomContextHandler,
RoomCreationHandler,
@ -729,6 +730,10 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_receipts_handler(self) -> ReceiptsHandler:
return ReceiptsHandler(self)
@cache_in_self
def get_reports_handler(self) -> ReportsHandler:
return ReportsHandler(self)
@cache_in_self
def get_read_marker_handler(self) -> ReadMarkerHandler:
return ReadMarkerHandler(self)

View file

@ -83,6 +83,7 @@ EVICTION_TIMEOUT_SECONDS = 60 * 60
_NEXT_STATE_ID = 1
CREATE_KEY = (EventTypes.Create, "")
POWER_KEY = (EventTypes.PowerLevels, "")

View file

@ -255,7 +255,7 @@ async def _get_power_level_for_sender(
)
if aev and (aev.type, aev.state_key) == (EventTypes.Create, ""):
creator = (
event.sender
aev.sender
if event.room_version.implicit_room_creator
else aev.content.get("creator")
)

View file

@ -242,5 +242,5 @@ def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
try:
return json_decoder.decode(db_content)
except Exception:
logging.warning("Tried to decode '%r' as JSON and failed", db_content)
logger.warning("Tried to decode '%r' as JSON and failed", db_content)
raise

View file

@ -42,6 +42,7 @@ from synapse.logging.opentracing import (
start_active_span,
trace,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@ -52,7 +53,7 @@ from synapse.storage.database import (
)
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util import Duration, json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.stringutils import parse_and_validate_server_name
@ -63,6 +64,18 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# How long to keep messages in the device federation inbox before deleting them.
DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS = 7 * Duration.DAY_MS
# How often to run the task to clean up old device_federation_inbox rows.
DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL_MS = 5 * Duration.MINUTE_MS
# Update name for the device federation inbox received timestamp index.
DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE = (
"device_federation_inbox_received_ts_index"
)
class DeviceInboxWorkerStore(SQLBaseStore):
def __init__(
self,
@ -137,6 +150,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
prefilled_cache=device_outbox_prefill,
)
if hs.config.worker.run_background_tasks:
self._clock.looping_call(
run_as_background_process,
DEVICE_FEDERATION_INBOX_CLEANUP_INTERVAL_MS,
"_delete_old_federation_inbox_rows",
self._delete_old_federation_inbox_rows,
)
def process_replication_rows(
self,
stream_name: str,
@ -963,6 +984,52 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
async def _delete_old_federation_inbox_rows(self, batch_size: int = 1000) -> None:
"""Delete old rows from the device_federation_inbox table."""
# We wait until we have the index on `received_ts`, otherwise the query
# will take a very long time.
if not await self.db_pool.updates.has_completed_background_update(
DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE
):
return
def _delete_old_federation_inbox_rows_txn(txn: LoggingTransaction) -> bool:
# We delete at most 100 rows that are older than
# DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS
delete_before_ts = (
self._clock.time_msec() - DEVICE_FEDERATION_INBOX_CLEANUP_DELAY_MS
)
sql = """
WITH to_delete AS (
SELECT origin, message_id
FROM device_federation_inbox
WHERE received_ts < ?
ORDER BY received_ts ASC
LIMIT ?
)
DELETE FROM device_federation_inbox
WHERE
(origin, message_id) IN (
SELECT origin, message_id FROM to_delete
)
"""
txn.execute(sql, (delete_before_ts, batch_size))
return txn.rowcount < batch_size
while True:
finished = await self.db_pool.runInteraction(
"_delete_old_federation_inbox_rows",
_delete_old_federation_inbox_rows_txn,
db_autocommit=True, # We don't need to run in a transaction
)
if finished:
return
# We sleep a bit so that we don't hammer the database in a tight
# loop first time we run this.
self._clock.sleep(1)
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
@ -998,6 +1065,13 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
self._cleanup_device_federation_outbox,
)
self.db_pool.updates.register_background_index_update(
update_name=DEVICE_FEDERATION_INBOX_RECEIVED_INDEX_UPDATE,
index_name="device_federation_inbox_received_ts_index",
table="device_federation_inbox",
columns=["received_ts"],
)
async def _background_drop_index_device_inbox(
self, progress: JsonDict, batch_size: int
) -> int:

View file

@ -331,7 +331,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
values={"timestamp": int(self._clock.time_msec())},
)
else:
logger.warning("mau limit reserved threepid %s not found in db" % tp)
logger.warning("mau limit reserved threepid %s not found in db", tp)
async def upsert_monthly_active_user(self, user_id: str) -> None:
"""Updates or inserts the user into the monthly active user table, which

View file

@ -2421,6 +2421,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._room_reports_id_gen = IdGenerator(db_conn, "room_reports", "id")
self._user_reports_id_gen = IdGenerator(db_conn, "user_reports", "id")
self._instance_name = hs.get_instance_name()
@ -2662,6 +2663,37 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
)
return next_id
async def add_user_report(
self,
target_user_id: str,
user_id: str,
reason: str,
received_ts: int,
) -> int:
"""Add a user report
Args:
target_user_id: The user ID being reported.
user_id: User who reported the user.
reason: Description that the user specifies.
received_ts: Time when the user submitted the report (milliseconds).
Returns:
ID of the room report.
"""
next_id = self._user_reports_id_gen.get_next()
await self.db_pool.simple_insert(
table="user_reports",
values={
"id": next_id,
"received_ts": received_ts,
"target_user_id": target_user_id,
"user_id": user_id,
"reason": reason,
},
desc="add_user_report",
)
return next_id
async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
"""Clears the partial state flag for a room.

View file

@ -253,8 +253,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return 1
logger.debug(
"Processing the next %d rooms of %d remaining"
% (len(rooms_to_work_on), progress["remaining"])
"Processing the next %d rooms of %d remaining",
len(rooms_to_work_on),
progress["remaining"],
)
processed_event_count = 0

View file

@ -50,7 +50,9 @@ class InviteRulesConfig:
except Exception as e:
# If for whatever reason we can't process this, just ignore it.
logger.debug(
f"Could not process '{value}' field of invite rule config, ignoring: {e}"
"Could not process '%s' field of invite rule config, ignoring: %s",
value,
e,
)
if account_data:

View file

@ -63,8 +63,11 @@ def run_upgrade(
if user_id in owned.keys():
logger.error(
"user_id %s was owned by more than one application"
" service (IDs %s and %s); assigning arbitrarily to %s"
% (user_id, owned[user_id], appservice.id, owned[user_id])
" service (IDs %s and %s); assigning arbitrarily to %s",
user_id,
owned[user_id],
appservice.id,
owned[user_id],
)
owned.setdefault(appservice.id, []).append(user_id)

View file

@ -0,0 +1,16 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Background update that adds an index to `device_federation_inbox.received_ts`
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(9206, 'device_federation_inbox_received_ts_index', '{}');

View file

@ -0,0 +1,22 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
CREATE TABLE user_reports (
id BIGINT NOT NULL PRIMARY KEY,
received_ts BIGINT NOT NULL,
target_user_id TEXT NOT NULL,
user_id TEXT NOT NULL,
reason TEXT NOT NULL
);
CREATE INDEX user_reports_target_user_id ON user_reports(target_user_id); -- for lookups
CREATE INDEX user_reports_user_id ON user_reports(user_id); -- for lookups

View file

@ -0,0 +1,24 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
from typing import Awaitable, Mapping
class HttpClient:
def __init__(self, user_agent: str) -> None: ...
def get(self, url: str, response_limit: int) -> Awaitable[bytes]: ...
def post(
self,
url: str,
response_limit: int,
headers: Mapping[str, str],
request_body: str,
) -> Awaitable[bytes]: ...

View file

@ -19,10 +19,22 @@
#
#
import collections.abc
import json
import logging
import typing
from typing import Any, Callable, Dict, Generator, Optional, Sequence
from typing import (
Any,
Callable,
Dict,
Generator,
Iterator,
Mapping,
Optional,
Sequence,
Set,
TypeVar,
)
import attr
from immutabledict import immutabledict
@ -43,6 +55,15 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class Duration:
"""Helper class that holds constants for common time durations in
milliseconds."""
MINUTE_MS = 60 * 1000
HOUR_MS = 60 * MINUTE_MS
DAY_MS = 24 * HOUR_MS
def _reject_invalid_json(val: Any) -> None:
"""Do not allow Infinity, -Infinity, or NaN values in JSON."""
raise ValueError("Invalid JSON value: '%s'" % val)
@ -251,3 +272,72 @@ class ExceptionBundle(Exception):
parts.append(str(e))
super().__init__("\n - ".join(parts))
self.exceptions = exceptions
K = TypeVar("K")
V = TypeVar("V")
@attr.s(slots=True, auto_attribs=True)
class MutableOverlayMapping(collections.abc.MutableMapping[K, V]):
"""A mutable mapping that allows changes to a read-only underlying
mapping. Supports deletions.
This is useful for cases where you want to allow modifications to a mapping
without changing or copying the original mapping.
Note: the underlying mapping must not change while this proxy is in use.
"""
_underlying_map: Mapping[K, V]
_mutable_map: Dict[K, V] = attr.ib(factory=dict)
_deletions: Set[K] = attr.ib(factory=set)
def __getitem__(self, key: K) -> V:
if key in self._deletions:
raise KeyError(key)
if key in self._mutable_map:
return self._mutable_map[key]
return self._underlying_map[key]
def __setitem__(self, key: K, value: V) -> None:
self._deletions.discard(key)
self._mutable_map[key] = value
def __delitem__(self, key: K) -> None:
if key not in self:
raise KeyError(key)
self._deletions.add(key)
self._mutable_map.pop(key, None)
def __iter__(self) -> Iterator[K]:
for key in self._mutable_map:
if key not in self._deletions:
yield key
for key in self._underlying_map:
if key not in self._deletions and key not in self._mutable_map:
# `key` should not be in both _mutable_map and _deletions
assert key not in self._mutable_map
yield key
def __len__(self) -> int:
count = len(self._underlying_map)
for key in self._deletions:
if key in self._underlying_map:
count -= 1
for key in self._mutable_map:
# `key` should not be in both _mutable_map and _deletions
assert key not in self._deletions
if key not in self._underlying_map:
count += 1
return count
def clear(self) -> None:
self._underlying_map = {}
self._mutable_map.clear()
self._deletions.clear()

View file

@ -37,6 +37,8 @@ DISTRIBUTION_NAME = "matrix-synapse"
__all__ = ["check_requirements"]
logger = logging.getLogger(__name__)
class DependencyException(Exception):
@property
@ -211,6 +213,6 @@ def check_requirements(extra: Optional[str] = None) -> None:
if deps_unfulfilled:
for err in errors:
logging.error(err)
logger.error(err)
raise DependencyException(deps_unfulfilled)

20
synapse/util/constants.py Normal file
View file

@ -0,0 +1,20 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Time-based constants.
#
# Laying these out incrementally, even if only some are required, helps with
# readability and catching bugs.
ONE_MINUTE_SECONDS = 60
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS

View file

@ -133,7 +133,7 @@ def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") -
# write a log line on SIGTERM.
def sigterm(signum: int, frame: Optional[FrameType]) -> NoReturn:
logger.warning("Caught signal %s. Stopping daemon." % signum)
logger.warning("Caught signal %s. Stopping daemon.", signum)
sys.exit(0)
signal.signal(signal.SIGTERM, sigterm)

View file

@ -441,7 +441,8 @@ class TaskScheduler:
except Exception:
f = Failure()
logger.error(
f"scheduled task {task.id} failed",
"scheduled task %s failed",
task.id,
exc_info=(f.type, f.value, f.getTracebackObject()),
)
status = TaskStatus.FAILED
@ -474,8 +475,10 @@ class TaskScheduler:
self._clock.time_msec()
> task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS
):
logger.warn(
f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck"
logger.warning(
"Task %s (action %s) has seen no update for more than 24h and may be stuck",
task.id,
task.action,
)
if task.id in self._running_tasks:

View file

@ -45,6 +45,8 @@ from synapse.util import Clock
from tests import unittest
from tests.unittest import override_config
logger = logging.getLogger(__name__)
class FederationServerTests(unittest.FederatingHomeserverTestCase):
servlets = [
@ -252,7 +254,7 @@ class MessageAcceptTests(unittest.FederatingHomeserverTestCase):
class ServerACLsTestCase(unittest.TestCase):
def test_blocked_server(self) -> None:
e = _create_acl_event({"allow": ["*"], "deny": ["evil.com"]})
logging.info("ACL event: %s", e.content)
logger.info("ACL event: %s", e.content)
server_acl_evalutor = server_acl_evaluator_from_event(e)
@ -266,7 +268,7 @@ class ServerACLsTestCase(unittest.TestCase):
def test_block_ip_literals(self) -> None:
e = _create_acl_event({"allow_ip_literals": False, "allow": ["*"]})
logging.info("ACL event: %s", e.content)
logger.info("ACL event: %s", e.content)
server_acl_evalutor = server_acl_evaluator_from_event(e)

View file

@ -19,9 +19,10 @@
#
#
import json
from http import HTTPStatus
from io import BytesIO
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Union
from unittest.mock import ANY, AsyncMock, Mock
from urllib.parse import parse_qs
@ -33,12 +34,11 @@ from signedjson.key import (
from signedjson.sign import sign_json
from twisted.test.proto_helpers import MemoryReactor
from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse
from synapse.api.errors import (
AuthError,
Codes,
HttpResponseException,
InvalidClientTokenError,
OAuthInsufficientScopeError,
SynapseError,
@ -52,7 +52,7 @@ from synapse.types import JsonDict, UserID
from synapse.util import Clock
from tests.server import FakeChannel
from tests.test_utils import FakeResponse, get_awaitable_result
from tests.test_utils import get_awaitable_result
from tests.unittest import HomeserverTestCase, override_config, skip_unless
from tests.utils import HAS_AUTHLIB, checked_cast, mock_getRawHeaders
@ -145,6 +145,9 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
self.auth = checked_cast(MSC3861DelegatedAuth, hs.get_auth())
self._rust_client = Mock(spec=["post"])
self.auth._rust_http_client = self._rust_client
return hs
def prepare(
@ -157,9 +160,15 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
store.store_device(USER_ID, DEVICE, initial_device_display_name=None)
)
def _set_introspection_returnvalue(self, response_value: Any) -> AsyncMock:
self._rust_client.post = mock = AsyncMock(
return_value=json.dumps(response_value).encode("utf-8")
)
return mock
def _assertParams(self) -> None:
"""Assert that the request parameters are correct."""
params = parse_qs(self.http_client.request.call_args[1]["data"].decode("utf-8"))
params = parse_qs(self._rust_client.post.call_args[1]["request_body"])
self.assertEqual(params["token"], ["mockAccessToken"])
self.assertEqual(params["client_id"], [CLIENT_ID])
self.assertEqual(params["client_secret"], [CLIENT_SECRET])
@ -167,128 +176,125 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def test_inactive_token(self) -> None:
"""The handler should return a 403 where the token is inactive."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={"active": False},
)
)
self._set_introspection_returnvalue({"active": False})
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
self.get_failure(self.auth.get_user_by_req(request), InvalidClientTokenError)
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
def test_active_no_scope(self) -> None:
"""The handler should return a 403 where no scope is given."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={"active": True},
)
)
self._set_introspection_returnvalue({"active": True})
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
self.get_failure(self.auth.get_user_by_req(request), InvalidClientTokenError)
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
def test_active_user_no_subject(self) -> None:
"""The handler should return a 500 when no subject is present."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={"active": True, "scope": " ".join([MATRIX_USER_SCOPE])},
)
self._set_introspection_returnvalue(
{"active": True, "scope": " ".join([MATRIX_USER_SCOPE])}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
self.get_failure(self.auth.get_user_by_req(request), InvalidClientTokenError)
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
def test_active_no_user_scope(self) -> None:
"""The handler should return a 500 when no subject is present."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_DEVICE_SCOPE]),
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_DEVICE_SCOPE]),
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
self.get_failure(self.auth.get_user_by_req(request), InvalidClientTokenError)
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
def test_active_admin_not_user(self) -> None:
"""The handler should raise when the scope has admin right but not user."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([SYNAPSE_ADMIN_SCOPE]),
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([SYNAPSE_ADMIN_SCOPE]),
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
self.get_failure(self.auth.get_user_by_req(request), InvalidClientTokenError)
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
def test_active_admin(self) -> None:
"""The handler should return a requester with admin rights."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]),
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]),
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
requester = self.get_success(self.auth.get_user_by_req(request))
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
self.assertEqual(requester.user.to_string(), "@%s:%s" % (USERNAME, SERVER_NAME))
@ -301,26 +307,26 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def test_active_admin_highest_privilege(self) -> None:
"""The handler should resolve to the most permissive scope."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE, MATRIX_GUEST_SCOPE]
),
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE, MATRIX_GUEST_SCOPE]
),
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
requester = self.get_success(self.auth.get_user_by_req(request))
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
self.assertEqual(requester.user.to_string(), "@%s:%s" % (USERNAME, SERVER_NAME))
@ -333,24 +339,24 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def test_active_user(self) -> None:
"""The handler should return a requester with normal user rights."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE]),
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE]),
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
requester = self.get_success(self.auth.get_user_by_req(request))
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
self.assertEqual(requester.user.to_string(), "@%s:%s" % (USERNAME, SERVER_NAME))
@ -363,24 +369,24 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def test_active_user_with_device(self) -> None:
"""The handler should return a requester with normal user rights and a device ID."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
requester = self.get_success(self.auth.get_user_by_req(request))
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
self.assertEqual(requester.user.to_string(), "@%s:%s" % (USERNAME, SERVER_NAME))
@ -393,32 +399,32 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def test_active_user_with_device_explicit_device_id(self) -> None:
"""The handler should return a requester with normal user rights and a device ID, given explicitly, as supported by MAS 0.15+"""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE]),
"device_id": DEVICE,
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE]),
"device_id": DEVICE,
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
requester = self.get_success(self.auth.get_user_by_req(request))
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
# It should have called with the 'X-MAS-Supports-Device-Id: 1' header
self.assertEqual(
self.http_client.request.call_args[1]["headers"].getRawHeaders(
b"X-MAS-Supports-Device-Id",
self._rust_client.post.call_args[1]["headers"].get(
"X-MAS-Supports-Device-Id",
),
[b"1"],
"1",
)
self._assertParams()
self.assertEqual(requester.user.to_string(), "@%s:%s" % (USERNAME, SERVER_NAME))
@ -431,22 +437,19 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def test_multiple_devices(self) -> None:
"""The handler should raise an error if multiple devices are found in the scope."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[
MATRIX_USER_SCOPE,
f"{MATRIX_DEVICE_SCOPE_PREFIX}AABBCC",
f"{MATRIX_DEVICE_SCOPE_PREFIX}DDEEFF",
]
),
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[
MATRIX_USER_SCOPE,
f"{MATRIX_DEVICE_SCOPE_PREFIX}AABBCC",
f"{MATRIX_DEVICE_SCOPE_PREFIX}DDEEFF",
]
),
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
@ -456,16 +459,13 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def test_active_guest_not_allowed(self) -> None:
"""The handler should return an insufficient scope error."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_GUEST_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_GUEST_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
@ -474,8 +474,11 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
self.auth.get_user_by_req(request), OAuthInsufficientScopeError
)
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
self.assertEqual(
@ -486,16 +489,13 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def test_active_guest_allowed(self) -> None:
"""The handler should return a requester with guest user rights and a device ID."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_GUEST_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_GUEST_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
@ -504,8 +504,11 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
self.auth.get_user_by_req(request, allow_guest=True)
)
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.http_client.request.assert_called_once_with(
method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
self._rust_client.post.assert_called_once_with(
url=INTROSPECTION_ENDPOINT,
response_limit=ANY,
request_body=ANY,
headers=ANY,
)
self._assertParams()
self.assertEqual(requester.user.to_string(), "@%s:%s" % (USERNAME, SERVER_NAME))
@ -522,30 +525,28 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
# The introspection endpoint is returning an error.
self.http_client.request = AsyncMock(
return_value=FakeResponse(code=500, body=b"Internal Server Error")
)
error = self.get_failure(self.auth.get_user_by_req(request), SynapseError)
self.assertEqual(error.value.code, 503)
# The introspection endpoint request fails.
self.http_client.request = AsyncMock(side_effect=Exception())
error = self.get_failure(self.auth.get_user_by_req(request), SynapseError)
self.assertEqual(error.value.code, 503)
# The introspection endpoint does not return a JSON object.
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200, payload=["this is an array", "not an object"]
self._rust_client.post = AsyncMock(
side_effect=HttpResponseException(
code=500, msg="Internal Server Error", response=b"{}"
)
)
error = self.get_failure(self.auth.get_user_by_req(request), SynapseError)
self.assertEqual(error.value.code, 503)
# The introspection endpoint request fails.
self._rust_client.post = AsyncMock(side_effect=Exception())
error = self.get_failure(self.auth.get_user_by_req(request), SynapseError)
self.assertEqual(error.value.code, 503)
# The introspection endpoint does not return a JSON object.
self._set_introspection_returnvalue(["this is an array", "not an object"])
error = self.get_failure(self.auth.get_user_by_req(request), SynapseError)
self.assertEqual(error.value.code, 503)
# The introspection endpoint does not return valid JSON.
self.http_client.request = AsyncMock(
return_value=FakeResponse(code=200, body=b"this is not valid JSON")
)
self._set_introspection_returnvalue("this is not valid JSON")
error = self.get_failure(self.auth.get_user_by_req(request), SynapseError)
self.assertEqual(error.value.code, 503)
@ -554,23 +555,21 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
an expiry time, the introspection response is cached and then the entry is
re-requested after it has expired."""
self.http_client.request = introspection_mock = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[
MATRIX_USER_SCOPE,
f"{MATRIX_DEVICE_SCOPE_PREFIX}AABBCC",
]
),
"username": USERNAME,
"expires_in": 60,
},
)
introspection_mock = self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join(
[
MATRIX_USER_SCOPE,
f"{MATRIX_DEVICE_SCOPE_PREFIX}AABBCC",
]
),
"username": USERNAME,
"expires_in": 60,
}
)
request = Mock(args={})
request.args[b"access_token"] = [b"mockAccessToken"]
request.requestHeaders.getRawHeaders = mock_getRawHeaders()
@ -607,16 +606,13 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def test_cross_signing(self) -> None:
"""Try uploading device keys with OAuth delegation enabled."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
}
)
keys_upload_body = self.make_device_keys(USER_ID, DEVICE)
channel = self.make_request(
@ -778,16 +774,13 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
# Because we still support those endpoints with ASes, it checks the
# access token before returning 404
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(
code=200,
payload={
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
},
)
self._set_introspection_returnvalue(
{
"active": True,
"sub": SUBJECT,
"scope": " ".join([MATRIX_USER_SCOPE, MATRIX_DEVICE_SCOPE]),
"username": USERNAME,
},
)
self.expect_unrecognized("POST", "/_matrix/client/v3/delete_devices", auth=True)
@ -820,9 +813,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def test_admin_token(self) -> None:
"""The handler should return a requester with admin rights when admin_token is used."""
self.http_client.request = AsyncMock(
return_value=FakeResponse.json(code=200, payload={"active": False}),
)
self._set_introspection_returnvalue({"active": False})
request = Mock(args={})
request.args[b"access_token"] = [b"admin_token_value"]
@ -839,7 +830,7 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
)
# There should be no call to the introspection endpoint
self.http_client.request.assert_not_called()
self._rust_client.post.assert_not_called()
@override_config({"mau_stats_only": True})
def test_request_tracking(self) -> None:
@ -852,28 +843,23 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
known_token = "token-token-GOOD-:)"
async def mock_http_client_request(
method: str,
uri: str,
data: Optional[bytes] = None,
headers: Optional[Headers] = None,
) -> IResponse:
url: str, request_body: str, **kwargs: Any
) -> bytes:
"""Mocked auth provider response."""
assert method == "POST"
token = parse_qs(data)[b"token"][0].decode("utf-8")
token = parse_qs(request_body)["token"][0]
if token == known_token:
return FakeResponse.json(
code=200,
payload={
return json.dumps(
{
"active": True,
"scope": MATRIX_USER_SCOPE,
"sub": SUBJECT,
"username": USERNAME,
},
)
).encode("utf-8")
return FakeResponse.json(code=200, payload={"active": False})
return json.dumps({"active": False}).encode("utf-8")
self.http_client.request = mock_http_client_request
self._rust_client.post = mock_http_client_request
EXAMPLE_IPV4_ADDR = "123.123.123.123"
EXAMPLE_USER_AGENT = "httprettygood"

View file

@ -65,8 +65,10 @@ class WorkerLockTestCase(unittest.HomeserverTestCase):
timeout_seconds = 15 # Increased timeout for RISC-V
# add a print or log statement here for visibility in CI logs
logger.info( # use logger.info
f"Detected RISC-V architecture ({current_machine}). "
f"Adjusting test_lock_contention: timeout={timeout_seconds}s"
"Detected RISC-V architecture (%s). "
"Adjusting test_lock_contention: timeout=%ss",
current_machine,
timeout_seconds,
)
else:
# Settings for other architectures

View file

@ -1838,7 +1838,7 @@ def _get_test_protocol_factory() -> IProtocolFactory:
def _log_request(request: str) -> None:
"""Implements Factory.log, which is expected by Request.finish"""
logger.info(f"Completed request {request}")
logger.info("Completed request %s", request)
@implementer(IPolicyForHTTPS)

View file

@ -893,4 +893,4 @@ def _get_test_protocol_factory() -> IProtocolFactory:
def _log_request(request: str) -> None:
"""Implements Factory.log, which is expected by Request.finish"""
logger.info(f"Completed request {request}")
logger.info("Completed request %s", request)

View file

@ -86,11 +86,11 @@ class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase):
# Send some debug messages
for i in range(3):
logger.debug("debug %s" % (i,))
logger.debug("debug %s", i)
# Send a bunch of useful messages
for i in range(7):
logger.info("info %s" % (i,))
logger.info("info %s", i)
# The last debug message pushes it past the maximum buffer
logger.debug("too much debug")
@ -116,15 +116,15 @@ class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase):
# Send some debug messages
for i in range(3):
logger.debug("debug %s" % (i,))
logger.debug("debug %s", i)
# Send a bunch of useful messages
for i in range(10):
logger.warning("warn %s" % (i,))
logger.warning("warn %s", i)
# Send a bunch of info messages
for i in range(3):
logger.info("info %s" % (i,))
logger.info("info %s", i)
# The last debug message pushes it past the maximum buffer
logger.debug("too much debug")
@ -152,7 +152,7 @@ class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase):
# Send a bunch of useful messages
for i in range(20):
logger.warning("warn %s" % (i,))
logger.warning("warn %s", i)
# Allow the reconnection
client, server = connect_logging_client(self.reactor, 0)

View file

@ -1616,3 +1616,55 @@ class SlidingSyncTestCase(SlidingSyncBase):
{space_room_id, space_room_id2},
exact=True,
)
def test_exclude_rooms_from_sync(self) -> None:
"""Tests that sliding sync honours the `exclude_rooms_from_sync` config
option.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id_to_exclude = self.helper.create_room_as(
user1_id,
tok=user1_tok,
)
room_id_to_include = self.helper.create_room_as(
user1_id,
tok=user1_tok,
)
# We cheekily modify the stored config here, as we can't add it to the
# raw config since we don't know the room ID before we start up.
self.hs.get_sliding_sync_handler().rooms_to_exclude_globally.append(
room_id_to_exclude
)
self.hs.get_sliding_sync_handler().room_lists.rooms_to_exclude_globally.append(
room_id_to_exclude
)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 0,
},
}
}
response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Make sure response only contains room_id_to_include
self.assertIncludes(
set(response_body["rooms"].keys()),
{room_id_to_include},
exact=True,
)
# Test that the excluded room is not in the list ops
# Make sure the list is sorted in the way we expect
self.assertIncludes(
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
{room_id_to_include},
exact=True,
)

View file

@ -18,6 +18,7 @@
# [This file includes modifications made by New Vector Limited]
#
#
from typing import Optional
from twisted.test.proto_helpers import MemoryReactor
@ -201,3 +202,91 @@ class ReportRoomTestCase(unittest.HomeserverTestCase):
shorthand=False,
)
self.assertEqual(response_status, channel.code, msg=channel.result["body"])
class ReportUserTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
room.register_servlets,
reporting.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.other_user = self.register_user("user", "pass")
self.other_user_tok = self.login("user", "pass")
self.target_user_id = self.register_user("target_user", "pass")
def test_reason_str(self) -> None:
data = {"reason": "this makes me sad"}
self._assert_status(200, data)
rows = self.get_success(
self.hs.get_datastores().main.db_pool.simple_select_onecol(
table="user_reports",
keyvalues={"target_user_id": self.target_user_id},
retcol="id",
desc="get_user_report_ids",
)
)
self.assertEqual(len(rows), 1)
def test_no_reason(self) -> None:
data = {"not_reason": "for typechecking"}
self._assert_status(400, data)
def test_reason_nonstring(self) -> None:
data = {"reason": 42}
self._assert_status(400, data)
def test_reason_null(self) -> None:
data = {"reason": None}
self._assert_status(400, data)
def test_reason_long(self) -> None:
data = {"reason": "x" * 1001}
self._assert_status(400, data)
def test_cannot_report_nonlocal_user(self) -> None:
"""
Tests that we ignore reports for nonlocal users.
"""
target_user_id = "@bloop:example.org"
data = {"reason": "i am very sad"}
self._assert_status(200, data, target_user_id)
self._assert_no_reports_for_user(target_user_id)
def test_can_report_nonexistent_user(self) -> None:
"""
Tests that we ignore reports for nonexistent users.
"""
target_user_id = f"@bloop:{self.hs.hostname}"
data = {"reason": "i am very sad"}
self._assert_status(200, data, target_user_id)
self._assert_no_reports_for_user(target_user_id)
def _assert_no_reports_for_user(self, target_user_id: str) -> None:
rows = self.get_success(
self.hs.get_datastores().main.db_pool.simple_select_onecol(
table="user_reports",
keyvalues={"target_user_id": target_user_id},
retcol="id",
desc="get_user_report_ids",
)
)
self.assertEqual(len(rows), 0)
def _assert_status(
self, response_status: int, data: JsonDict, user_id: Optional[str] = None
) -> None:
if user_id is None:
user_id = self.target_user_id
channel = self.make_request(
"POST",
f"/_matrix/client/v3/users/{user_id}/report",
data,
access_token=self.other_user_tok,
shorthand=False,
)
self.assertEqual(response_status, channel.code, msg=channel.result["body"])

View file

@ -961,25 +961,47 @@ class AuthChainDifferenceTestCase(unittest.TestCase):
f"wrong pl for {user_id} on v{room_version.identifier}",
)
# the creator alone without PL is 100
got_creator_pl = self.successResultOf(
defer.ensureDeferred(
_get_power_level_for_sender(
ROOM_ID,
member_event.event_id,
{
member_event.event_id: member_event,
create_event.event_id: create_event,
},
store,
# the creator alone without PL is 100, everyone else is 0
want_pls = {
ALICE: 100,
BOB: 0,
CHARLIE: 0,
}
for user_id, want_pl in want_pls.items():
test_event = make_event_from_dict(
{
"room_id": ROOM_ID,
"sender": user_id,
"type": EventTypes.Topic,
"state_key": "",
"content": {"topic": "Test"},
"auth_events": [
create_event.event_id,
member_event.event_id,
pl_event.event_id,
],
"prev_events": [pl_event.event_id],
},
room_version,
)
got_pl = self.successResultOf(
defer.ensureDeferred(
_get_power_level_for_sender(
ROOM_ID,
test_event.event_id,
{
test_event.event_id: test_event,
create_event.event_id: create_event,
},
store,
)
)
)
)
self.assertEqual(
got_creator_pl,
100,
f"wrong pl for creator with no PL event on v{room_version.identifier}",
)
self.assertEqual(
got_pl,
want_pl,
f"wrong pl for {user_id} with no PL event on v{room_version.identifier}",
)
T = TypeVar("T")

Some files were not shown because too many files have changed in this diff Show more