mirror of
https://github.com/element-hq/synapse.git
synced 2026-01-16 23:00:43 +00:00
This can be reviewed commit by commit. This enables the `flake8-logging` and `flake8-logging-format` rules in Ruff, as well as logging exception stack traces in a few places where it makes sense: - https://docs.astral.sh/ruff/rules/#flake8-logging-log - https://docs.astral.sh/ruff/rules/#flake8-logging-format-g ### Linting to avoid pre-formatting log messages See [`adamchainz/flake8-logging` -> *LOG011 avoid pre-formatting log messages*](152db2f167/README.rst (log011-avoid-pre-formatting-log-messages)). Practically, this means preferring placeholders (`%s`) over f-strings for logging. This is because placeholders are passed as args to loggers, so the loggers can do special handling of them. For example, Sentry will record the args separately in their logging integration: c15b390dfe/sentry_sdk/integrations/logging.py (L280-L284). One theoretical small perf benefit is that messages for log levels that aren't enabled won't get formatted, so it doesn't unnecessarily create formatted strings.
1823 lines
66 KiB
Python
1823 lines
66 KiB
Python
#
|
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
#
|
|
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
|
|
# Copyright (C) 2023 New Vector, Ltd
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# See the GNU Affero General Public License for more details:
|
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
#
|
|
# Originally licensed under the Apache License, Version 2.0:
|
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
|
#
|
|
# [This file includes modifications made by New Vector Limited]
|
|
#
|
|
#
|
|
import abc
|
|
import codecs
|
|
import logging
|
|
import random
|
|
import sys
|
|
import urllib.parse
|
|
from http import HTTPStatus
|
|
from io import BytesIO, StringIO
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
BinaryIO,
|
|
Callable,
|
|
Dict,
|
|
Generic,
|
|
List,
|
|
Literal,
|
|
Optional,
|
|
TextIO,
|
|
Tuple,
|
|
TypeVar,
|
|
Union,
|
|
cast,
|
|
overload,
|
|
)
|
|
|
|
import attr
|
|
import treq
|
|
from canonicaljson import encode_canonical_json
|
|
from prometheus_client import Counter
|
|
from signedjson.sign import sign_json
|
|
|
|
from twisted.internet import defer
|
|
from twisted.internet.error import DNSLookupError
|
|
from twisted.internet.interfaces import IReactorTime
|
|
from twisted.internet.task import Cooperator
|
|
from twisted.web.client import ResponseFailed
|
|
from twisted.web.http_headers import Headers
|
|
from twisted.web.iweb import UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse
|
|
|
|
import synapse.metrics
|
|
import synapse.util.retryutils
|
|
from synapse.api.errors import (
|
|
Codes,
|
|
FederationDeniedError,
|
|
HttpResponseException,
|
|
RequestSendFailed,
|
|
SynapseError,
|
|
)
|
|
from synapse.api.ratelimiting import Ratelimiter
|
|
from synapse.crypto.context_factory import FederationPolicyForHTTPS
|
|
from synapse.http import QuieterFileBodyProducer
|
|
from synapse.http.client import (
|
|
BlocklistingAgentWrapper,
|
|
BodyExceededMaxSize,
|
|
ByteWriteable,
|
|
SimpleHttpClient,
|
|
_make_scheduler,
|
|
encode_query_args,
|
|
read_body_with_max_size,
|
|
read_multipart_response,
|
|
)
|
|
from synapse.http.connectproxyclient import BearerProxyCredentials
|
|
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
|
|
from synapse.http.proxyagent import ProxyAgent
|
|
from synapse.http.types import QueryParams
|
|
from synapse.logging import opentracing
|
|
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
|
from synapse.logging.opentracing import set_tag, start_active_span, tags
|
|
from synapse.types import JsonDict
|
|
from synapse.util import json_decoder
|
|
from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
|
|
from synapse.util.metrics import Measure
|
|
from synapse.util.stringutils import parse_and_validate_server_name
|
|
|
|
if TYPE_CHECKING:
|
|
from synapse.server import HomeServer
|
|
|
|
# Module-level logger; every log line in this file goes through it.
logger = logging.getLogger(__name__)

# Prometheus counter incremented once per outgoing request attempt, labelled
# by the HTTP method of the request.
outgoing_requests_counter = Counter(
    "synapse_http_matrixfederationclient_requests", "", ["method"]
)
# Prometheus counter incremented once per response received, labelled by the
# HTTP method of the request and the response status code.
incoming_responses_counter = Counter(
    "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
)


# Upper bound used when wrapping the `_next_id` counter around.
MAXINT = sys.maxsize


# Counter used to build the `txn_id` of each `MatrixFederationRequest`; it is
# advanced every time a request object is created.
_next_id = 1

# Type of the value produced by a `ByteParser`.
T = TypeVar("T")
|
|
|
|
|
|
class ByteParser(ByteWriteable, Generic[T], abc.ABC):
    """A `ByteWriteable` that has an additional `finish` function that returns
    the parsed data.

    The response body is streamed into the parser; `_handle_response` reads
    `CONTENT_TYPE` and `MAX_RESPONSE_SIZE` from the parser to validate the
    response and to cap how much is read, then calls `finish()` to obtain the
    parsed value of type `T`.
    """

    # Abstract: each concrete parser is expected to provide its own value.
    CONTENT_TYPE: str = abc.abstractproperty()  # type: ignore
    """The expected content type of the response, e.g. `application/json`. If
    the content type doesn't match we fail the request.
    """

    # a federation response can be rather large (eg a big state_ids is 50M or so), so we
    # need a generous limit here.
    MAX_RESPONSE_SIZE: int = 100 * 1024 * 1024
    """The largest response this parser will accept."""

    @abc.abstractmethod
    def finish(self) -> T:
        """Called when response has finished streaming and the parser should
        return the final result (or error).

        Returns:
            The parsed result.
        """
|
|
|
|
|
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
class MatrixFederationRequest:
    """Immutable description of one outgoing federation HTTP request.

    `txn_id` is always filled in automatically after construction, and `uri`
    is derived from `destination`, `path` and `query` unless `generate_uri`
    is passed as False.
    """

    method: str
    """HTTP method
    """

    path: str
    """HTTP path
    """

    destination: str
    """The remote server to send the HTTP request to.
    """

    json: Optional[JsonDict] = None
    """JSON to send in the body.
    """

    json_callback: Optional[Callable[[], JsonDict]] = None
    """A callback to generate the JSON.
    """

    query: Optional[QueryParams] = None
    """Query arguments.
    """

    txn_id: str = attr.ib(init=False)
    """Unique ID for this request (for logging), this is autogenerated.
    """

    uri: bytes = b""
    """The URI of this request, usually generated from the above information.
    """

    _generate_uri: bool = True
    """True to automatically generate the uri field based on the above information.

    Set to False if manually configuring the URI.
    """

    def __attrs_post_init__(self) -> None:
        """Assign the transaction ID and, if requested, pre-compute the URI."""
        global _next_id

        # The instance is frozen, so attributes have to be written through
        # `object.__setattr__` rather than plain assignment.
        object.__setattr__(self, "txn_id", "%s-O-%s" % (self.method, _next_id))
        _next_id = (_next_id + 1) % (MAXINT - 1)

        if not self._generate_uri:
            # The caller supplied the URI by hand; leave it untouched.
            return

        # The object is frozen so we can pre-compute this.
        uri_components = (
            b"matrix-federation",
            self.destination.encode("ascii"),
            self.path.encode("ascii"),
            None,
            encode_query_args(self.query),
            b"",
        )
        object.__setattr__(self, "uri", urllib.parse.urlunparse(uri_components))

    def get_json(self) -> Optional[JsonDict]:
        """Return the request body: the callback's result if a callback is
        set, otherwise the static `json` value (which may be None).
        """
        callback = self.json_callback
        return callback() if callback else self.json
