Implement alternative routing

This new feature will query the hardcoded dns hosts with the hardcoded (encoded) urls. The result is that it will provide a set of alternative url routes that can be used to reach Proton API.

For this to work, the dev implementing this must disable ssl_verification and ensure that TLS pinning is enabled.

Since the standard TLS checks are weaked, given by the fact that ssl_verification is disabled, TLS pinning must always be enabled, when using this feature, to ensure protection again MITM attacks.
This commit is contained in:
Alexandru Cheltuitor 2021-07-08 17:41:24 +01:00
parent 32d68803dd
commit 705e0a47f6
No known key found for this signature in database
GPG key ID: 74418CFCDE482CF8
17 changed files with 217 additions and 69 deletions

View file

@ -18,6 +18,7 @@ RUN pacman -Syu --noconfirm \
python-bcrypt \
python-gnupg \
python-pytest \
python-dnspython \
python-pytest-cov \
&& useradd -ms /bin/bash user \
&& usermod -a -G wheel user \

View file

@ -1,46 +0,0 @@
FROM IMAGE_URL_CENTOS8
RUN dnf update -y
# RUN yum -y install epel-release && yum repolist
RUN dnf install -y \
gcc \
sudo \
rpm-build \
rpm-devel \
rpmlint \
make \
python3 \
python3-pip \
bash \
epel-release \
diffutils \
patch \
rpmdevtools \
rpm-sign \
vim \
openssl-devel \
openssl-libs
RUN dnf install -y \
python3-requests \
python3-pyOpenSSL \
python3-bcrypt \
python3-gnupg
RUN dnf install -y \
python3-pytest \
python3-pytest-cov
RUN useradd -ms /bin/bash user
RUN usermod -a -G wheel user
RUN echo '%wheel ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
COPY docker_entry.sh /usr/local/bin
COPY . /home/user/proton-python-client
RUN chown -R user:user /home/user/
USER user
WORKDIR /home/user/proton-python-client
ENTRYPOINT ["/usr/local/bin/docker_entry.sh"]

View file

@ -27,6 +27,7 @@ RUN apt-get install -y \
python3-bcrypt \
python3-gnupg \
python3-openssl \
python3-dnspython \
python3-requests >= 2.16.0
RUN apt-get install -y \

View file

@ -26,6 +26,7 @@ RUN dnf install -y \
python3-requests \
python3-pyOpenSSL \
python3-bcrypt \
python3-dnspython \
python3-gnupg
RUN dnf install -y \

View file

@ -26,6 +26,7 @@ RUN dnf install -y \
python3-requests \
python3-pyOpenSSL \
python3-bcrypt \
python3-dnspython \
python3-gnupg
RUN dnf install -y \

View file

@ -26,6 +26,7 @@ RUN dnf install -y \
python3-requests \
python3-pyOpenSSL \
python3-bcrypt \
python3-dnspython \
python3-gnupg
RUN dnf install -y \

View file

@ -26,6 +26,7 @@ RUN dnf install -y \
python3-requests \
python3-pyOpenSSL \
python3-bcrypt \
python3-dnspython \
python3-gnupg
RUN dnf install -y \

View file

@ -1,13 +1,13 @@
# Maintainer: Proton Technologies AG <opensource@proton.me>
pkgname=python-proton-client
pkgver=0.5.1
pkgrel=3
pkgver=0.6.0
pkgrel=1
pkgdesc="Safely login with ProtonVPN credentials to connect to Proton."
arch=("any")
url="https://github.com/ProtonMail/proton-python-client"
license=("GPL3")
groups=("ProtonVPN")
depends=("python-requests" "python-bcrypt" "python-gnupg" "python-pyopenssl")
depends=("python-requests" "python-bcrypt" "python-gnupg" "python-pyopenssl", "python-dnspython")
makedepends=("python-setuptools")
source=("$pkgname.tar.gz")
sha256sums=(.)

6
debian/changelog vendored
View file

