commit b2ccd90117e3d25e5b025b48981a1058eecdb9f6 Author: Alexandru Cheltuitor Date: Thu Oct 29 16:49:42 2020 +0000 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6a44f11 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +build/ +dist/ +MANIFEST +*.pyc +*.egg-info/ +.vscode/ +*.lock diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..ee2776b --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2020 Proton Technologies AG +Copyright (c) 2012 Tom Cocagne + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..449e0ce --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,4 @@ +include LICENSE +include *.txt +recursive-include proton *.py +recursive-include proton *.rst diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..9c55bfe --- /dev/null +++ b/Pipfile @@ -0,0 +1,13 @@ +[[source]] +name = "pypi" +url = "https://pypi.org/simple" +verify_ssl = true + +[dev-packages] +pytest = "*" + +[packages] +requests = ">=2.18.4" +bcrypt = ">=3.1.4" +python-gnupg = ">=0.4.1" +pyopenssl = ">=17.5" diff --git a/README.md b/README.md new file mode 100644 index 0000000..f18b451 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +proton-client +============= diff --git a/proton/__init__.py b/proton/__init__.py new file mode 100644 index 0000000..4a05266 --- /dev/null +++ b/proton/__init__.py @@ -0,0 +1 @@ +from .api import Session, ProtonError diff --git a/proton/api.py b/proton/api.py new file mode 100644 index 0000000..f89bb4e --- /dev/null +++ b/proton/api.py @@ -0,0 +1,223 @@ +import base64 +import json + +import gnupg +import requests + +from .cert_pinning import TLSPinningAdapter +from .srp import User as PmsrpUser + + +class ProtonError(Exception): + def __init__(self, ret): + self.code = ret['Code'] + self.error = ret['Error'] + try: + self.headers = ret["Headers"] + except KeyError: + self.headers = "" + + super().__init__("[{}] {} {}".format( + self.code, self.error, self.headers + )) + + +class Session: + _base_headers = { + "x-pm-apiversion": "3", + "Accept": "application/vnd.protonmail.v1+json" + } + + _srp_modulus_key = """-----BEGIN PGP PUBLIC KEY BLOCK----- +xjMEXAHLgxYJKwYBBAHaRw8BAQdAFurWXXwjTemqjD7CXjXVyKf0of7n9Ctm +L8v9enkzggHNEnByb3RvbkBzcnAubW9kdWx1c8J3BBAWCgApBQJcAcuDBgsJ +BwgDAgkQNQWFxOlRjyYEFQgKAgMWAgECGQECGwMCHgEAAPGRAP9sauJsW12U +MnTQUZpsbJb53d0Wv55mZIIiJL2XulpWPQD/V6NglBd96lZKBmInSXX/kXat +Sv+y0io+LR8i2+jV+AbOOARcAcuDEgorBgEEAZdVAQUBAQdAeJHUz1c9+KfE +kSIgcBRE3WuXC4oj5a2/U3oASExGDW4DAQgHwmEEGBYIABMFAlwBy4MJEDUF +hcTpUY8mAhsMAAD/XQD8DxNI6E78meodQI+wLsrKLeHn32iLvUqJbVDhfWSU +WO4BAMcm1u02t4VKw++ttECPt+HUgPUq5pqQWe5Q2cW4TMsE +=Y4Mw +-----END PGP PUBLIC KEY BLOCK-----""" + + @staticmethod + def load(dump, TLSPinning=True): + api_url = dump['api_url'] + appversion = dump['appversion'] + user_agent = dump['User-Agent'] + cookies = dump.get('cookies', {}) + s = Session(api_url, appversion, user_agent, TLSPinning=TLSPinning) + requests.utils.add_dict_to_cookiejar(s.s.cookies, cookies) + s._session_data = dump['session_data'] + if s.UID is not None: + s.s.headers['x-pm-uid'] = s.UID + s.s.headers['Authorization'] = 'Bearer ' + s.AccessToken + return s + + def dump(self): + return { + 'api_url': self.__api_url, + 'appversion': self.__appversion, + 'User-Agent': self.__user_agent, + 'cookies': self.s.cookies.get_dict(), + 'session_data': self._session_data + } + + def __init__(self, api_url, appversion="Other", user_agent="None", TLSPinning=True, ClientSecret=None): + self.__api_url = api_url + self.__appversion = appversion + self.__user_agent = user_agent + self.__clientsecret = ClientSecret + + ## Verify modulus + self.__gnupg = gnupg.GPG() + self.__gnupg.import_keys(self._srp_modulus_key) + + self._session_data = {} + + self.s = requests.Session() + if TLSPinning: + self.s.mount(self.__api_url, TLSPinningAdapter()) + self.s.headers['x-pm-appversion'] = appversion + self.s.headers['User-Agent'] = user_agent + + def api_request(self, endpoint, jsondata=None, additional_headers=None, method=None): + fct = self.s.post + if method is None: + if jsondata is None: + fct = self.s.get + else: + fct = self.s.post + else: + fct = { + 'get': self.s.get, + 'post': self.s.post, + 'put': self.s.put, + 'delete': self.s.delete, + 'patch': self.s.patch + }.get(method.lower()) + + if fct is None: + raise ValueError("Unknown method: {}".format(method)) + + + ret = fct( + self.__api_url + endpoint, + headers = additional_headers, + json = jsondata + ) + + try: + ret = ret.json() + except json.decoder.JSONDecodeError: + raise ProtonError( + { + "Code": ret.status_code, + "Error": ret.reason, + "Headers": ret.headers + } + ) + + if ret['Code'] != 1000: + raise ProtonError(ret) + + return ret + + + def authenticate(self, username, password): + self.logout() + + payload = {"Username": username} + if self.__clientsecret: + payload['ClientSecret'] = self.__clientsecret + info_response = self.api_request("/auth/info", payload) + d = self.__gnupg.decrypt(info_response['Modulus']) + + if not d.valid: + raise ValueError('Invalid modulus') + + modulus = base64.b64decode(d.data.strip()) + challenge = base64.b64decode(info_response["ServerEphemeral"]) + salt = base64.b64decode(info_response["Salt"]) + session = info_response["SRPSession"] + version = info_response["Version"] + + usr = PmsrpUser(password, modulus) + A = usr.get_challenge() + M = usr.process_challenge(salt, challenge, version) + + if M is None: + raise ValueError('Invalid challenge') + + ## Send response + payload = { + "Username": username, + "ClientEphemeral" : base64.b64encode(A).decode('utf8'), + "ClientProof" : base64.b64encode(M).decode('utf8'), + "SRPSession": session, + } + if self.__clientsecret: + payload['ClientSecret'] = self.__clientsecret + auth_response = self.api_request("/auth", payload) + + if "ServerProof" not in auth_response: + raise ValueError("Invalid password") + + usr.verify_session( base64.b64decode(auth_response["ServerProof"])) + if not usr.authenticated(): + raise ValueError('Invalid server proof') + + self._session_data = { + 'UID': auth_response["UID"], + 'AccessToken': auth_response["AccessToken"], + 'RefreshToken': auth_response["RefreshToken"], + 'Scope': auth_response["Scope"].split(), + } + + if self.UID is not None: + self.s.headers['x-pm-uid'] = self.UID + self.s.headers['Authorization'] = 'Bearer ' + self.AccessToken + + return self.Scope + + def provide_2fa(self, code): + ret = self.api_request('/auth/2fa', { + "TwoFactorCode": code + }) + self._session_data['Scope'] = ret['Scope'] + + return self.Scope + + def logout(self): + if self._session_data: + self.api_request('/auth',method='DELETE') + del self.s.headers['Authorization'] + del self.s.headers['x-pm-uid'] + self._session_data = {} + + def refresh(self): + refresh_response = self.api_request('/auth/refresh', { + "ResponseType": "token", + "GrantType": "refresh_token", + "RefreshToken": self.RefreshToken, + "RedirectURI": "http://protonmail.ch" + }) + self._session_data['AccessToken'] = refresh_response["AccessToken"] + self._session_data['RefreshToken'] = refresh_response["RefreshToken"] + self.s.headers['Authorization'] = 'Bearer ' + self.AccessToken + + @property + def UID(self): + return self._session_data.get('UID', None) + + @property + def AccessToken(self): + return self._session_data.get('AccessToken', None) + + @property + def RefreshToken(self): + return self._session_data.get('RefreshToken', None) + + @property + def Scope(self): + return self._session_data.get('Scope', []) diff --git a/proton/cert_pinning.py b/proton/cert_pinning.py new file mode 100644 index 0000000..5692cd5 --- /dev/null +++ b/proton/cert_pinning.py @@ -0,0 +1,187 @@ +import base64 +import hashlib +from ssl import DER_cert_to_PEM_cert + +import requests +from OpenSSL import crypto +from requests.adapters import HTTPAdapter +from urllib3.connectionpool import HTTPSConnectionPool +from urllib3.poolmanager import PoolManager +from urllib3.util.timeout import Timeout + +from .constants import PUBKEY_HASH_DICT + + +class TLSPinningError(requests.exceptions.SSLError): + def __init__(self, strerror): + self.strerror = strerror + super(TLSPinningError, self).__init__(strerror) + + +class TLSPinningHTTPSConnectionPool(HTTPSConnectionPool): + """Verify the certificate upon each connection""" + def __init__( + self, + host, + hash_dict, + port=None, + strict=False, + timeout=Timeout.DEFAULT_TIMEOUT, + maxsize=1, + block=False, + headers=None, + retries=None, + _proxy=None, + _proxy_headers=None, + key_file=None, + cert_file=None, + cert_reqs=None, + key_password=None, + ca_certs=None, + ssl_version=None, + assert_hostname=None, + assert_fingerprint=None, + ca_cert_dir=None, + **conn_kw + ): + self.hash_dict = hash_dict + try: + super(TLSPinningHTTPSConnectionPool, self).__init__( + host, + port, + strict, + timeout, + maxsize, + block, + headers, + retries, + _proxy, + _proxy_headers, + key_file, + cert_file, + cert_reqs, + key_password, + ca_certs, + ssl_version, + assert_hostname, + assert_fingerprint, + ca_cert_dir, + **conn_kw + ) + except TypeError: + super(TLSPinningHTTPSConnectionPool, self).__init__( + host, + port, + strict, + timeout, + maxsize, + block, + headers, + retries, + _proxy, + _proxy_headers, + key_file, + cert_file, + cert_reqs, + ca_certs, + ssl_version, + assert_hostname, + assert_fingerprint, + ca_cert_dir, + **conn_kw + ) + + def _validate_conn(self, conn): + r = super(TLSPinningHTTPSConnectionPool, self)._validate_conn(conn) + + sock = conn.sock + + pem_certificate = self.get_certificate(sock) + + if self.is_session_secure(pem_certificate, conn, self.hash_dict): + return r + + def is_session_secure(self, cert, conn, hash_dict): + """Check if connection is secure""" + + cert_hash = self.extract_hash(cert) + + if not self.is_hash_valid(cert_hash, hash_dict): + # Also generate a report + conn.close() + raise TLSPinningError("Insecure connection") + + return True + + def is_hash_valid(self, cert_hash, hash_dict): + """Validate the hash against a known list of hashes/pins""" + try: + hash_dict[self.host].index(cert_hash) + except (ValueError, KeyError, TypeError): + return False + else: + return True + + def get_certificate(self, sock): + """Extract and convert certificate to PEM format""" + certificate_binary_form = sock.getpeercert(True) + return DER_cert_to_PEM_cert(certificate_binary_form) + + def extract_hash(self, cert): + """Extract encrypted hash from the certificate""" + cert_obj = crypto.load_certificate(crypto.FILETYPE_PEM, cert) + pubkey_obj = cert_obj.get_pubkey() + pubkey = crypto.dump_publickey(crypto.FILETYPE_ASN1, pubkey_obj) + + spki_hash = hashlib.sha256(pubkey).digest() + cert_hash = base64.b64encode(spki_hash).decode('utf-8') + return cert_hash + + +class TLSPinningPoolManager(PoolManager): + """Attach TLSPinningHTTPSConnectionPool to TLSPinningPoolManager""" + def __init__( + self, + hash_dict, + num_pools=10, + headers=None, + **connection_pool_kw + ): + self.hash_dict = hash_dict + super(TLSPinningPoolManager, self).__init__( + num_pools=10, headers=None, **connection_pool_kw + ) + + def _new_pool(self, scheme, host, port, request_context): + if scheme != 'https': + return super(TLSPinningPoolManager, self)._new_pool( + scheme, host, port, request_context + ) + + kwargs = self.connection_pool_kw + + pool = TLSPinningHTTPSConnectionPool( + host=host, port=port, + hash_dict=self.hash_dict, **kwargs + ) + + return pool + + +class TLSPinningAdapter(HTTPAdapter): + """Attach TLSPinningPoolManager to TLSPinningAdapter""" + def __init__(self, hash_dict=PUBKEY_HASH_DICT): + self.hash_dict = hash_dict + super(TLSPinningAdapter, self).__init__() + + def init_poolmanager( + self, + connections, + maxsize, + block=False, + **pool_kwargs + ): + self.poolmanager = TLSPinningPoolManager( + num_pools=connections, maxsize=maxsize, block=block, + strict=True, hash_dict=self.hash_dict, **pool_kwargs + ) diff --git a/proton/constants.py b/proton/constants.py new file mode 100644 index 0000000..788c6cb --- /dev/null +++ b/proton/constants.py @@ -0,0 +1,15 @@ +PUBKEY_HASH_DICT = { + "api.protonvpn.ch": [ + "IEwk65VSaxv3s1/88vF/rM8PauJoIun3rzVCX5mLS3M=", + "drtmcR2kFkM8qJClsuWgUzxgBkePfRCkRpqUesyDmeE=", + "YRGlaY0jyJ4Jw2/4M8FIftwbDIQfh8Sdro96CeEel54=", + "AfMENBVvOS8MnISprtvyPsjKlPooqh8nMB/pvCrpJpw=" + ], + "protonvpn.com": [ + "+0dMG0qG2Ga+dNE8uktwMm7dv6RFEXwBoBjQ43GqsQ0=", + "8joiNBdqaYiQpKskgtkJsqRxF7zN0C0aqfi8DacknnI=", + "JMI8yrbc6jB1FYGyyWRLFTmDNgIszrNEMGlgy972e7w=", + "Iu44zU84EOCZ9vx/vz67/MRVrxF1IO4i4NIa8ETwiIY=" + ] + +} \ No newline at end of file diff --git a/proton/doc/conf.py b/proton/doc/conf.py new file mode 100644 index 0000000..ba75a8c --- /dev/null +++ b/proton/doc/conf.py @@ -0,0 +1,216 @@ +# -*- coding: utf-8 -*- +# +# Secure Remote Password documentation build configuration file, created by +# sphinx-quickstart on Fri Mar 25 10:20:52 2011. +# +# This file is execfile()d with the current directory set to its containing dir. +# +# Note that not all possible configuration values are present in this +# autogenerated file. +# +# All configuration values have a default; values that are commented out +# serve to show the default. + +import sys, os + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +#sys.path.insert(0, os.path.abspath('.')) + +# -- General configuration ----------------------------------------------------- + +# If your documentation needs a minimal Sphinx version, state it here. +#needs_sphinx = '1.0' + +# Add any Sphinx extension module names here, as strings. They can be extensions +# coming with Sphinx (named 'sphinx.ext.*') or your custom ones. +extensions = [] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# The suffix of source filenames. +source_suffix = '.rst' + +# The encoding of source files. +#source_encoding = 'utf-8-sig' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = u'Secure Remote Password' +copyright = u'2011, Tom Cocagne' + +# The version info for the project you're documenting, acts as replacement for +# |version| and |release|, also used in various other places throughout the +# built documents. +# +# The short X.Y version. +version = '1.0' +# The full version, including alpha/beta/rc tags. +release = '1.0' + +# The language for content autogenerated by Sphinx. Refer to documentation +# for a list of supported languages. +#language = None + +# There are two options for replacing |today|: either, you set today to some +# non-false value, then it is used: +#today = '' +# Else, today_fmt is used as the format for a strftime call. +#today_fmt = '%B %d, %Y' + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +exclude_patterns = ['_build'] + +# The reST default role (used for this markup: `text`) to use for all documents. +#default_role = None + +# If true, '()' will be appended to :func: etc. cross-reference text. +#add_function_parentheses = True + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +#add_module_names = True + +# If true, sectionauthor and moduleauthor directives will be shown in the +# output. They are ignored by default. +#show_authors = False + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# A list of ignored prefixes for module index sorting. +#modindex_common_prefix = [] + + +# -- Options for HTML output --------------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +html_theme = 'default' + +# Theme options are theme-specific and customize the look and feel of a theme +# further. For a list of options available for each theme, see the +# documentation. +#html_theme_options = {} + +# Add any paths that contain custom themes here, relative to this directory. +#html_theme_path = [] + +# The name for this set of Sphinx documents. If None, it defaults to +# " v documentation". +#html_title = None + +# A shorter title for the navigation bar. Default is the same as html_title. +#html_short_title = None + +# The name of an image file (relative to this directory) to place at the top +# of the sidebar. +#html_logo = None + +# The name of an image file (within the static path) to use as favicon of the +# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 +# pixels large. +#html_favicon = None + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + +# If not '', a 'Last updated on:' timestamp is inserted at every page bottom, +# using the given strftime format. +#html_last_updated_fmt = '%b %d, %Y' + +# If true, SmartyPants will be used to convert quotes and dashes to +# typographically correct entities. +#html_use_smartypants = True + +# Custom sidebar templates, maps document names to template names. +#html_sidebars = {} + +# Additional templates that should be rendered to pages, maps page names to +# template names. +#html_additional_pages = {} + +# If false, no module index is generated. +#html_domain_indices = True + +# If false, no index is generated. +#html_use_index = True + +# If true, the index is split into individual pages for each letter. +#html_split_index = False + +# If true, links to the reST sources are added to the pages. +#html_show_sourcelink = True + +# If true, "Created using Sphinx" is shown in the HTML footer. Default is True. +#html_show_sphinx = True + +# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True. +#html_show_copyright = True + +# If true, an OpenSearch description file will be output, and all pages will +# contain a tag referring to it. The value of this option must be the +# base URL from which the finished HTML is served. +#html_use_opensearch = '' + +# This is the file name suffix for HTML files (e.g. ".xhtml"). +#html_file_suffix = None + +# Output file base name for HTML help builder. +htmlhelp_basename = 'SecureRemotePassworddoc' + + +# -- Options for LaTeX output -------------------------------------------------- + +# The paper size ('letter' or 'a4'). +#latex_paper_size = 'letter' + +# The font size ('10pt', '11pt' or '12pt'). +#latex_font_size = '10pt' + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, author, documentclass [howto/manual]). +latex_documents = [ + ('index', 'SecureRemotePassword.tex', u'Secure Remote Password Documentation', + u'Tom Cocagne', 'manual'), +] + +# The name of an image file (relative to this directory) to place at the top of +# the title page. +#latex_logo = None + +# For "manual" documents, if this is true, then toplevel headings are parts, +# not chapters. +#latex_use_parts = False + +# If true, show page references after internal links. +#latex_show_pagerefs = False + +# If true, show URL addresses after external links. +#latex_show_urls = False + +# Additional stuff for the LaTeX preamble. +#latex_preamble = '' + +# Documents to append as an appendix to all manuals. +#latex_appendices = [] + +# If false, no module index is generated. +#latex_domain_indices = True + + +# -- Options for manual page output -------------------------------------------- + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). +man_pages = [ + ('index', 'secureremotepassword', u'Secure Remote Password Documentation', + [u'Tom Cocagne'], 1) +] diff --git a/proton/doc/index.rst b/proton/doc/index.rst new file mode 100644 index 0000000..9648908 --- /dev/null +++ b/proton/doc/index.rst @@ -0,0 +1,14 @@ +Welcome to Python Proton Client module's documentation! +======================================================= + +Contents: + +.. toctree:: + :maxdepth: 2 + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/proton/srp/README.md b/proton/srp/README.md new file mode 100644 index 0000000..747900d --- /dev/null +++ b/proton/srp/README.md @@ -0,0 +1,37 @@ +# Secure Remote Password submodule +This submodule provides the interface to the custom implementation of ProtonMail's SRP API. +It automatically tries to load the constant time ctypes + OpenSSL implementation, +and on failure it uses the native long int implementation. +It is based on [pysrp](https://github.com/cocagne/pysrp). + +## Examples +### Authenticate against the API +```python +from proton.srp import User +usr = User(password, modulus) +client_challenge = usr.get_challenge() + +# Get server challenge and user salt... + +client_proof = usr.process_challenge(salt, server_challenge, version) + +# Send client proof... + +usr.verify_session(server_proof) +if usr.authenticated(): + print("Logged in!") +``` + +### Generate new random verifier +```python +from proton.srp import User +usr = User(password, modulus) +generated_salt, generated_v = usr.compute_v() +``` + +### Generate verifier given salt +```python +from proton.srp import User +usr = User(password, modulus) +generated_salt, generated_v = usr.compute_v(salt) +``` diff --git a/proton/srp/__init__.py b/proton/srp/__init__.py new file mode 100644 index 0000000..5389423 --- /dev/null +++ b/proton/srp/__init__.py @@ -0,0 +1,13 @@ +from . import _pysrp +_mod = None + +try: + from . import _ctsrp + _mod = _ctsrp +except (ImportError, OSError): + pass + +if not _mod: + _mod = _pysrp + +User = _mod.User diff --git a/proton/srp/_ctsrp.py b/proton/srp/_ctsrp.py new file mode 100644 index 0000000..492e780 --- /dev/null +++ b/proton/srp/_ctsrp.py @@ -0,0 +1,304 @@ +# N A large safe prime (N = 2q+1, where q is prime) +# All arithmetic is done modulo N. +# g A generator modulo N +# k Multiplier parameter (k = H(N, g) in SRP-6a, k = 3 for legacy SRP-6) +# s User's salt +# I Username +# p Cleartext Password +# H() One-way hash function +# ^ (Modular) Exponentiation +# u Random scrambling parameter +# a,b Secret ephemeral values +# A,B Public ephemeral values +# x Private key (derived from p and s) +# v Password verifier + +from __future__ import division +import sys +import ctypes + +from .pmhash import pmhash +from .util import * + +dlls = list() + +platform = sys.platform +if platform == 'darwin': + dlls.append(ctypes.cdll.LoadLibrary('libssl.dylib')) +elif 'win' in platform: + for d in ('libeay32.dll', 'libssl32.dll', 'ssleay32.dll'): + try: + dlls.append(ctypes.cdll.LoadLibrary(d)) + except: + pass +else: + try: + dlls.append(ctypes.cdll.LoadLibrary('libssl.so.10')) + except OSError: + try: + dlls.append(ctypes.cdll.LoadLibrary('libssl.so.1.0.0')) + except OSError: + dlls.append(ctypes.cdll.LoadLibrary('libssl.so')) + + +class BIGNUM_Struct(ctypes.Structure): + _fields_ = [("d", ctypes.c_void_p), + ("top", ctypes.c_int), + ("dmax", ctypes.c_int), + ("neg", ctypes.c_int), + ("flags", ctypes.c_int)] + + +class BN_CTX_Struct(ctypes.Structure): + _fields_ = [("_", ctypes.c_byte)] + + +BIGNUM = ctypes.POINTER(BIGNUM_Struct) +BN_CTX = ctypes.POINTER(BN_CTX_Struct) + + +def load_func(name, args, returns=ctypes.c_int): + d = sys.modules[__name__].__dict__ + f = None + + for dll in dlls: + try: + f = getattr(dll, name) + f.argtypes = args + f.restype = returns + d[name] = f + return + except: + pass + raise ImportError('Unable to load required functions from SSL dlls') + + +load_func('BN_new', [], BIGNUM) +load_func('BN_free', [BIGNUM], None) +load_func('BN_clear', [BIGNUM], None) + +load_func('BN_CTX_new', [], BN_CTX) +load_func('BN_CTX_free', [BN_CTX], None) + +load_func('BN_cmp', [BIGNUM, BIGNUM], ctypes.c_int) + +load_func('BN_num_bits', [BIGNUM], ctypes.c_int) + +load_func('BN_add', [BIGNUM, BIGNUM, BIGNUM]) +load_func('BN_sub', [BIGNUM, BIGNUM, BIGNUM]) +load_func('BN_mul', [BIGNUM, BIGNUM, BIGNUM, BN_CTX]) +load_func('BN_div', [BIGNUM, BIGNUM, BIGNUM, BIGNUM, BN_CTX]) +load_func('BN_mod_exp', [BIGNUM, BIGNUM, BIGNUM, BIGNUM, BN_CTX]) + +load_func('BN_rand', [BIGNUM, ctypes.c_int, ctypes.c_int, ctypes.c_int]) + +load_func('BN_bn2bin', [BIGNUM, ctypes.c_char_p]) +load_func('BN_bin2bn', [ctypes.c_char_p, ctypes.c_int, BIGNUM], BIGNUM) + +load_func('BN_hex2bn', [ctypes.POINTER(BIGNUM), ctypes.c_char_p]) +load_func('BN_bn2hex', [BIGNUM], ctypes.c_char_p) + +load_func('CRYPTO_free', [ctypes.c_char_p]) + +load_func('RAND_seed', [ctypes.c_char_p, ctypes.c_int]) + + +def bn_num_bytes(a): + return ((BN_num_bits(a) + 7) // 8) + + +def bn_mod(rem, m, d, ctx): + return BN_div(None, rem, m, d, ctx) + + +def bn_is_zero(n): + return n[0].top == 0 + + +def bn_to_bytes(n): + b = ctypes.create_string_buffer(bn_num_bytes(n)) + BN_bn2bin(n, b) + return b.raw[::-1] + + +def bytes_to_bn(dest_bn, bytes): + BN_bin2bn(bytes[::-1], len(bytes), dest_bn) + + +def bn_hash(hash_class, dest, n1, n2): + h = hash_class() + h.update(bn_to_bytes(n1)) + h.update(bn_to_bytes(n2)) + d = h.digest() + bytes_to_bn(dest, d) + + +def bn_hash_k(hash_class, dest, g, N, width): + h = hash_class() + bin1 = ctypes.create_string_buffer(width) + bin2 = ctypes.create_string_buffer(width) + BN_bn2bin(g, bin1) + BN_bn2bin(N, bin2) + h.update(bin1) + h.update(bin2[::-1]) + bytes_to_bn(dest, h.digest()) + + +def calculate_x(hash_class, dest, salt, password, modulus, version): + exp = hash_password(hash_class, password, bn_to_bytes(salt), bn_to_bytes(modulus), version) + bytes_to_bn(dest, exp) + + +def update_hash(h, n): + h.update(bn_to_bytes(n)) + + +def calculate_client_challenge(hash_class, A, B, K): + h = hash_class() + update_hash(h, A) + update_hash(h, B) + h.update(K) + return h.digest() + + +def calculate_server_challenge(hash_class, A, M, K): + h = hash_class() + update_hash(h, A) + h.update(M) + h.update(K) + return h.digest() + + +def get_ngk(hash_class, n_bin, g_hex, ctx): + N = BN_new() + g = BN_new() + k = BN_new() + + bytes_to_bn(N, n_bin) + BN_hex2bn(g, g_hex) + bn_hash_k(hash_class, k, g, N, width=bn_num_bytes(N)) + + return N, g, k + + +class User(object): + def __init__(self, password, n_bin, g_hex=b"2", bytes_a=None, bytes_A=None): + if bytes_a and len(bytes_a) != 32: + raise ValueError("32 bytes required for bytes_a") + + if not isinstance(password, str) or len(password) == 0: + raise ValueError("Invalid password") + + self.password = password.encode() + self.a = BN_new() + self.A = BN_new() + self.B = BN_new() + self.s = BN_new() + self.S = BN_new() + self.u = BN_new() + self.x = BN_new() + self.v = BN_new() + self.tmp1 = BN_new() + self.tmp2 = BN_new() + self.tmp3 = BN_new() + self.ctx = BN_CTX_new() + self.M = None + self.K = None + self.expected_server_proof = None + self._authenticated = False + + self.hash_class = pmhash + self.N, self.g, self.k = get_ngk(self.hash_class, n_bin, g_hex, self.ctx) + + if bytes_a: + bytes_to_bn(self.a, bytes_a) + else: + BN_rand(self.a, 256, 0, 0) + + if bytes_A: + bytes_to_bn(self.A, bytes_A) + else: + BN_mod_exp(self.A, self.g, self.a, self.N, self.ctx) + + def __del__(self): + if not hasattr(self, 'a'): + return # __init__ threw exception. no clean up required + BN_free(self.a) + BN_free(self.A) + BN_free(self.B) + BN_free(self.s) + BN_free(self.S) + BN_free(self.u) + BN_free(self.x) + BN_free(self.v) + BN_free(self.N) + BN_free(self.g) + BN_free(self.k) + BN_free(self.tmp1) + BN_free(self.tmp2) + BN_free(self.tmp3) + BN_CTX_free(self.ctx) + + def authenticated(self): + return self._authenticated + + def get_ephemeral_secret(self): + return bn_to_bytes(self.a) + + def get_session_key(self): + return self.K if self._authenticated else None + + def get_challenge(self): + return bn_to_bytes(self.A) + + # Returns M or None if SRP-6a safety check is violated + def process_challenge(self, bytes_s, bytes_server_challenge, version=PM_VERSION): + bytes_to_bn(self.s, bytes_s) + bytes_to_bn(self.B, bytes_server_challenge) + + # SRP-6a safety check + if bn_is_zero(self.B): + return None + + bn_hash(self.hash_class, self.u, self.A, self.B) + + # SRP-6a safety check + if bn_is_zero(self.u): + return None + + calculate_x(self.hash_class, self.x, self.s, self.password, self.N, version) + + BN_mod_exp(self.v, self.g, self.x, self.N, self.ctx) + + # S = (B - k*(g^x)) ^ (a + ux) + BN_mul(self.tmp1, self.u, self.x, self.ctx) + BN_add(self.tmp2, self.a, self.tmp1) # tmp2 = (a + ux) + BN_mod_exp(self.tmp1, self.g, self.x, self.N, self.ctx) + BN_mul(self.tmp3, self.k, self.tmp1, self.ctx) # tmp3 = k*(g^x) + BN_sub(self.tmp1, self.B, self.tmp3) # tmp1 = (B - K*(g^x)) + BN_mod_exp(self.S, self.tmp1, self.tmp2, self.N, self.ctx) + + self.K = bn_to_bytes(self.S) + self.M = calculate_client_challenge(self.hash_class, self.A, self.B, self.K) + self.expected_server_proof = calculate_server_challenge(self.hash_class, self.A, self.M, self.K) + + return self.M + + def verify_session(self, server_proof): + if self.expected_server_proof == server_proof: + self._authenticated = True + + def compute_v(self, bytes_s=None, version=PM_VERSION): + if bytes_s is None: + BN_rand(self.s, 10*8, 0, 0) + else: + bytes_to_bn(self.s, bytes_s) + + calculate_x(self.hash_class, self.x, self.s, self.password, self.N, version) + BN_mod_exp(self.v, self.g, self.x, self.N, self.ctx) + return bn_to_bytes(self.s), bn_to_bytes(self.v) + +# --------------------------------------------------------- +# Init +# +RAND_seed(os.urandom(32), 32) diff --git a/proton/srp/_pysrp.py b/proton/srp/_pysrp.py new file mode 100644 index 0000000..5e47789 --- /dev/null +++ b/proton/srp/_pysrp.py @@ -0,0 +1,130 @@ +# N A large safe prime (N = 2q+1, where q is prime) +# All arithmetic is done modulo N. +# g A generator modulo N +# k Multiplier parameter (k = H(N, g) in SRP-6a, k = 3 for legacy SRP-6) +# s User's salt +# I Username +# p Cleartext Password +# H() One-way hash function +# ^ (Modular) Exponentiation +# u Random scrambling parameter +# a,b Secret ephemeral values +# A,B Public ephemeral values +# x Private key (derived from p and s) +# v Password verifier +from .pmhash import pmhash +from .util import * + + +def get_ng(n_bin, g_hex): + return bytes_to_long(n_bin), int(g_hex, 16) + + +def hash_k(hash_class, g, modulus, width): + h = hash_class() + h.update(g.to_bytes(width, 'little')) + h.update(modulus.to_bytes(width, 'little')) + return bytes_to_long(h.digest()) + + +def calculate_x(hash_class, salt, password, modulus, version): + exp = hash_password(hash_class, password, long_to_bytes(salt), long_to_bytes(modulus), version) + return bytes_to_long(exp) + + +def calculate_client_proof(hash_class, A, B, K): + h = hash_class() + h.update(long_to_bytes(A)) + h.update(long_to_bytes(B)) + h.update(K) + return h.digest() + + +def calculate_server_proof(hash_class, A, M, K): + h = hash_class() + h.update(long_to_bytes(A)) + h.update(M) + h.update(K) + return h.digest() + + +class User(object): + def __init__(self, password, n_bin, g_hex=b"2", bytes_a=None, bytes_A=None): + if bytes_a and len(bytes_a) != 32: + raise ValueError("32 bytes required for bytes_a") + + if not isinstance(password, str) or len(password) == 0: + raise ValueError("Invalid password") + + self.N, self.g = get_ng(n_bin, g_hex) + self.hash_class = pmhash + self.k = hash_k(self.hash_class, self.g, self.N, width=long_length(self.N)) + + self.p = password.encode() + if bytes_a: + self.a = bytes_to_long(bytes_a) + else: + self.a = get_random_of_length(32) + if bytes_A: + self.A = bytes_to_long(bytes_A) + else: + self.A = pow(self.g, self.a, self.N) + self.v = None + self.M = None + self.K = None + self.expected_server_proof = None + self._authenticated = False + self.s = None + self.S = None + self.B = None + self.u = None + self.x = None + + def authenticated(self): + return self._authenticated + + def get_ephemeral_secret(self): + return long_to_bytes(self.a) + + def get_session_key(self): + return self.K if self._authenticated else None + + def get_challenge(self): + return long_to_bytes(self.A) + + # Returns M or None if SRP-6a safety check is violated + def process_challenge(self, bytes_s, bytes_server_challenge, version=PM_VERSION): + self.s = bytes_to_long(bytes_s) + self.B = bytes_to_long(bytes_server_challenge) + + # SRP-6a safety check + if (self.B % self.N) == 0: + return None + + self.u = custom_hash(self.hash_class, self.A, self.B) + + # SRP-6a safety check + if self.u == 0: + return None + + self.x = calculate_x(self.hash_class, self.s, self.p, self.N, version) + + self.v = pow(self.g, self.x, self.N) + + self.S = pow((self.B - self.k * self.v), (self.a + self.u * self.x), self.N) + + self.K = long_to_bytes(self.S) + self.M = calculate_client_proof(self.hash_class, self.A, self.B, self.K) + self.expected_server_proof = calculate_server_proof(self.hash_class, self.A, self.M, self.K) + + return self.M + + def verify_session(self, server_proof): + if self.expected_server_proof == server_proof: + self._authenticated = True + + def compute_v(self, bytes_s=None, version=PM_VERSION): + self.s = get_random_of_length(10) if bytes_s is None else bytes_to_long(bytes_s) + self.x = calculate_x(self.hash_class, self.s, self.p, self.N, version) + + return long_to_bytes(self.s), long_to_bytes(pow(self.g, self.x, self.N)) diff --git a/proton/srp/pmhash.py b/proton/srp/pmhash.py new file mode 100644 index 0000000..7b03297 --- /dev/null +++ b/proton/srp/pmhash.py @@ -0,0 +1,27 @@ +# Custom expanded version of SHA512 +import hashlib + + +class PMHash: + digest_size = 256 + name = 'PMHash' + + def __init__(self, b=b""): + self.b = b + + def update(self, b): + self.b += b + + def digest(self): + return hashlib.sha512(self.b + b'\0').digest() + hashlib.sha512(self.b + b'\1').digest() + hashlib.sha512( + self.b + b'\2').digest() + hashlib.sha512(self.b + b'\3').digest() + + def hexdigest(self): + return self.digest().hex() + + def copy(self): + return PMHash(self.b) + + +def pmhash(b=b""): + return PMHash(b) diff --git a/proton/srp/util.py b/proton/srp/util.py new file mode 100644 index 0000000..ff9ac46 --- /dev/null +++ b/proton/srp/util.py @@ -0,0 +1,57 @@ +import base64 +import bcrypt +import os + +PM_VERSION = 4 + + +def bcrypt_b64_encode(s): # The joy of bcrypt + bcrypt_base64 = b"./ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" + std_base64chars = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" + s = base64.b64encode(s) + return s.translate(bytes.maketrans(std_base64chars, bcrypt_base64)) + + +def hash_password_3(hash_class, password, salt, modulus): + salt = (salt + b"proton")[:16] + salt = bcrypt_b64_encode(salt)[:22] + hashed = bcrypt.hashpw(password, b"$2y$10$" + salt) + return hash_class(hashed + modulus).digest() + + +def hash_password(hash_class, password, salt, modulus, version): + if version == 4 or version == 3: + return hash_password_3(hash_class, password, salt, modulus) + + raise ValueError('Unsupported auth version') + + +def long_length(n): + return (n.bit_length() + 7) // 8 + + +def bytes_to_long(s): + return int.from_bytes(s, 'little') + + +def long_to_bytes(n): + return n.to_bytes(long_length(n), 'little') + + +def get_random(nbytes): + return bytes_to_long(os.urandom(nbytes)) + + +def get_random_of_length(nbytes): + offset = (nbytes * 8) - 1 + return get_random(nbytes) | (1 << offset) + + +def custom_hash(hash_class, *args, **kwargs): + h = hash_class() + for s in args: + if s is not None: + data = long_to_bytes(s) if isinstance(s, int) else s + h.update(data) + + return bytes_to_long(h.digest()) diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..6dfd236 --- /dev/null +++ b/setup.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python + +from setuptools import setup, find_packages + + +long_description = ''' + +This package, originally forked from python-srp module implements a simple +wrapper to the Proton Technologies API, abstracting from the SRP authentication. +''' + +setup(name = 'proton-client', + version = '0.0.9', + description = 'Proton Technologies API wrapper', + author = 'Proton Technologies', + author_email = 'contact@protonmail.com', + url = 'https://github.com/ProtonMail/proton-python-client', + long_description = long_description, + install_requires = ['requests', 'bcrypt', 'python-gnupg', 'pyopenssl'], + packages = find_packages(), + include_package_data = True, + license = "MIT", + platforms = "OS Independent", + classifiers = [ + 'Development Status :: 3 - Alpha', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: MIT License', + 'Operating System :: OS Independent', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python', + 'Topic :: Security', + ],) diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..396878e --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,122 @@ +import unittest + +from testdata import instances +from testserver import TestServer +from proton.srp.util import * +from proton.srp._ctsrp import User as CTUser +from proton.srp._pysrp import User as PYUser + + +class SRPTestCases: + class SRPTestBase(unittest.TestCase): + def test_invalid_version(self): + modulus = bytes.fromhex(instances[0]['Modulus']) + salt = base64.b64decode(instances[0]['Salt']) + + with self.assertRaises(ValueError): + usr = self.user('pass', modulus) + salt, usr.compute_v(salt, 2) + + with self.assertRaises(ValueError): + usr = self.user('pass', modulus) + salt, usr.compute_v(salt, 5) + + def test_compute_v(self): + for instance in instances: + if instance["Exception"] is not None: + with self.assertRaises(instance['Exception']): + usr = self.user(instance["Password"], bytes.fromhex(instance["Modulus"])) + usr.compute_v(base64.b64decode(instance["Salt"]), PM_VERSION) + else: + usr = self.user(instance["Password"], bytes.fromhex(instance["Modulus"])) + salt, v = usr.compute_v(base64.b64decode(instance["Salt"]), PM_VERSION) + + self.assertEqual( + instance["Salt"], + base64.b64encode(salt).decode('utf8'), + "Wrong salt while generating v, instance: " + str(instance)[:30] + "..." + ) + + self.assertEqual( + instance["Verifier"], + base64.b64encode(v).decode('utf8'), + "Wrong verifier while generating v, instance: " + str(instance)[:30] + "..." + ) + + def test_generate_v(self): + for instance in instances: + if instance["Exception"] is not None: + continue + + usr = self.user(instance["Password"], bytes.fromhex(instance["Modulus"])) + generated_salt, generated_v = usr.compute_v() + + computed_salt, computed_v = usr.compute_v(generated_salt) + + self.assertEqual( + generated_salt, + computed_salt, + "Wrong salt while generating v, instance: " + str(instance)[:30] + "..." + ) + + self.assertEqual( + generated_v, + computed_v, + "Wrong verifier while generating v, instance: " + str(instance)[:30] + "..." + ) + + def test_srp(self): + for instance in instances: + if instance["Exception"]: + continue + + server = TestServer() + + server.setup( + instance["Username"], + bytes.fromhex(instance["Modulus"]), + base64.b64decode(instance["Verifier"]) + ) + + server_challenge = server.get_challenge() + usr = self.user(instance["Password"], bytes.fromhex(instance["Modulus"])) + + client_challenge = usr.get_challenge() + client_proof = usr.process_challenge(base64.b64decode(instance["Salt"]), server_challenge, PM_VERSION) + server_proof = server.process_challenge(client_challenge, client_proof) + usr.verify_session(server_proof) + + self.assertIsNotNone( + client_proof, + "SRP exchange failed, client_proof is none for instance: " + str(instance)[:30] + "..." + ) + + self.assertEqual( + server.get_session_key(), + usr.get_session_key(), + "Secrets do not match, instance: " + str(instance)[:30] + "..." + ) + + self.assertTrue( + server.get_authenticated(), + "Server is not correctly authenticated, instance:: " + str(instance)[:30] + "..." + ) + + self.assertTrue( + usr.authenticated(), + "User is not correctly authenticated, instance:: " + str(instance)[:30] + "..." + ) + + +class TestCTSRPClass(SRPTestCases.SRPTestBase): + def setUp(self): + self.user = CTUser + + +class TestPYSRPClass(SRPTestCases.SRPTestBase): + def setUp(self): + self.user = PYUser + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_tlspinning.py b/tests/test_tlspinning.py new file mode 100644 index 0000000..15ac7e3 --- /dev/null +++ b/tests/test_tlspinning.py @@ -0,0 +1,49 @@ +import pytest +import requests +from proton import cert_pinning + +fake_hashes = { + "api.protonvpn.ch": [ + "aIEwk65VSaxv3s1/88vF/rM8PauJoIun3rzVCX5mLS3M=", + "adrtmcR2kFkM8qJClsuWgUzxgBkePfRCkRpqUesyDmeE=", + "aYRGlaY0jyJ4Jw2/4M8FIftwbDIQfh8Sdro96CeEel54=", + "aAfMENBVvOS8MnISprtvyPsjKlPooqh8nMB/pvCrpJpw=" + ], + "protonvpn.com": [ + "a+0dMG0qG2Ga+dNE8uktwMm7dv6RFEXwBoBjQ43GqsQ0=", + "a8joiNBdqaYiQpKskgtkJsqRxF7zN0C0aqfi8DacknnI=", + "aJMI8yrbc6jB1FYGyyWRLFTmDNgIszrNEMGlgy972e7w=", + "aIu44zU84EOCZ9vx/vz67/MRVrxF1IO4i4NIa8ETwiIY=" + ] +} + + +class TestCertificatePinning(): + + s = requests.Session() + + def test_api_url_real_hash(self): + url = 'https://api.protonvpn.ch/tests/ping' + self.s.mount(url, cert_pinning.TLSPinningAdapter()) + r = self.s.get(url) + assert int(r.status_code) == 200 + + def test_non_api_url_real_hash(self): + url = 'https://protonvpn.com' + self.s.mount(url, cert_pinning.TLSPinningAdapter()) + r = self.s.get(url) + assert int(r.status_code) == 200 + + def test_api_url_fake_hash(self): + url = 'https://api.protonvpn.ch/tests/ping' + self.s.mount(url, cert_pinning.TLSPinningAdapter(fake_hashes)) + + with pytest.raises(requests.exceptions.ConnectionError): + self.s.get(url) + + def test_non_api_url_fake_hash(self): + url = 'https://protonvpn.com' + self.s.mount(url, cert_pinning.TLSPinningAdapter(fake_hashes)) + + with pytest.raises(requests.exceptions.ConnectionError): + self.s.get(url) diff --git a/tests/testdata.py b/tests/testdata.py new file mode 100644 index 0000000..4d78b10 --- /dev/null +++ b/tests/testdata.py @@ -0,0 +1,81 @@ +instances = [ + { + 'Username': 'test', + 'Password': 'test', + 'Modulus': '1B64DF29DEDD8656245DB7EEE751442AD9CF1DAFC5A71A94076385C2FBF9FA7AD63E94CB365EC94EBA5BE131CF63D3930CAC4755DE6D0625C24DD9A906551D216601222EBA94FF50C78B8B26DBF27636F4019F1700BA091287462CFFAD4F88B22D66BBF8993090865E46D077ECF1DB78CB2AB0D036AD786B046B5D93BD473C95779914CB93F607FD7EFB9D34161951263CE794BF181FB301EE444D170999EAFF9427CC4151BD91A755F1A184009C1418B16EEC7BFC2D5F88D42B38A4CC176B73EAB132FE37DD7E1162DCA1D13E81A6F10F090DE77EB8CC492CD0B19BB6FC151F5B4AD56B14308D582D86471390C4223400AEE3D5E94C973FB997D59F8A9F309F', + 'Verifier': 'rhmWw1f4YsGq/cVgIePVSaot9Zj2kGNLxIgPytirz9/Nd8X8a28ZvFnWMQD0Jj8IgJPfO4EsI2nU5NuFqIbPI4OKs6s0nWvEHXkdtzxT1n451MGThvZ6o7I+0Ofi1Dh6Rgkv3MxVL/6cNev3EDyhcvh5w9hUIOk5OfRDcFMKv8ht5OYI5a/++m1++x58LQyrrTsRMMIcvDFXsjmMj4Ch3leP02S56cxDl6IQTosU+JcXGHsgNDf90aDnlsDFMGt6As1FSQm8bw0Yat+P02IZrXURQKsMLOKxdwb8xWySVymXAyjcJ7Z9ZabGjCX6Lelo9Wiz0hIEGE7nnOeVwzJKaA==', + 'Salt': 'Jl54BOeNTVl8Ng==', + 'Exception': None + }, + { + 'Username': 'abc123', + 'Password': 'LongerPassword', + 'Modulus': 'FB6443D98DA170445C9795DC351398F1DE1518FB2827F757E8805C5F43DC2927499060A929171245B20FAED4F0EF5611276430A1943F6FD8E7999D8F40407494EE2FD147B31ECC1D59AC7F63E9266CE6EE58FA9B54D3FF3F712F1F210353E7714730A7A787D36D7B7D0940F16A30263EAD448C09BE1EA9F322FE8A844A30C4B900747F30057F33CF850BD717D0AB8008BE6EB333D30F02C1575601F8077307FCA6DA0DBE0156C485E30343A371E9083B58F8F57DB049F46A9ACEE00C1A702E99D04C0777543B3A25B8B33BF35C6E95332E0C907FB357A46A28DB073510DC7903F0E14B5B6DD11945F0D19B7E3939D942E8808D8BFFF2A4AE35E4EECE4AB069BD', + 'Verifier': '9U87bD4BYK3aDRQntSh0pR4AneEQ97Dd3rfysHfShFatlZGjntcJqAheiz6BAJPNnk+ui4Ps842bBZdrXMcfI9kftWra/eByMA1uzJ16rH9UQy2gpUH2hBRqoSMWGduiwnFp168NGAfAX+Zp9ce+N4J4t9E04arJnnfUfjja6iMK9wzzpXZn6XcdpbRJ5EeY8FO8Y6I0Y7TM8sxfeeSyGLhFSZLvLoXFjALi48zXSjNNw7GJBcH+hbejeYFiIf0cSK7sPFn8O5CJFXXCmjO5wx4KdYuTH5y919guttwdS70M67iVlISyyXTHcpxH6967IsbWJms/pNoMrjXT2xauCA==', + 'Salt': 'hyzJpo9GoQaQZg==', + 'Exception': None + }, + { + 'Username': 'VeryLongUsername_123456789', + 'Password': 'VeryLongAndSecurePassword', + 'Modulus': 'C3358515FE673964104A3BCC015F3079F6711ACC6E1C35CA4AED5A85C345B258160B73E32A1DC1F1BD8389FE96B7A6EFD1CA8A6265186FE256BD67DB5507A81CA5BBBA1AFC6E854794343F2DF91182A4FAEC84FA6FDC3028C85DE344EF545D5A5668CFBE00D0A98E7DF9C4E3833BFE49E5E938F4658D23EB791757852A520C7908471C249324FD87B382A17973CBB962E20FCD598051CC43E792013412603294ACAFE51185E6D57AFBAC13F83E8B1ACCC296094B2B6D76B8DF9210FF00FBAFEB9ED333C123AF5E0E8607E1EE01AC80A7FBEC194952050C100D1CF0C58740509DC82A0EE6F82536F8AD0E651284E8277407BB625DDA23A5CD906B00B76D21DADD', + 'Verifier': 'AuYwtTLuT5coocpDHcZCQMqMQbFevrcxnqoijrOveXe7+h4XqdzpakUnKxJ8n+IrHt5wo2Nhsgdipw7woeb1lskt3WBdJ6KCTHCwjOcBUlHpMnJbea7ere7qCl+ar7LQOE/hh1xu2uotSBFMCDrPxoVLX00/jwtfFnr3e9he0tZ2+lZgUgPqKWo3OT4SthM9N9ILMQ4GSW5s5HgPw1QVhTXXQ6Nr6fwW5ID+ViyPS2n66WdCV2JcVwTza90/pYT3ZEvw/DOzD91v09lIdarp3a4c2/lbLORbmUiip0wHu4gLa5jv/J73xZEOGg54REwHrlx6eOgl31TdFjexC1n/EA==', + 'Salt': 'cujclj9P6zluIQ==', + 'Exception': None + }, + { + 'Username': 'TestUsername', + 'Password': 'Test Password With Spaces', + 'Modulus': '530B86096FE2BA84CBD4518EE26386BFF353127BC9545537B55148374619392CB90C05EA35F0032583D9928EB6309642236DF0D2EB00736BD8ACC349AE8094C97F330AEE0F493E76D1C092C98C61B880C942C54D9BA0AE2483AAD6A125E601685ECDD8BD984C4F15EDDEC8D21F873C5CE2F47B16069D95023F6CAF2E4C25703DCF2E09C66E3927E19D4C6377D5C31F9EB2D881E820BCB97747274C3F465DF3C17BFAC9C37DF6B255209697755AC068517A191364BF7D7A3A3B4321606A540590D70A1A0AEEA580B7731B89D2D2FD4EC5D0AB65ED91757041C1E088ABC8C4BAC2B9B586DB036F035F04BC9ED6ADE0713CFCB85BAE556C4D3FA2A00B9CE55D04C4', + 'Verifier': '9TcpTx2TZ2oyPXMtQ+jn+rTIGxl6MpOMnazV5tGMlcNg00VujDzqsEi3O0pwB2r1x0T9H4UMi7/l+1qy0HlsdG53OTO4ij+z+gRuA5xQAb/521g34q+/zJobBYuHISfV+LaHRgn1Z9VMYURkTNF66WyHMn1nd7swzGE+zWxBa0gN1d4HFW1h9VjLUOeOGGp4UwWIR2+pSO50hJO6yGC8pQgEZhmyM0VfliWWzKsLMxRdfS4eHArvKsVOLN/IChSRKjeiDSSkOVGQZyNUoZDpg4AsGBLeXJovlbdjQoM/8pByJJpX71Ze6WZIltTRyjTYirYORDrLNCQr8DHpE6hLKA==', + 'Salt': 'vSds4CNJcRFDog==', + 'Exception': None + }, + { + 'Username': 'Test1', + 'Password': '', + 'Salt': 'vSds4CNJcRFDog==', + 'Modulus': '6B4DDFC843BE2777F709F7E3379E308581D3976CF4A85F37C44302816BAE75F0E99C038A763E864A2367AFDA5C06875DF0990D1121196809B0D8D44423DBF43BC66164341E0CCC3A09637D04DE96146492935827372197B3B470CBFCFDEF5B9245DF5761D0E9267DF9293E32D9F5A503F827AE90E39414C90F19A6D4CCDE664227268D6C8164E9C570A0E79968CDF5597260502FAC9FDB9C585F2BD37597FD7149AE066EE0ED1B3958958DCB697DCD878E097FB8543AD0CAA99407A6F991DC70F137DABE97E8D07CE3D58B922F8BEF3C862C8CE224501E953B17E94B8F79EDD1B8E287087B172EC217FA5183CD1D4C3C66B950A06BAD64DA1C0DDCAF91BB5FBF', + 'Exception': ValueError + }, + { + 'Username': 'Test2', + 'Password': 'PasswordThatIsLongerThan72CharsAndContainsSomeRandomStuffThatNoOneCaresAbout', + 'Modulus': 'C376377AC6C62697E211C0BE80DA3C5F7B7381AB92D94536B406594AD0C1BFE90A424E36085CD2F553F370ED863E72597210DD94A19B0DA4257CD18EEDAF33A71E5BD7657DB25BE602F0430FE4762D9F6100DD319D7E5870DC0583D0782832E68E8A12C1AA0B8018FB71D5F3C9AEB80208DE62CAB066FCC80E274F32199AA2193882E256A86E2B8993C7278CE470BA4A9B315AF33C029967EB96C470987F440F7CD4688072B98DC0B57686B580DE76BF10F3D277D24CAD012A83A98A834F90A22A29D113F272A38E750DF3188ACFCADC8642B7F9847CE4F721EBF1D9105BE33CDC19A01080A9427F4F76B27D3FCF8926A5C4884C42D1B6D052C2F744452283DB', + 'Verifier': 'Ft8p5GMgowgwSRb2jJ7uoTkvptOQmNqH+Aov/Lawsc5vbaBIIoIwnF03jiGVEEO9kVVTZG/+5zQorvY5voZJrgJev+I/LR0WbzIY+Tqm7BbVRSARc7jkTFHXYFFiasuOR5myTDPyctxyfTQzAwGX7MccC82nWUuPn/yWsAgqppvVatC+QUilEKALvHqjnupJoLay0ZfmLOkV8eeXUQFkOcRzGFYtkCSD5aCQYBMZYkuLm/4rUEQsYQ9GyluGDNfYm0kDUy9oq0ujJdDciSvFAw3arIANsuEDmGg/eo0/1iZLywcZPzS20Cu07KIgQ+Ct2HemCmgFDJKQ2CBnW+HvFQ==', + 'Salt': 'lSIG/btGTkKS4Q==', + 'Exception': None + }, + { + 'Username': 'Test3', + 'Password': 'Test3', + 'Modulus': '6BF8F261CD7B1C9125CCB16F6FDCB59E1E7835E2E1C83D43B51DD319CCB6EFDC56F775448538F40B2259DCF464829E9E2E8CFE9E8A5658A5429BBCF65928EBF91276896148B920E1A3719BFC07ABDE69265B9296079E539F3B20B4FD88457DFD7776F300F79A6B01F5438F80C05A2E27D1903C2AED087C8D1566919FDA443D61E61BF5095CCD5F59E9B7C12E6210138A2B48EC39311A12442E298BA94D994E0038725706084EF993F5D10884E9ED1235A2C72E5E8A26F5FF0BF77B1F98D84F93CB9A3598DE647CF2ACD85E91541593834E7253DC417262488E02D4BA53873B1F7FBA17AE90A189AB7562E74691F396CD8C99311C9B6D4810528FC3698895BEE6', + 'Verifier': 'drKnuCh+aADaBMQdO+CM3USFXb5s4qAOgKyk2vxGLi6Gu8Z+SLskGYwx25djGgLqDlo6OncjlS1KPkc+euklc9CoPKDn0ZI3qGKSE4JK7LytiqC2IbpNa3j6jFQu020suyLYtAPJ+9IW6mvgiRi1dKuBlEArSAHz4aAO/ThKxjSArombm8F+tIQG4dznQWe2l7XzAmB9sFsYplDnnAtcydWtBiM1lnYqPJ3APWB/J1+r7YUNw/GPt8PuCz3tVDxyUJoe061iLQmPBsKfNpuSKBgwmMidjwN6UBbuLRhOyYhNO+sk6ER479NuYg6O7lvbnRdS4ELJs84Q2LmKLa3wGg==', + 'Salt': 'rPelup77AgUCQg==', + 'Exception': None + }, + { + 'Username': 'Test4', + 'Password': 'Test4', + 'Modulus': 'F3FC06AB4FCED0A9E73D85826EA1C21AEEED978C3C938524265E32DBFA0287979278ED80990E854421BF186116E538F8302E749A683B9272795F70352CD98554E6FBA7714BA0AB5C02A1CC641BB931F50E0A8F8FB31446C21950A3620E514F1ABEEE102BB8B225DE5EF34B0064010EE70CF7371D2D0FC154586E42F99701BEAAE1EBFC041A7975A782A3455AA4EDEA9ED0B126E4DEC746E5CCC696B9511E61DC0C26A7C39438F99C71CAA47F1BEF3FF25FB8C61D20A9D191E5B56273FA90C0D310A0296E2B86156EEEC27536A3252AE4ED3AEF6708E6C51D464E0EE15EC4B50FAA22095790D4FE93A6BDCF572DA36850015B3DE882407DCFB37EA45176E230E0', + 'Verifier': 'ED24sqbunfInO0jSyO+x2nbva3Uc4jetsw0oGZhYwo31azz8vU6lI1t6B8+a0fNR9yUxPU/Fr03wx8MH0JkIWulqDCYkU4THrnL6Uu6xOJEX4RRReT/l5SJA4zHbT0Zk7XFBvwUXtRa3mJRedJkR/bqpp7QUOobdKQKdm1n2l+ktgAq4hHuxLS1BUSvMVJh2B3bFc4BLgTAj9EQA8VEZVetSniMejNBdcEPFDnEXgFNddVPoWuuTpd0jWikeFXSyVT7q5D+Jg1UvO3wXWarYbGnn2lYubv9WY6spgLwbJv796YuFcso9/7dtPRpT/TJDqmHTYsoPbNYl017uODCjSw==', + 'Salt': 'dRuDue2lP/mG2w==', + 'Exception': None + }, + { + 'Username': 'Test5', + 'Password': 'Test5', + 'Modulus': '434EAA334EFC1225D376B2D38DC6FDF4E1E1182AB4B47803AE9DAC05865D1A91D95E7F3018B9893EBCFDED5EA6CAD29F953175EE25ED35AE4D9B37999360503D9D3EB8B31A2A64139A9928C7CD4600C433012CE52D105AB4718EC2F525FA4F2F4E5B4376BD3A8698E3C6CE60E98646E71EB26B18565EB90C3367FFB9CD4FCF8FDF75F6E9DE30D252DAED835BEF13D21EDFFB8163E56EBCFB2D884AE2C7778E28279F61252F5E24C79103B16078A980E8F6ED9A62A51187B7166A7AE335BE785A4DE8C5E5BE4EF1B0B9DBF8C3C42C445BD236B279DB53F28F7685ABC440A09B8CCC4D6B1723866ABBDEE916CC5F68EEF5F902A2E7A15805788DBE74FA1F20AEEC', + 'Verifier': 'F1MlhQI02Ek1HSGe1ow3wYbzjUJicjbMePOqBZynRpLDd3cD6meWe8coXKS9z63jOPORh9c9hSi/lgwDS+lhYPD50/xTRrKoq4rMdo0gHOazBCRbjTddN8NW3jPYIPpdNSuxr4ReJCc9rNizvjIpqNYb0q02KD3CewEtvZDhmeIl0Y/b82yahlB5Dkx0sEJ4KPCqPYru8BhnOl1/CTxz/CCIyQhHNt700vFGRLoewxCKhIteb+sGHFRerC35OOJIsuJc3Snf1o800YQATpxVxnqEzrYUs2ofXIRNGOFX8WU9PP+zFx7mAbp6KvSfoWVQB+a263p9D/3TzAN0kflm0A==', + 'Salt': 'JNmIaPOOZEzaWg==', + 'Exception': None + }, + { + 'Username': 'Test6', + 'Password': 'Test6', + 'Modulus': '5BA65673D3C677FF42CD223C6130CE991DB345B2647DD4ED19881FEB6D695AC5FB33B402E813CF6B829372E13C611740A37B231F3034F1142950F8B48A55250FAAAC29FBE12B18BE3852258FC8DE37F53238EE68DF8BAF41B4BF4ECE551D7DE91B428A2EB78DDB86C8F154B2593D18EEE3331441D98F86AFCCCAF51424E8E20ACB86EC478FA7F596805DE700532070EB8F90E66FB050385ACFE7ED45F044144962A97327237EB77F2A9A810D18E9FB366070E88315852F597D991B291484F17E2BC011528A0E9F7551667C6740A41397E229EB24C9735F814546AFA17E5A999B4CFE65A08EBFF5EB58F90BF2BDACD04BD00967AF01CCA06608B15716BE0F05AA', + 'Verifier': 'H9/tTIo5U2GClndXSJW7JHQdw3XmiZfPztBJvM7UNbrL851hJdXCpypXRvS3NCdt5AIr3ic48ll3NKFDmA6ORdPsE9vII0IdOWMxWIU0enpPi2cb8AbvVXS8RF6+LPOwOF8wDfro+/gDoj7ofk2hnbUdnbf9bjKxVau+V+NQJPL7m1P8XTxAhP3IYf3flfWjhmNisasPIXkBxwMM/rVg/9Wqkzrzpoo2eaaMolQirmBZe8gdJlsURSR1v7PPaCYP8rRY7RZMV2RWUm/W8o56n2iKm2F9ldRFveMnI9W+aUrnI7lAT/H7PvP1hPDXPwVnhhPlYHk8ovj9q1KGl12MBA==', + 'Salt': 'ZLpMUnhDn1QjkQ==', + 'Exception': None + } +] diff --git a/tests/testserver.py b/tests/testserver.py new file mode 100644 index 0000000..e01be6f --- /dev/null +++ b/tests/testserver.py @@ -0,0 +1,62 @@ +from proton.srp.util import * +from proton.srp.pmhash import pmhash + + +class TestServer: + def setup(self, username, modulus, verifier): + self.hash_class = pmhash + self.generator = 2 + self._authenticated = False + + self.user = username.encode() + self.modulus = bytes_to_long(modulus) + self.verifier = bytes_to_long(verifier) + + self.b = get_random_of_length(32) + self.B = (self.calculate_k() * self.verifier + pow(self.generator, self.b, self.modulus)) + + self.secret = None + self.A = None + self.u = None + self.key = None + + def calculate_server_proof(self, client_proof): + h = self.hash_class() + h.update(long_to_bytes(self.A)) + h.update(client_proof) + h.update(long_to_bytes(self.secret)) + return h.digest() + + def calculate_client_proof(self): + h = self.hash_class() + h.update(long_to_bytes(self.A)) + h.update(long_to_bytes(self.B)) + h.update(long_to_bytes(self.secret)) + return h.digest() + + def calculate_k(self): + width = long_length(self.modulus) + h = self.hash_class() + h.update(self.generator.to_bytes(width, 'little')) + h.update(long_to_bytes(self.modulus)) + return bytes_to_long(h.digest()) + + def get_challenge(self): + return long_to_bytes(self.B) + + def get_session_key(self): + return long_to_bytes(self.secret) #if self._authenticated else None + + def get_authenticated(self): + return self._authenticated + + def process_challenge(self, client_challenge, client_proof): + self.A = bytes_to_long(client_challenge) + self.u = custom_hash(self.hash_class, self.A, self.B) + self.secret = pow((self.A * pow(self.verifier, self.u, self.modulus)), self.b, self.modulus) + + if client_proof != self.calculate_client_proof(): + return False + + self._authenticated = True + return self.calculate_server_proof(client_proof)