|
|
|
|
|
|
class _BaseJsonParser(ByteParser[T]):
    """Buffers the whole response body as text and decodes it as JSON once
    the body has been fully received.
    """

    CONTENT_TYPE = "application/json"

    def __init__(
        self, validator: Optional[Callable[[Optional[object]], bool]] = None
    ) -> None:
        """
        Args:
            validator: Optional callable which is given the decoded JSON value
                and returns true if that value is acceptable.
        """
        # Incoming bytes are decoded on the fly into this text buffer.
        self._buffer = StringIO()
        self._binary_wrapper = BinaryIOWrapper(self._buffer)
        self._validator = validator

    def write(self, data: bytes) -> int:
        """Feed a chunk of the response body into the buffer.

        Returns:
            Whatever the underlying wrapper reports as written.
        """
        written = self._binary_wrapper.write(data)
        return written

    def finish(self) -> T:
        """Decode the buffered text as JSON and validate it.

        Raises:
            ValueError: if a validator was supplied and it rejects the
                decoded value.
        """
        decoded = json_decoder.decode(self._buffer.getvalue())
        check = self._validator
        if check is None or check(decoded):
            return decoded
        raise ValueError(
            f"Received incorrect JSON value: {decoded.__class__.__name__}"
        )
|
|
|
|
|
|
class JsonParser(_BaseJsonParser[JsonDict]):
    """JSON parser which only accepts a response whose top-level value is a
    JSON object.
    """

    def __init__(self) -> None:
        super().__init__(validator=self._validate)

    @staticmethod
    def _validate(v: Any) -> bool:
        """Return True if the decoded value is a dict."""
        is_object = isinstance(v, dict)
        return is_object
|
|
|
|
|
|
class LegacyJsonSendParser(_BaseJsonParser[Tuple[int, JsonDict]]):
    """Parser for the legacy /send_join & /send_leave response format, which
    must be a two-element list of `[integer, JSON object]`.
    """

    def __init__(self) -> None:
        super().__init__(validator=self._validate)

    @staticmethod
    def _validate(v: Any) -> bool:
        """Return True if `v` matches `[integer, JSON dict]`."""
        if not isinstance(v, list):
            return False
        if len(v) != 2:
            return False

        first, second = v
        # Exact type comparison (not isinstance): subclasses of int, such as
        # bool, are not accepted as the first element.
        return type(first) is int and isinstance(second, dict)