@ -1,3 +1,9 @@
proton-python-client (0.6.0-1) unstable; urgency=medium
* Feature: Alternative Routing
-- Proton Technologies AG <opensource@proton.me> Thu, 08 Jul 2021 17:30:00 +0100
proton-python-client (0.5.1-3) unstable; urgency=medium
* Add new exceptions for improved case handling

2
debian/control vendored
View file

@ -10,7 +10,7 @@ Package: python3-proton-client
Architecture: all
Homepage: https://github.com/ProtonMail/proton-python-client
Section: net
Depends: ${python3:Depends}, ${misc:Depends}, python3-bcrypt, python3-gnupg, python3-openssl, python3-requests (>= 2.16.0)
Depends: ${python3:Depends}, ${misc:Depends}, python3-bcrypt, python3-gnupg, python3-openssl, python3-dnspython, python3-requests (>= 2.16.0)
Description: Proton SRP (Python 3)
Safely login with ProtonVPN credentials to connect to Proton.
.

View file

@ -1,2 +1,2 @@
from .api import Session # noqa
from .exceptions import ProtonError # noqa
from .exceptions import ProtonAPIError # noqa

View file

@ -3,12 +3,21 @@ import json
import gnupg
import requests
import urllib3
urllib3.disable_warnings()
from concurrent.futures import ThreadPoolExecutor
import dns.message
from .cert_pinning import TLSPinningAdapter
from .constants import (DEFAULT_TIMEOUT, SRP_MODULUS_KEY,
SRP_MODULUS_KEY_FINGERPRINT)
from .exceptions import (ConnectionTimeOutError, NewConnectionError,
ProtonError, TLSPinningError, UnknownConnectionError)
from .exceptions import (ConnectionTimeOutError,
EmptyAlternativeRoutesListError, NewConnectionError,
ProtonAPIError, TLSPinningError,
UnknownConnectionError, TLSPinningDisabledError)
from .srp import User as PmsrpUser
@ -17,6 +26,14 @@ class Session:
"x-pm-apiversion": "3",
"Accept": "application/vnd.protonmail.v1+json"
}
__ssl_verification = True
__DNS_HOSTS = ["https://dns11.quad9.net/dns-query", "https://dns.google/dns-query"]
__ENCODED_URLS = [
"dMFYGSLTQOJXXI33OOZYG4LTDNA.protonpro.xyz",
"dMFYGSLTQOJXXI33ONVQWS3BOMNUA.protonpro.xyz"
]
__tls_pinning_enabled = False
@staticmethod
def load(dump, TLSPinning=True, timeout=DEFAULT_TIMEOUT, proxies=None):
@ -66,11 +83,13 @@ class Session:
self._session_data = {}
self.s = requests.Session()
if proxies:
self.s.proxies.update(proxies)
self.s.proxies = proxies
if TLSPinning:
self.__tls_pinning_enabled = True
self.s.mount(self.__api_url, TLSPinningAdapter())
self.s.headers['x-pm-appversion'] = appversion
self.s.headers['User-Agent'] = user_agent
@ -100,7 +119,8 @@ class Session:
self.__api_url + endpoint,
headers=additional_headers,
json=jsondata,
timeout=self.__timeout
timeout=self.__timeout,
verify=self.__ssl_verification
)
except requests.exceptions.ConnectionError as e:
raise NewConnectionError(e)
@ -111,10 +131,12 @@ class Session:
except (Exception, requests.exceptions.BaseHTTPError) as e:
raise UnknownConnectionError(e)
# print(ret.content)
try:
ret = ret.json()
except json.decoder.JSONDecodeError:
raise ProtonError(
raise ProtonAPIError(
{
"Code": ret.status_code,
"Error": ret.reason,
@ -123,7 +145,7 @@ class Session:
)
if ret['Code'] != 1000:
raise ProtonError(ret)
raise ProtonAPIError(ret)
return ret
@ -216,6 +238,138 @@ class Session:
self._session_data['RefreshToken'] = refresh_response["RefreshToken"]
self.s.headers['Authorization'] = 'Bearer ' + self.AccessToken
def get_alternative_routes(self, callback=None):
"""Get alterntive routes to circumvent firewalls and api restrictions.
Args:
callback (func): a callback method to be called
Might be usefull for multi-threading.
This method leverages the power of ThreadPoolExecutor to async
check if the provided dns hosts can be reached, and if so, collect the
alternatives routes provided by them.
The encoded url are done sync because most often one of the two should work,
as it should provide the data as quick as possible.
If callback is passed then the method does not return any value, otherwise it
returns a set().
"""
routes = None
if not self.__tls_pinning_enabled:
raise TLSPinningDisabledError(
"TLS pinning should be enabled when using alternative routing"
)
for encoded_url in self.__ENCODED_URLS:
dns_query, dns_encoded_data = self.__generate_dns_message(encoded_url)
dns_hosts_response = []
host_and_dns = [(host, dns_encoded_data) for host in self.__DNS_HOSTS]
with ThreadPoolExecutor(max_workers=len(self.__DNS_HOSTS)) as executor:
dns_hosts_response = list(
executor.map(self.__query_for_dns_data, host_and_dns, timeout=20)
)
if len(dns_hosts_response) == 0:
continue
for response in dns_hosts_response:
routes = self.__extract_dns_answer(response, dns_query)
if routes:
break
if not routes:
raise EmptyAlternativeRoutesListError("No alternative routes were found")
if not callback:
return routes
callback(routes)
def __generate_dns_message(self, encoded_url):
"""Generate DNS object.
Args:
encoded_url (string): encoded url as per documentation
Returns:
tuple():
dns_query (dns.message.Message): output of dns.message.make_query
base64_dns_message (base64): encode bytes
"""
dns_query = dns.message.make_query(encoded_url, dns.rdatatype.TXT)
dns_wire = dns_query.to_wire()
base64_dns_message = base64.urlsafe_b64encode(dns_wire).rstrip(b"=")
return dns_query, base64_dns_message
def __query_for_dns_data(self, dns_settings):
"""Query for DNS host for data.
Args:
dns_settings (tuple):
host_url (str): http/https url
dns_encoded_data (str): base64 output
generate by __generate_dns_message()
This method uses requests.get to query the url
for dns data.
Returns:
bytes: content of the response
"""
dns_host, dns_encoded_data = dns_settings[0], dns_settings[1]
response = requests.get(
dns_host,
headers={"accept": "application/dns-message"},
timeout=(3.05, 16.95),
params={"dns": dns_encoded_data}
)
return response.content
def __extract_dns_answer(self, query_content, dns_query):
"""Extract alternative URL from dns message.
Args:
query_content (bytes): content of the response
dns_query (dns.message.Message): output of dns.message.make_query
Returns:
set(): alternative url for API
"""
r = dns.message.from_wire(
query_content,
keyring=dns_query.keyring,
request_mac=dns_query.request_mac,
one_rr_per_rrset=False,
ignore_trailing=False
)
routes = set()
for route in r.answer:
routes = set([str(url).strip("\"") for url in route])
return routes
@property
def api_url(self):
return self.__api_url
@api_url.setter
def api_url(self, newvalue):
self.__api_url = newvalue
@property
def ssl_verify(self):
return self.__ssl_verification
@ssl_verify.setter
def ssl_verify(self, newvalue):
self.__ssl_verification = newvalue
@property
def UID(self):
return self._session_data.get('UID', None)

View file

@ -108,12 +108,14 @@ class TLSPinningHTTPSConnectionPool(HTTPSConnectionPool):
def is_hash_valid(self, cert_hash, hash_dict):
"""Validate the hash against a known list of hashes/pins"""
try:
hash_dict[self.host].index(cert_hash)
except (ValueError, KeyError, TypeError):
return False
else:
return True
if cert_hash not in hash_dict["alt_routing"]:
return False
return True
def get_certificate(self, sock):
"""Extract and convert certificate to PEM format"""
@ -149,7 +151,7 @@ class TLSPinningPoolManager(PoolManager):
if scheme != 'https':
return super(TLSPinningPoolManager, self)._new_pool(
scheme, host, port, request_context
)
)
kwargs = self.connection_pool_kw

View file

@ -1,6 +1,6 @@
VERSION = "0.5.1"
DEFAULT_TIMEOUT = (10, 30)
VERSION = "0.6.0"
DEFAULT_TIMEOUT = (3.05, 27)
PUBKEY_HASH_DICT = {
"api.protonvpn.ch": [
"IEwk65VSaxv3s1/88vF/rM8PauJoIun3rzVCX5mLS3M=",
@ -13,6 +13,12 @@ PUBKEY_HASH_DICT = {
"8joiNBdqaYiQpKskgtkJsqRxF7zN0C0aqfi8DacknnI=",
"JMI8yrbc6jB1FYGyyWRLFTmDNgIszrNEMGlgy972e7w=",
"Iu44zU84EOCZ9vx/vz67/MRVrxF1IO4i4NIa8ETwiIY="
],
"alt_routing": [
"EU6TS9MO0L/GsDHvVc9D5fChYLNy5JdGYpJw0ccgetM=",
"iKPIHPnDNqdkvOnTClQ8zQAIKG0XavaPkcEo0LBAABA=",
"MSlVrBCdL0hKyczvgYVSRNm88RicyY04Q2y5qrBt0xA=",
"C2UxW0T1Ckl9s+8cXfjXxlEqwAfPM4HiW2y3UdtBeCw="
]
}

View file

@ -1,4 +1,4 @@
class ProtonError(Exception):
class ProtonAPIError(Exception):
def __init__(self, ret):
self.code = ret['Code']
self.error = ret['Error']
@ -10,13 +10,21 @@ class ProtonError(Exception):
super().__init__("{}".format(self.error))
class ProtonNetworkError(Exception):
class ProtonError(Exception):
def __init__(self, message, additional_context=None):
self.message = message
self.additional_context = additional_context
super().__init__(self.message)
class NetworkError(ProtonError):
"""NetworkError"""
class ProtonNetworkError(ProtonError):
"""ProtonNetworkError"""
class TLSPinningError(ProtonNetworkError):
"""TLS Pinning exception"""
@ -31,3 +39,11 @@ class ConnectionTimeOutError(ProtonNetworkError):
class UnknownConnectionError(ProtonNetworkError):
"""UnknownConnectionError"""
class EmptyAlternativeRoutesListError(ProtonError):
"""List with alterntive routes is empty."""
class TLSPinningDisabledError(ProtonAPIError):
"""TLS Pinning is disabled."""

View file

@ -1,6 +1,6 @@
%define unmangled_name proton-client
%define version 0.5.1
%define release 3
%define version 0.6.0
%define release 1
Prefix: %{_prefix}
@ -23,6 +23,7 @@ Requires: python3-requests
Requires: python3-pyOpenSSL
Requires: python3-bcrypt
Requires: python3-gnupg
Requires: python3-dnspython
%{?python_disable_dependency_generator}
@ -48,6 +49,9 @@ rm -rf $RPM_BUILD_ROOT
%defattr(-,root,root)
%changelog
* Thu Jul 08 2021 Proton Technologies AG <opensource@proton.me> 0.6.0-1
- Feature: Alternative Routing
* Mon May 24 2021 Proton Technologies AG <opensource@proton.me> 0.5.1-3
- Add new exceptions for improved case handling

View file

@ -10,7 +10,7 @@ setup(
author="Proton Technologies",
author_email="contact@protonmail.com",
url="https://github.com/ProtonMail/proton-python-client",
install_requires=["requests", "bcrypt", "python-gnupg", "pyopenssl"],
install_requires=["requests", "bcrypt", "python-gnupg", "pyopenssl", "dnspython"],
packages=find_packages(),
include_package_data=True,
license="GPLv3",