|
|
|
|
|
|
async def _handle_response(
    reactor: IReactorTime,
    timeout_sec: float,
    request: MatrixFederationRequest,
    response: IResponse,
    start_ms: int,
    parser: ByteParser[T],
) -> T:
    """
    Stream a response body into `parser` (subject to a timeout and to the
    parser's size limit) and return whatever the parser produces.

    Args:
        reactor: twisted reactor, used for the timeout and for timing
        timeout_sec: how many seconds to allow for the body to arrive
        request: the request which produced this response (used for logging)
        response: the response whose body should be read
        start_ms: timestamp (in milliseconds) of when the request was made
        parser: receives the body and builds the result

    Returns:
        The value returned by `parser.finish()`.

    Raises:
        RequestSendFailed: not retryable if the body exceeded the parser's
            maximum size or a ValueError was raised while handling it;
            retryable if reading timed out or the response failed part-way.
        Exception: any other error is logged and re-raised unchanged.
    """

    size_limit = parser.MAX_RESPONSE_SIZE

    # Becomes True just before we ask the parser for its result, so that the
    # cleanup below never calls `finish()` a second time.
    finish_called = False
    try:
        check_content_type_is(response.headers, parser.CONTENT_TYPE)

        body_deferred = timeout_deferred(
            read_body_with_max_size(response, parser, size_limit),
            timeout=timeout_sec,
            reactor=reactor,
        )

        length = await make_deferred_yieldable(body_deferred)

        finish_called = True
        value = parser.finish()
    except BodyExceededMaxSize as e:
        # More data arrived than the parser is prepared to accept.
        logger.warning(
            "{%s} [%s] JSON response exceeded max size %i - %s %s",
            request.txn_id,
            request.destination,
            size_limit,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=False) from e
    except ValueError as e:
        # The body could not be turned into a valid result.
        logger.warning(
            "{%s} [%s] Failed to parse response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=False) from e
    except defer.TimeoutError as e:
        logger.warning(
            "{%s} [%s] Timed out reading response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except ResponseFailed as e:
        logger.warning(
            "{%s} [%s] Failed to read response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except Exception as e:
        logger.warning(
            "{%s} [%s] Error reading response %s %s: %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
            e,
        )
        raise
    finally:
        if not finish_called:
            # We bailed out before reaching `finish()`. Call it now so the
            # parser gets a chance to release anything it is holding.
            try:
                parser.finish()
            except Exception:
                # Best effort only: an error is already propagating.
                pass

    # `start_ms` is in milliseconds whereas the reactor reports seconds.
    elapsed_secs = reactor.seconds() - start_ms / 1000

    logger.info(
        "{%s} [%s] Completed request: %d %s in %.2f secs, got %d bytes - %s %s",
        request.txn_id,
        request.destination,
        response.code,
        response.phrase.decode("ascii", errors="replace"),
        elapsed_secs,
        length,
        request.method,
        request.uri.decode("ascii"),
    )
    return value
|
|
|
|
|
|
class BinaryIOWrapper:
    """Adapts a text stream so that it can be written to with bytes, which
    are decoded incrementally as they arrive.
    """

    def __init__(self, file: TextIO, encoding: str = "utf-8", errors: str = "strict"):
        """
        Args:
            file: the text stream which receives the decoded characters
            encoding: name of the codec used to decode the incoming bytes
            errors: error-handling scheme handed to the incremental decoder
        """
        make_decoder = codecs.getincrementaldecoder(encoding)
        # An incremental decoder keeps hold of any partial multi-byte sequence
        # until the bytes completing it are written.
        self.decoder = make_decoder(errors)
        self.file = file

    def write(self, b: Union[bytes, bytearray]) -> int:
        """Decode `b` and append the resulting text to the wrapped stream.

        Returns:
            The number of bytes passed in (not the number of characters
            written to the stream).
        """
        text = self.decoder.decode(b)
        self.file.write(text)
        return len(b)
|
|
|
|
|
|
class MatrixFederationHttpClient:
|
|
"""HTTP client used to talk to other homeservers over the federation
|
|
protocol. Send client certificates and signs requests.
|
|
|
|
Attributes:
|
|
agent (twisted.web.client.Agent): The twisted Agent used to send the
|
|
requests.
|
|
"""
|
|
|
|
def __init__(
    self,
    hs: "HomeServer",
    tls_client_options_factory: Optional[FederationPolicyForHTTPS],
):
    """Build the HTTP agent and load the timeout/retry settings from config.

    Args:
        hs: the homeserver; supplies config, reactor, clock, datastore and
            the signing key.
        tls_client_options_factory: handed to whichever agent is constructed
            below.
    """
    self.hs = hs
    self.signing_key = hs.signing_key
    self.server_name = hs.hostname

    self.reactor = hs.get_reactor()

    user_agent = hs.version_string
    server_config = hs.config.server
    ua_suffix = server_config.user_agent_suffix
    if ua_suffix:
        user_agent = "%s %s" % (user_agent, ua_suffix)

    worker_config = hs.config.worker
    restricted_to = worker_config.outbound_federation_restricted_to

    federation_agent: IAgent
    if hs.get_instance_name() not in restricted_to:
        # This instance is not one of those configured for outbound
        # federation, so requests have to be relayed through one that is.
        proxy_authorization_secret = worker_config.worker_replication_secret
        assert proxy_authorization_secret is not None, (
            "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)"
        )
        proxy_credentials = BearerProxyCredentials(
            proxy_authorization_secret.encode("ascii")
        )

        # We need to talk to federation via the proxy via one of the
        # configured locations
        federation_agent = ProxyAgent(
            self.reactor,
            self.reactor,
            tls_client_options_factory,
            federation_proxy_locations=restricted_to.locations,
            federation_proxy_credentials=proxy_credentials,
        )
    else:
        # Talk to federation directly
        federation_agent = MatrixFederationAgent(
            self.reactor,
            tls_client_options_factory,
            user_agent.encode("ascii"),
            server_config.federation_ip_range_allowlist,
            server_config.federation_ip_range_blocklist,
        )

    # Use a BlocklistingAgentWrapper to prevent circumventing the IP
    # blocking via IP literals in server names
    self.agent: IAgent = BlocklistingAgentWrapper(
        federation_agent,
        ip_blocklist=server_config.federation_ip_range_blocklist,
    )

    self.clock = hs.get_clock()
    self._store = hs.get_datastores().main
    self.version_string_bytes = hs.version_string.encode("ascii")

    # The config holds durations in milliseconds; they are kept here in
    # seconds.
    federation_config = hs.config.federation
    self.default_timeout_seconds = federation_config.client_timeout_ms / 1000
    self.max_long_retry_delay_seconds = (
        federation_config.max_long_retry_delay_ms / 1000
    )
    self.max_short_retry_delay_seconds = (
        federation_config.max_short_retry_delay_ms / 1000
    )
    self.max_long_retries = federation_config.max_long_retries
    self.max_short_retries = federation_config.max_short_retries

    self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))

    # Used to wait between retries; `wake_destination` can cut a wait short.
    self._sleeper = AwakenableSleeper(self.reactor)

    self._simple_http_client = SimpleHttpClient(
        hs,
        ip_blocklist=server_config.federation_ip_range_blocklist,
        ip_allowlist=server_config.federation_ip_range_allowlist,
        use_proxy=True,
    )

    # NOTE(review): the 6 is presumably the maximum number of concurrent
    # holders of the linearizer - confirm against `Linearizer`.
    self.remote_download_linearizer = Linearizer("remote_download_linearizer", 6)
|
|
|
|
def wake_destination(self, destination: str) -> None:
    """Notify the retry sleeper that `destination` may be reachable again.

    Args:
        destination: name of the remote server which may have come back
            online.
    """
    sleeper = self._sleeper
    sleeper.wake(destination)
|
|
|
|
async def _send_request_with_optional_trailing_slash(
|
|
self,
|
|
request: MatrixFederationRequest,
|
|
try_trailing_slash_on_400: bool = False,
|
|
**send_request_args: Any,
|
|
) -> IResponse:
|
|
"""Wrapper for _send_request which can optionally retry the request
|
|
upon receiving a combination of a 400 HTTP response code and a
|
|
'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
|
|
due to https://github.com/matrix-org/synapse/issues/3622.
|
|
|
|
Args:
|
|
request: details of request to be sent
|
|
try_trailing_slash_on_400: Whether on receiving a 400
|
|
'M_UNRECOGNIZED' from the server to retry the request with a
|
|
trailing slash appended to the request path.
|
|
send_request_args: A dictionary of arguments to pass to `_send_request()`.
|
|
|
|
Raises:
|
|
HttpResponseException: If we get an HTTP response code >= 300
|
|
(except 429).
|
|
|
|
Returns:
|
|
Parsed JSON response body.
|
|
"""
|
|
try:
|
|
response = await self._send_request(request, **send_request_args)
|
|
except HttpResponseException as e:
|
|
# Received an HTTP error > 300. Check if it meets the requirements
|
|
# to retry with a trailing slash
|
|
if not try_trailing_slash_on_400:
|
|
raise
|
|
|
|
if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
|
|
raise
|
|
|
|
# Retry with a trailing slash if we received a 400 with
|
|
# 'M_UNRECOGNIZED' which some endpoints can return when omitting a
|
|
# trailing slash on Synapse <= v0.99.3.
|
|
logger.info("Retrying request with trailing slash")
|
|
|
|
# Request is frozen so we create a new instance
|
|
request = attr.evolve(request, path=request.path + "/")
|
|
|
|
response = await self._send_request(request, **send_request_args)
|
|
|
|
return response
|
|
|
|
async def _send_request(
    self,
    request: MatrixFederationRequest,
    retry_on_dns_fail: bool = True,
    timeout: Optional[int] = None,
    long_retries: bool = False,
    ignore_backoff: bool = False,
    backoff_on_404: bool = False,
    backoff_on_all_error_codes: bool = False,
    follow_redirects: bool = False,
) -> IResponse:
    """
    Sends a request to the given server.

    The request is signed and sent through `self.agent`. Failures which are
    marked as retryable are retried with a jittered, exponentially growing
    delay until the retry budget is exhausted.

    Args:
        request: details of request to be sent

        retry_on_dns_fail: true if the request should be retried on DNS failures

        timeout: number of milliseconds to wait for the response headers
            (including connecting to the server), *for each attempt*.
            60s by default.

            NB: automatic retries are only attempted when no `timeout` was
            given (see the `if retries_left and not timeout` check below).

        long_retries: whether to use the long retry algorithm.

            The regular retry algorithm makes 4 attempts, with intervals
            [0.5s, 1s, 2s].

            The long retry algorithm makes 11 attempts, with intervals
            [4s, 16s, 60s, 60s, ...]

            Both algorithms add -20%/+40% jitter to the retry intervals.

            Note that the above intervals are *in addition* to the time spent
            waiting for the request to complete (up to `timeout` ms).

            NB: the long retry algorithm takes over 20 minutes to complete, with a
            default timeout of 60s! It's best not to use the `long_retries` option
            for something that is blocking a client so we don't make them wait for
            aaaaages, whereas some things like sending transactions (server to
            server) we can be a lot more lenient but its very fuzzy / hand-wavey.

            In the future, we could be more intelligent about doing this sort of
            thing by looking at things with the bigger picture in mind,
            https://github.com/matrix-org/synapse/issues/8917

        ignore_backoff: true to ignore the historical backoff data
            and try the request anyway.

        backoff_on_404: Back off if we get a 404
        backoff_on_all_error_codes: Back off if we get any error response

        follow_redirects: True to follow the Location header of 307/308 redirect
            responses. This does not recurse.

    Returns:
        Resolves with the HTTP response object on success.

    Raises:
        HttpResponseException: If we get an HTTP response code >= 300
            (except 429).
        NotRetryingDestination: If we are not yet ready to retry this
            server.
        FederationDeniedError: If this destination is not on our
            federation whitelist
        RequestSendFailed: If there were problems connecting to the
            remote, due to e.g. DNS failures, connection timeouts etc.
            5xx and 429 responses are also raised as this type (wrapping an
            HttpResponseException) so that they are eligible for retries.
    """
    # Validate server name and log if it is an invalid destination, this is
    # partially to help track down code paths where we haven't validated before here
    try:
        parse_and_validate_server_name(request.destination)
    except ValueError:
        logger.exception("Invalid destination: %s.", request.destination)
        raise FederationDeniedError(request.destination)

    # `timeout` is given in milliseconds; everything below works in seconds.
    if timeout is not None:
        _sec_timeout = timeout / 1000
    else:
        _sec_timeout = self.default_timeout_seconds

    # If a federation whitelist is configured, refuse any destination not on it.
    if (
        self.hs.config.federation.federation_domain_whitelist is not None
        and request.destination
        not in self.hs.config.federation.federation_domain_whitelist
    ):
        raise FederationDeniedError(request.destination)

    limiter = await synapse.util.retryutils.get_retry_limiter(
        request.destination,
        self.clock,
        self._store,
        backoff_on_404=backoff_on_404,
        ignore_backoff=ignore_backoff,
        notifier=self.hs.get_notifier(),
        replication_client=self.hs.get_replication_command_handler(),
        backoff_on_all_error_codes=backoff_on_all_error_codes,
    )

    method_bytes = request.method.encode("ascii")
    destination_bytes = request.destination.encode("ascii")
    path_bytes = request.path.encode("ascii")
    query_bytes = encode_query_args(request.query)

    scope = start_active_span(
        "outgoing-federation-request",
        tags={
            tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
            tags.PEER_ADDRESS: request.destination,
            tags.HTTP_METHOD: request.method,
            tags.HTTP_URL: request.path,
        },
        finish_on_close=True,
    )

    # Inject the span into the headers
    headers_dict: Dict[bytes, List[bytes]] = {}
    opentracing.inject_header_dict(headers_dict, request.destination)

    headers_dict[b"User-Agent"] = [self.version_string_bytes]

    with limiter, scope:
        # XXX: Would be much nicer to retry only at the transaction-layer
        # (once we have reliable transactions in place)
        if long_retries:
            retries_left = self.max_long_retries
        else:
            retries_left = self.max_short_retries

        url_bytes = request.uri
        url_str = url_bytes.decode("ascii")

        # Only the path and query string are signed, not the scheme or host.
        url_to_sign_bytes = urllib.parse.urlunparse(
            (b"", b"", path_bytes, None, query_bytes, b"")
        )

        # One iteration per attempt. The loop is left via `break` on a 2xx
        # response, via `return` when following a redirect, or by raising.
        while True:
            try:
                # The body is fetched afresh for every attempt, so a
                # `json_callback` is invoked once per attempt.
                json = request.get_json()
                if json:
                    headers_dict[b"Content-Type"] = [b"application/json"]
                    auth_headers = self.build_auth_headers(
                        destination_bytes, method_bytes, url_to_sign_bytes, json
                    )
                    data = encode_canonical_json(json)
                    producer: Optional[IBodyProducer] = QuieterFileBodyProducer(
                        BytesIO(data), cooperator=self._cooperator
                    )
                else:
                    producer = None
                    auth_headers = self.build_auth_headers(
                        destination_bytes, method_bytes, url_to_sign_bytes
                    )

                headers_dict[b"Authorization"] = auth_headers

                logger.debug(
                    "{%s} [%s] Sending request: %s %s; timeout %fs",
                    request.txn_id,
                    request.destination,
                    request.method,
                    url_str,
                    _sec_timeout,
                )

                outgoing_requests_counter.labels(request.method).inc()

                try:
                    with Measure(self.clock, "outbound_request"):
                        # we don't want all the fancy cookie and redirect handling
                        # that treq.request gives: just use the raw Agent.

                        # To preserve the logging context, the timeout is treated
                        # in a similar way to `defer.gatherResults`:
                        # * Each logging context-preserving fork is wrapped in
                        #   `run_in_background`. In this case there is only one,
                        #   since the timeout fork is not logging-context aware.
                        # * The `Deferred` that joins the forks back together is
                        #   wrapped in `make_deferred_yieldable` to restore the
                        #   logging context regardless of the path taken.
                        request_deferred = run_in_background(
                            self.agent.request,
                            method_bytes,
                            url_bytes,
                            headers=Headers(headers_dict),
                            bodyProducer=producer,
                        )
                        request_deferred = timeout_deferred(
                            request_deferred,
                            timeout=_sec_timeout,
                            reactor=self.reactor,
                        )

                        response = await make_deferred_yieldable(request_deferred)
                except DNSLookupError as e:
                    # Whether a DNS failure may be retried is up to the caller.
                    raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
                except Exception as e:
                    # Any other failure to obtain response headers is retryable.
                    raise RequestSendFailed(e, can_retry=True) from e

                incoming_responses_counter.labels(
                    request.method, response.code
                ).inc()

                set_tag(tags.HTTP_STATUS_CODE, response.code)
                response_phrase = response.phrase.decode("ascii", errors="replace")

                if 200 <= response.code < 300:
                    logger.debug(
                        "{%s} [%s] Got response headers: %d %s",
                        request.txn_id,
                        request.destination,
                        response.code,
                        response_phrase,
                    )
                elif (
                    response.code in (307, 308)
                    and follow_redirects
                    and response.headers.hasHeader("Location")
                ):
                    # The Location header *might* be relative so resolve it.
                    location = response.headers.getRawHeaders(b"Location")[0]
                    new_uri = urllib.parse.urljoin(request.uri, location)

                    # Re-send against the resolved URI. `generate_uri=False`
                    # stops the new request object from regenerating `uri`
                    # from its destination/path/query.
                    return await self._send_request(
                        attr.evolve(request, uri=new_uri, generate_uri=False),
                        retry_on_dns_fail,
                        timeout,
                        long_retries,
                        ignore_backoff,
                        backoff_on_404,
                        backoff_on_all_error_codes,
                        # Do not continue following redirects.
                        follow_redirects=False,
                    )
                else:
                    logger.info(
                        "{%s} [%s] Got response headers: %d %s",
                        request.txn_id,
                        request.destination,
                        response.code,
                        response_phrase,
                    )
                    # :'(
                    # Update transactions table?
                    # Read the error body (with the same timeout) so that it
                    # can be attached to the exception raised below.
                    d = treq.content(response)
                    d = timeout_deferred(
                        d, timeout=_sec_timeout, reactor=self.reactor
                    )

                    try:
                        body = await make_deferred_yieldable(d)
                    except Exception as e:
                        # Eh, we're already going to raise an exception so lets
                        # ignore if this fails.
                        logger.warning(
                            "{%s} [%s] Failed to get error response: %s %s: %s",
                            request.txn_id,
                            request.destination,
                            request.method,
                            url_str,
                            _flatten_response_never_received(e),
                        )
                        body = b""

                    exc = HttpResponseException(
                        response.code, response_phrase, body
                    )

                    # Retry if the error is a 5xx or a 429 (Too Many
                    # Requests), otherwise just raise a standard
                    # `HttpResponseException`
                    if 500 <= response.code < 600 or response.code == 429:
                        raise RequestSendFailed(exc, can_retry=True) from exc
                    else:
                        raise exc

                # Only reached for a 2xx response.
                break
            except RequestSendFailed as e:
                logger.info(
                    "{%s} [%s] Request failed: %s %s: %s",
                    request.txn_id,
                    request.destination,
                    request.method,
                    url_str,
                    _flatten_response_never_received(e.inner_exception),
                )

                if not e.can_retry:
                    raise

                # Retries are only performed when the caller did not pass an
                # explicit `timeout`.
                if retries_left and not timeout:
                    if long_retries:
                        # 4, 16, 64, ... seconds, capped by the configured
                        # maximum, then jittered.
                        delay_seconds = 4 ** (
                            self.max_long_retries + 1 - retries_left
                        )
                        delay_seconds = min(
                            delay_seconds, self.max_long_retry_delay_seconds
                        )
                        delay_seconds *= random.uniform(0.8, 1.4)
                    else:
                        # 0.5, 1, 2, ... seconds, capped by the configured
                        # maximum, then jittered.
                        delay_seconds = 0.5 * 2 ** (
                            self.max_short_retries - retries_left
                        )
                        delay_seconds = min(
                            delay_seconds, self.max_short_retry_delay_seconds
                        )
                        delay_seconds *= random.uniform(0.8, 1.4)

                    logger.debug(
                        "{%s} [%s] Waiting %ss before re-sending...",
                        request.txn_id,
                        request.destination,
                        delay_seconds,
                    )

                    # Sleep for the calculated delay, or wake up immediately
                    # if we get notified that the server is back up.
                    await self._sleeper.sleep(
                        request.destination, delay_seconds * 1000
                    )
                    retries_left -= 1
                else:
                    raise

            except Exception as e:
                logger.warning(
                    "{%s} [%s] Request failed: %s %s: %s",
                    request.txn_id,
                    request.destination,
                    request.method,
                    url_str,
                    _flatten_response_never_received(e),
                )
                raise
    return response
|
|
|
|
def build_auth_headers(
    self,
    destination: Optional[bytes],
    method: bytes,
    url_bytes: bytes,
    content: Optional[JsonDict] = None,
    destination_is: Optional[bytes] = None,
) -> List[bytes]:
    """
    Sign a description of the request and render the signature(s) as
    `X-Matrix` Authorization header values.

    Args:
        destination: The destination homeserver of the request.
            May be None if the destination is an identity server, in which case
            destination_is must be non-None.
        method: The HTTP method of the request
        url_bytes: The URI path of the request
        content: The body of the request
        destination_is: As 'destination', but if the destination is an
            identity server

    Returns:
        A list of headers to be added as "Authorization:" headers

    Raises:
        ValueError: if neither `destination` nor `destination_is` is a
            non-empty bytestring.
    """
    if not (destination or destination_is):
        raise ValueError(
            "At least one of the arguments destination and destination_is "
            "must be a nonempty bytestring."
        )

    # The JSON object which gets signed. Optional fields are only included
    # when the corresponding argument was supplied.
    request: JsonDict = {
        "method": method.decode("ascii"),
        "uri": url_bytes.decode("ascii"),
        "origin": self.server_name,
    }
    if destination is not None:
        request["destination"] = destination.decode("ascii")
    if destination_is is not None:
        request["destination_is"] = destination_is.decode("ascii")
    if content is not None:
        request["content"] = content

    signed = sign_json(request, self.server_name, self.signing_key)

    # An empty "destination" falls back to "destination_is"; the check at the
    # top guarantees that at least one of the two is non-empty.
    header_destination = signed.get("destination") or signed["destination_is"]

    # One header value per signing key.
    return [
        (
            'X-Matrix origin="%s",key="%s",sig="%s",destination="%s"'
            % (self.server_name, key, sig, header_destination)
        ).encode("ascii")
        for key, sig in signed["signatures"][self.server_name].items()
    ]
|
|
|
|
# Typing overload: when `parser` is omitted (or None) the call is typed as
# returning a `JsonDict`.
@overload
async def put_json(
    self,
    destination: str,
    path: str,
    args: Optional[QueryParams] = None,
    data: Optional[JsonDict] = None,
    json_data_callback: Optional[Callable[[], JsonDict]] = None,
    long_retries: bool = False,
    timeout: Optional[int] = None,
    ignore_backoff: bool = False,
    backoff_on_404: bool = False,
    try_trailing_slash_on_400: bool = False,
    parser: Literal[None] = None,
    backoff_on_all_error_codes: bool = False,
) -> JsonDict: ...

# Typing overload: when a `ByteParser[T]` is supplied the call is typed as
# returning that parser's result type `T`.
@overload
async def put_json(
    self,
    destination: str,
    path: str,
    args: Optional[QueryParams] = None,
    data: Optional[JsonDict] = None,
    json_data_callback: Optional[Callable[[], JsonDict]] = None,
    long_retries: bool = False,
    timeout: Optional[int] = None,
    ignore_backoff: bool = False,
    backoff_on_404: bool = False,
    try_trailing_slash_on_400: bool = False,
    parser: Optional[ByteParser[T]] = None,
    backoff_on_all_error_codes: bool = False,
) -> T: ...
|
|
|
|
async def put_json(
|
|
self,
|
|
destination: str,
|
|
path: str,
|
|
args: Optional[QueryParams] = None,
|
|
data: Optional[JsonDict] = None,
|
|
json_data_callback: Optional[Callable[[], JsonDict]] = None,
|
|
long_retries: bool = False,
|
|
timeout: Optional[int] = None,
|
|
ignore_backoff: bool = False,
|
|
backoff_on_404: bool = False,
|
|
try_trailing_slash_on_400: bool = False,
|
|
parser: Optional[ByteParser[T]] = None,
|
|
backoff_on_all_error_codes: bool = False,
|
|
) -> Union[JsonDict, T]:
|
|
"""Sends the specified json data using PUT
|
|
|
|
Args:
|
|
destination: The remote server to send the HTTP request to.
|
|
path: The HTTP path.
|
|
args: query params
|
|
data: A dict containing the data that will be used as
|
|
the request body. This will be encoded as JSON.
|
|
json_data_callback: A callable returning the dict to
|
|
use as the request body.
|
|
|
|
long_retries: whether to use the long retry algorithm. See
|
|
docs on _send_request for details.
|
|
|
|
timeout: number of milliseconds to wait for the response.
|
|
self._default_timeout (60s) by default.
|
|
|
|
Note that we may make several attempts to send the request; this
|
|
timeout applies to the time spent waiting for response headers for
|
|
*each* attempt (including connection time) as well as the time spent
|
|
reading the response body after a 200 response.
|
|
|
|
ignore_backoff: true to ignore the historical backoff data
|
|
and try the request anyway.
|
|
backoff_on_404: True if we should count a 404 response as
|
|
a failure of the server (and should therefore back off future
|
|
requests).
|
|
try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
|
|
response we should try appending a trailing slash to the end
|
|
of the request. Workaround for https://github.com/matrix-org/synapse/issues/3622
|
|
in Synapse <= v0.99.3. This will be attempted before backing off if
|
|
backing off has been enabled.
|
|
parser: The parser to use to decode the response. Defaults to
|
|
parsing as JSON.
|
|
backoff_on_all_error_codes: Back off if we get any error response
|
|
|
|
Returns:
|
|
Succeeds when we get a 2xx HTTP response. The
|
|
result will be the decoded JSON body.
|
|
|
|
Raises:
|
|
HttpResponseException: If we get an HTTP response code >= 300
|
|
(except 429).
|
|
NotRetryingDestination: If we are not yet ready to retry this
|
|
server.
|
|
FederationDeniedError: If this destination is not on our
|
|
federation whitelist
|
|
RequestSendFailed: If there were problems connecting to the
|
|
remote, due to e.g. DNS failures, connection timeouts etc.
|
|
"""
|
|
request = MatrixFederationRequest(
|
|
method="PUT",
|
|
destination=destination,
|
|
path=path,
|
|
query=args,
|
|
json_callback=json_data_callback,
|
|
json=data,
|
|
)
|
|
|
|
start_ms = self.clock.time_msec()
|
|
|
|
response = await self._send_request_with_optional_trailing_slash(
|
|
request,
|
|
try_trailing_slash_on_400,
|
|
backoff_on_404=backoff_on_404,
|
|
ignore_backoff=ignore_backoff,
|
|
long_retries=long_retries,
|
|
timeout=timeout,
|
|
backoff_on_all_error_codes=backoff_on_all_error_codes,
|
|
)
|
|
|
|
if timeout is not None:
|
|
_sec_timeout = timeout / 1000
|
|
else:
|
|
_sec_timeout = self.default_timeout_seconds
|
|
|
|
if parser is None:
|
|
parser = cast(ByteParser[T], JsonParser())
|
|
|
|
body = await _handle_response(
|
|
self.reactor,
|
|
_sec_timeout,
|
|
request,
|
|
response,
|
|
start_ms,
|
|
parser=parser,
|
|
)
|
|
|
|
return body
|
|
|
|
async def post_json(
    self,
    destination: str,
    path: str,
    data: Optional[JsonDict] = None,
    long_retries: bool = False,
    timeout: Optional[int] = None,
    ignore_backoff: bool = False,
    args: Optional[QueryParams] = None,
) -> JsonDict:
    """POST a JSON body to a remote homeserver and decode the JSON reply.

    Args:
        destination: Name of the remote server the request is sent to.
        path: HTTP path of the request.
        data: Dict used as the request body; it is encoded as JSON.
        long_retries: Whether to use the long retry algorithm; see the
            docs on `_send_request`.
        timeout: How long to wait for the response, in milliseconds.
            When None, `self.default_timeout_seconds` is used.

            Several attempts may be made to send the request. The timeout
            covers, for *each* attempt, the wait for the response headers
            (connection time included), and also the time spent reading
            the response body after a 200 response.
        ignore_backoff: True to disregard the historical backoff data and
            attempt the request regardless.
        args: Query parameters.

    Returns:
        The decoded JSON body, once a 2xx HTTP response has been received.

    Raises:
        HttpResponseException: If we get an HTTP response code >= 300
            (except 429).
        NotRetryingDestination: If we are not yet ready to retry this
            server.
        FederationDeniedError: If this destination is not on our
            federation whitelist.
        RequestSendFailed: If there were problems connecting to the
            remote, due to e.g. DNS failures, connection timeouts etc.
    """
    request = MatrixFederationRequest(
        method="POST",
        destination=destination,
        path=path,
        query=args,
        json=data,
    )

    # Taken before the request goes out so the response handler can measure
    # from the start of the whole exchange.
    start_ms = self.clock.time_msec()

    response = await self._send_request(
        request,
        long_retries=long_retries,
        timeout=timeout,
        ignore_backoff=ignore_backoff,
    )

    # Callers give milliseconds; the response handler is given seconds.
    read_timeout_sec = (
        self.default_timeout_seconds if timeout is None else timeout / 1000
    )

    return await _handle_response(
        self.reactor,
        read_timeout_sec,
        request,
        response,
        start_ms,
        parser=JsonParser(),
    )
|
|
|
|
@overload
async def get_json(
    self,
    destination: str,
    path: str,
    args: Optional[QueryParams] = None,
    retry_on_dns_fail: bool = True,
    timeout: Optional[int] = None,
    ignore_backoff: bool = False,
    try_trailing_slash_on_400: bool = False,
    parser: Literal[None] = None,
) -> JsonDict: ...

@overload
async def get_json(
    self,
    destination: str,
    path: str,
    args: Optional[QueryParams] = ...,
    retry_on_dns_fail: bool = ...,
    timeout: Optional[int] = ...,
    ignore_backoff: bool = ...,
    try_trailing_slash_on_400: bool = ...,
    parser: ByteParser[T] = ...,
) -> T: ...

async def get_json(
    self,
    destination: str,
    path: str,
    args: Optional[QueryParams] = None,
    retry_on_dns_fail: bool = True,
    timeout: Optional[int] = None,
    ignore_backoff: bool = False,
    try_trailing_slash_on_400: bool = False,
    parser: Optional[ByteParser[T]] = None,
) -> Union[JsonDict, T]:
    """GET a resource from a remote homeserver and return the decoded body.

    This is `get_json_with_headers` with the response headers discarded.

    Args:
        destination: Name of the remote server the request is sent to.
        path: HTTP path of the request.
        args: Dictionary used to build the query string; defaults to None.
        retry_on_dns_fail: True if the request should be retried on DNS
            failures.
        timeout: How long to wait for the response, in milliseconds.
            When None, `self.default_timeout_seconds` is used.

            Several attempts may be made to send the request. The timeout
            covers, for *each* attempt, the wait for the response headers
            (connection time included), and also the time spent reading
            the response body after a 200 response.
        ignore_backoff: True to disregard the historical backoff data and
            attempt the request regardless.
        try_trailing_slash_on_400: True if, on a 400 M_UNRECOGNIZED
            response, the request should be retried with a trailing slash
            appended. Workaround for
            https://github.com/matrix-org/synapse/issues/3622 in
            Synapse <= v0.99.3.
        parser: Parser used to decode the response. Defaults to parsing
            as JSON.

    Returns:
        The response body as decoded by the parser (the decoded JSON body
        by default), once a 2xx HTTP response has been received.

    Raises:
        HttpResponseException: If we get an HTTP response code >= 300
            (except 429).
        NotRetryingDestination: If we are not yet ready to retry this
            server.
        FederationDeniedError: If this destination is not on our
            federation whitelist.
        RequestSendFailed: If there were problems connecting to the
            remote, due to e.g. DNS failures, connection timeouts etc.
    """
    # Only the body is of interest here; the headers are dropped.
    decoded_body, _response_headers = await self.get_json_with_headers(
        destination=destination,
        path=path,
        args=args,
        retry_on_dns_fail=retry_on_dns_fail,
        timeout=timeout,
        ignore_backoff=ignore_backoff,
        try_trailing_slash_on_400=try_trailing_slash_on_400,
        parser=parser,
    )
    return decoded_body
|
|
|
|
@overload
async def get_json_with_headers(
    self,
    destination: str,
    path: str,
    args: Optional[QueryParams] = None,
    retry_on_dns_fail: bool = True,
    timeout: Optional[int] = None,
    ignore_backoff: bool = False,
    try_trailing_slash_on_400: bool = False,
    parser: Literal[None] = None,
) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]: ...

@overload
async def get_json_with_headers(
    self,
    destination: str,
    path: str,
    args: Optional[QueryParams] = ...,
    retry_on_dns_fail: bool = ...,
    timeout: Optional[int] = ...,
    ignore_backoff: bool = ...,
    try_trailing_slash_on_400: bool = ...,
    parser: ByteParser[T] = ...,
) -> Tuple[T, Dict[bytes, List[bytes]]]: ...

async def get_json_with_headers(
    self,
    destination: str,
    path: str,
    args: Optional[QueryParams] = None,
    retry_on_dns_fail: bool = True,
    timeout: Optional[int] = None,
    ignore_backoff: bool = False,
    try_trailing_slash_on_400: bool = False,
    parser: Optional[ByteParser[T]] = None,
) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
    """GET a resource from a remote homeserver, returning body and headers.

    Args:
        destination: Name of the remote server the request is sent to.
        path: HTTP path of the request.
        args: Dictionary used to build the query string; defaults to None.
        retry_on_dns_fail: True if the request should be retried on DNS
            failures.
        timeout: How long to wait for the response, in milliseconds.
            When None, `self.default_timeout_seconds` is used.

            Several attempts may be made to send the request. The timeout
            covers, for *each* attempt, the wait for the response headers
            (connection time included), and also the time spent reading
            the response body after a 200 response.
        ignore_backoff: True to disregard the historical backoff data and
            attempt the request regardless.
        try_trailing_slash_on_400: True if, on a 400 M_UNRECOGNIZED
            response, the request should be retried with a trailing slash
            appended. Workaround for
            https://github.com/matrix-org/synapse/issues/3622 in
            Synapse <= v0.99.3.
        parser: Parser used to decode the response. A `JsonParser` is
            used when None.

    Returns:
        A tuple of the decoded response body and a dict of the response
        headers, once a 2xx HTTP response has been received.

    Raises:
        HttpResponseException: If we get an HTTP response code >= 300
            (except 429).
        NotRetryingDestination: If we are not yet ready to retry this
            server.
        FederationDeniedError: If this destination is not on our
            federation whitelist.
        RequestSendFailed: If there were problems connecting to the
            remote, due to e.g. DNS failures, connection timeouts etc.
    """
    request = MatrixFederationRequest(
        method="GET",
        destination=destination,
        path=path,
        query=args,
    )

    # Taken before the request goes out so the response handler can measure
    # from the start of the whole exchange.
    start_ms = self.clock.time_msec()

    response = await self._send_request_with_optional_trailing_slash(
        request,
        try_trailing_slash_on_400,
        backoff_on_404=False,
        ignore_backoff=ignore_backoff,
        retry_on_dns_fail=retry_on_dns_fail,
        timeout=timeout,
    )

    response_headers = dict(response.headers.getAllRawHeaders())

    # Callers give milliseconds; the response handler is given seconds.
    read_timeout_sec = (
        self.default_timeout_seconds if timeout is None else timeout / 1000
    )

    response_parser = (
        cast(ByteParser[T], JsonParser()) if parser is None else parser
    )

    decoded_body = await _handle_response(
        self.reactor,
        read_timeout_sec,
        request,
        response,
        start_ms,
        parser=response_parser,
    )

    return decoded_body, response_headers
|
|
|
|
async def delete_json(
    self,
    destination: str,
    path: str,
    long_retries: bool = False,
    timeout: Optional[int] = None,
    ignore_backoff: bool = False,
    args: Optional[QueryParams] = None,
) -> JsonDict:
    """Issue a DELETE to a remote homeserver and decode the JSON reply.

    Args:
        destination: Name of the remote server the request is sent to.
        path: HTTP path of the request.
        long_retries: Whether to use the long retry algorithm; see the
            docs on `_send_request`.
        timeout: How long to wait for the response, in milliseconds.
            When None, `self.default_timeout_seconds` is used.

            Several attempts may be made to send the request. The timeout
            covers, for *each* attempt, the wait for the response headers
            (connection time included), and also the time spent reading
            the response body after a 200 response.
        ignore_backoff: True to disregard the historical backoff data and
            attempt the request regardless.
        args: Query parameters.

    Returns:
        The decoded JSON body, once a 2xx HTTP response has been received.

    Raises:
        HttpResponseException: If we get an HTTP response code >= 300
            (except 429).
        NotRetryingDestination: If we are not yet ready to retry this
            server.
        FederationDeniedError: If this destination is not on our
            federation whitelist.
        RequestSendFailed: If there were problems connecting to the
            remote, due to e.g. DNS failures, connection timeouts etc.
    """
    request = MatrixFederationRequest(
        method="DELETE",
        destination=destination,
        path=path,
        query=args,
    )

    # Taken before the request goes out so the response handler can measure
    # from the start of the whole exchange.
    start_ms = self.clock.time_msec()

    response = await self._send_request(
        request,
        long_retries=long_retries,
        timeout=timeout,
        ignore_backoff=ignore_backoff,
    )

    # Callers give milliseconds; the response handler is given seconds.
    read_timeout_sec = (
        self.default_timeout_seconds if timeout is None else timeout / 1000
    )

    return await _handle_response(
        self.reactor,
        read_timeout_sec,
        request,
        response,
        start_ms,
        parser=JsonParser(),
    )
|
|
|
|
async def get_file(
    self,
    destination: str,
    path: str,
    output_stream: BinaryIO,
    download_ratelimiter: Ratelimiter,
    ip_address: str,
    max_size: int,
    args: Optional[QueryParams] = None,
    retry_on_dns_fail: bool = True,
    ignore_backoff: bool = False,
    follow_redirects: bool = False,
) -> Tuple[int, Dict[bytes, List[bytes]]]:
    """Download a file from a remote homeserver into `output_stream`.

    Args:
        destination: Name of the remote server the request is sent to.
        path: HTTP path to GET.
        output_stream: File the response body is written to.
        download_ratelimiter: Ratelimiter for remote media downloads,
            keyed to the requester IP.
        ip_address: IP address of the requester.
        max_size: Maximum allowable size of the file, in bytes.
        args: Optional dictionary used to build the query string.
        retry_on_dns_fail: Passed through to `_send_request`.
        ignore_backoff: True to disregard the historical backoff data and
            attempt the request regardless.
        follow_redirects: True to follow the Location header of 307/308
            redirect responses. This does not recurse.

    Returns:
        A tuple of the file length and a dict of the response headers.

    Raises:
        HttpResponseException: If we get an HTTP response code >= 300
            (except 429).
        NotRetryingDestination: If we are not yet ready to retry this
            server.
        FederationDeniedError: If this destination is not on our
            federation whitelist.
        RequestSendFailed: If there were problems connecting to the
            remote, due to e.g. DNS failures, connection timeouts etc.
        SynapseError: If the requested file exceeds ratelimits or is
            larger than `max_size`.
    """
    request = MatrixFederationRequest(
        method="GET",
        destination=destination,
        path=path,
        query=args,
    )

    # Before sending anything, make sure the requester still has at least
    # 1MiB of balance in the ratelimiter. This is a check only
    # (update=False), so nothing is consumed yet.
    may_send, _ = await download_ratelimiter.can_do_action(
        requester=None,
        key=ip_address,
        n_actions=1048576,
        update=False,
    )
    if not may_send:
        msg = "Requested file size exceeds ratelimits"
        logger.warning(
            "{%s} [%s] %s", request.txn_id, request.destination, msg
        )
        raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)

    response = await self._send_request(
        request,
        retry_on_dns_fail=retry_on_dns_fail,
        ignore_backoff=ignore_backoff,
        follow_redirects=follow_redirects,
    )

    headers = dict(response.headers.getAllRawHeaders())

    # When the remote did not announce a length, budget for the largest
    # file we are willing to accept.
    expected_size = response.length
    if expected_size == UNKNOWN_LENGTH:
        expected_size = max_size
    elif int(expected_size) > max_size:
        msg = "Requested file is too large > %r bytes" % (max_size,)
        logger.warning(
            "{%s} [%s] %s", request.txn_id, request.destination, msg
        )
        raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)

    may_read, _ = await download_ratelimiter.can_do_action(
        requester=None,
        key=ip_address,
        n_actions=expected_size,
    )
    if not may_read:
        msg = "Requested file size exceeds ratelimits"
        logger.warning(
            "{%s} [%s] %s", request.txn_id, request.destination, msg
        )
        raise SynapseError(
            HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
        )

    try:
        async with self.remote_download_linearizer.queue(ip_address):
            # add a byte of headroom to max size as function errs at >=
            body_deferred = read_body_with_max_size(
                response, output_stream, expected_size + 1
            )
            body_deferred.addTimeout(self.default_timeout_seconds, self.reactor)
            length = await make_deferred_yieldable(body_deferred)
    except BodyExceededMaxSize:
        msg = "Requested file is too large > %r bytes" % (expected_size,)
        logger.warning(
            "{%s} [%s] %s", request.txn_id, request.destination, msg
        )
        raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
    except defer.TimeoutError as e:
        logger.warning(
            "{%s} [%s] Timed out reading response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except ResponseFailed as e:
        logger.warning(
            "{%s} [%s] Failed to read response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except Exception as e:
        # Anything else is logged for diagnosis and passed on unchanged.
        logger.warning(
            "{%s} [%s] Error reading response: %s",
            request.txn_id,
            request.destination,
            e,
        )
        raise

    logger.info(
        "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
        request.txn_id,
        request.destination,
        response.code,
        response.phrase.decode("ascii", errors="replace"),
        length,
        request.method,
        request.uri.decode("ascii"),
    )

    # if we didn't know the length upfront, decrement the actual size from ratelimiter
    if response.length == UNKNOWN_LENGTH:
        download_ratelimiter.record_action(
            requester=None, key=ip_address, n_actions=length
        )

    return length, headers
|
|
|
|
async def federation_get_file(
    self,
    destination: str,
    path: str,
    output_stream: BinaryIO,
    download_ratelimiter: Ratelimiter,
    ip_address: str,
    max_size: int,
    args: Optional[QueryParams] = None,
    retry_on_dns_fail: bool = True,
    ignore_backoff: bool = False,
) -> Tuple[int, Dict[bytes, List[bytes]], bytes]:
    """GETs a file from a given homeserver over the federation /download endpoint

    The remote is expected to answer with a multipart response. That
    response either carries the file itself, or a redirect URL from which
    the file is then downloaded.

    Args:
        destination: The remote server to send the HTTP request to.
        path: The HTTP path to GET.
        output_stream: File to write the response body to.
        download_ratelimiter: a ratelimiter to limit remote media downloads, keyed to
            requester IP
        ip_address: IP address of the requester
        max_size: maximum allowable size in bytes of the file
        args: Optional dictionary used to create the query string.
        retry_on_dns_fail: Passed through to `_send_request`.
        ignore_backoff: true to ignore the historical backoff data
            and try the request anyway.

    Returns:
        Resolves to an (int, dict, bytes) tuple of
        the file length, a dict of the response headers, and the file json

    Raises:
        HttpResponseException: If we get an HTTP response code >= 300
            (except 429).
        NotRetryingDestination: If we are not yet ready to retry this
            server.
        FederationDeniedError: If this destination is not on our
            federation whitelist
        RequestSendFailed: If there were problems connecting to the
            remote, due to e.g. DNS failures, connection timeouts etc.
            This includes timing out or failing while reading the body.
        SynapseError: If the requested file exceeds ratelimits, is too
            large, or the response from the remote server is not a
            multipart response
        AssertionError: if the resolved multipart response's length is None
    """
    request = MatrixFederationRequest(
        method="GET", destination=destination, path=path, query=args
    )

    # check for a minimum balance of 1MiB in ratelimiter before initiating request
    send_req, _ = await download_ratelimiter.can_do_action(
        requester=None, key=ip_address, n_actions=1048576, update=False
    )

    if not send_req:
        msg = "Requested file size exceeds ratelimits"
        logger.warning(
            "{%s} [%s] %s",
            request.txn_id,
            request.destination,
            msg,
        )
        raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)

    response = await self._send_request(
        request,
        retry_on_dns_fail=retry_on_dns_fail,
        ignore_backoff=ignore_backoff,
    )

    headers = dict(response.headers.getAllRawHeaders())
    expected_size = response.length

    # When the remote did not announce a length, budget for the largest
    # file we are willing to accept.
    if expected_size == UNKNOWN_LENGTH:
        expected_size = max_size
    else:
        if int(expected_size) > max_size:
            msg = "Requested file is too large > %r bytes" % (max_size,)
            logger.warning(
                "{%s} [%s] %s",
                request.txn_id,
                request.destination,
                msg,
            )
            raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)

    read_body, _ = await download_ratelimiter.can_do_action(
        requester=None,
        key=ip_address,
        n_actions=expected_size,
    )
    if not read_body:
        msg = "Requested file size exceeds ratelimits"
        logger.warning(
            "{%s} [%s] %s",
            request.txn_id,
            request.destination,
            msg,
        )
        raise SynapseError(
            HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
        )

    # this should be a multipart/mixed response with the boundary string in the header
    try:
        raw_content_type = headers.get(b"Content-Type")
        assert raw_content_type is not None
        content_type = raw_content_type[0].decode("UTF-8")
        content_type_parts = content_type.split("boundary=")
        boundary = content_type_parts[1]
    except Exception:
        # A missing header, undecodable bytes or an absent boundary all
        # mean the same thing to the caller: the remote sent something we
        # cannot parse as multipart.
        msg = "Remote response is malformed: expected Content-Type of multipart/mixed with a boundary present."
        logger.warning(
            "{%s} [%s] %s",
            request.txn_id,
            request.destination,
            msg,
        )
        raise SynapseError(HTTPStatus.BAD_GATEWAY, msg)

    try:
        async with self.remote_download_linearizer.queue(ip_address):
            # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
            deferred = read_multipart_response(
                response, output_stream, boundary, expected_size + 1
            )
            deferred.addTimeout(self.default_timeout_seconds, self.reactor)
            # The deferred must be awaited *inside* both the `try` and the
            # linearizer: creating it and attaching the timeout cannot
            # raise the read errors handled below, they only surface when
            # it is awaited. Awaiting here also keeps the linearizer slot
            # held for the duration of the read, as `get_file` does.
            multipart_response = await make_deferred_yieldable(deferred)
    except BodyExceededMaxSize:
        msg = "Requested file is too large > %r bytes" % (expected_size,)
        logger.warning(
            "{%s} [%s] %s",
            request.txn_id,
            request.destination,
            msg,
        )
        raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
    except defer.TimeoutError as e:
        logger.warning(
            "{%s} [%s] Timed out reading response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except ResponseFailed as e:
        logger.warning(
            "{%s} [%s] Failed to read response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except Exception as e:
        # Anything else is logged for diagnosis and passed on unchanged.
        logger.warning(
            "{%s} [%s] Error reading response: %s",
            request.txn_id,
            request.destination,
            e,
        )
        raise

    if not multipart_response.url:
        # The file itself was part of the multipart response.
        assert multipart_response.length is not None
        length = multipart_response.length
        headers[b"Content-Type"] = [multipart_response.content_type]
        headers[b"Content-Disposition"] = [multipart_response.disposition]

    # the response contained a redirect url to download the file from
    else:
        str_url = multipart_response.url.decode("utf-8")
        logger.info(
            "{%s} [%s] File download redirected, now downloading from: %s",
            request.txn_id,
            request.destination,
            str_url,
        )
        # We don't know how large the response will be upfront, so limit it to
        # the `max_size` config value.
        length, headers, _, _ = await self._simple_http_client.get_file(
            str_url, output_stream, max_size
        )

    logger.info(
        "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
        request.txn_id,
        request.destination,
        response.code,
        response.phrase.decode("ascii", errors="replace"),
        length,
        request.method,
        request.uri.decode("ascii"),
    )

    # if we didn't know the length upfront, decrement the actual size from ratelimiter
    if response.length == UNKNOWN_LENGTH:
        download_ratelimiter.record_action(
            requester=None, key=ip_address, n_actions=length
        )

    return length, headers, multipart_response.json
|
|
|
|
|
|
def _flatten_response_never_received(e: BaseException) -> str:
|
|
if hasattr(e, "reasons"):
|
|
reasons = ", ".join(
|
|
_flatten_response_never_received(f.value) for f in e.reasons
|
|
)
|
|
|
|
return "%s:[%s]" % (type(e).__name__, reasons)
|
|
else:
|
|
return repr(e)
|
|
|
|
|
|
def check_content_type_is(headers: Headers, expected_content_type: str) -> None:
    """
    Check that a set of HTTP headers have a Content-Type header, and that it
    is the expected value.

    Only the first Content-Type header is looked at, and any parameters
    after the first `;` are ignored for the comparison.

    Args:
        headers: headers to check
        expected_content_type: the media type, without parameters, that the
            Content-Type header must be equal to

    Raises:
        RequestSendFailed: if the Content-Type header is missing or doesn't match

    """
    content_type_headers = headers.getRawHeaders(b"Content-Type")
    if content_type_headers is None:
        raise RequestSendFailed(
            RuntimeError("No Content-Type header received from remote server"),
            can_retry=False,
        )

    # The header value is supplied by the remote server, so it may contain
    # non-ASCII bytes. Decode leniently: such bytes become replacement
    # characters, which then fail the comparison below and are reported as
    # a RequestSendFailed instead of escaping as a UnicodeDecodeError.
    c_type = content_type_headers[0].decode(
        "ascii", errors="replace"
    )  # only the first header
    # Extract the 'essence' of the mimetype, removing any parameter
    c_type_parsed = c_type.split(";", 1)[0].strip()
    if c_type_parsed != expected_content_type:
        raise RequestSendFailed(
            RuntimeError(
                f"Remote server sent Content-Type header of '{c_type}', not '{expected_content_type}'",
            ),
            can_retry=False,
        )
|