Implement human verification

This commit is contained in:
Alexandru Cheltuitor 2021-09-24 11:17:21 +01:00
parent 782c0e4d6c
commit 83d6023da7
No known key found for this signature in database
GPG key ID: 66B58C938EBBFA07

View file

@ -262,21 +262,24 @@ class Session:
response = self.__try_with_alt_routing(fct, **request_params)
try:
json_response = response.json()
response = response.json()
except json.decoder.JSONDecodeError as e:
self._logger.exception(e)
raise ProtonAPIError(
{
"Code": json_response.status_code,
"Error": json_response.reason,
"Headers": json_response.headers
"Code": response.status_code,
"Error": response.reason,
"Headers": response.headers
}
)
if json_response['Code'] not in [1000, 1001]:
raise ProtonAPIError(json_response)
if response['Code'] not in [1000, 1001]:
if response['Code'] == 9001:
self.__captcha_token = response["Details"]["HumanVerificationToken"]
return json_response
raise ProtonAPIError(response)
return response
def __try_with_alt_routing(self, fct, **request_params):
alternative_routes = self.get_alternative_routes_from_dns()
@ -345,7 +348,7 @@ class Session:
return base64.b64decode(verified.data.strip())
def authenticate(self, username, password):
def authenticate(self, username, password, human_verification=None):
"""Authenticate user against API.
Args:
@ -363,7 +366,16 @@ class Session:
payload = {"Username": username}
if self.__clientsecret:
payload['ClientSecret'] = self.__clientsecret
info_response = self.api_request("/auth/info", payload)
if human_verification:
human_verification_header = {
"X-PM-Human-Verification-Token-Type": human_verification[0],
"X-PM-Human-Verification-Token": human_verification[1]
}
info_response = self.api_request(
"/auth/info", payload,
additional_headers=human_verification_header
)
modulus = self.verify_modulus(info_response['Modulus'])
server_challenge = base64.b64decode(info_response["ServerEphemeral"])
@ -422,9 +434,7 @@ class Session:
The returning dict contains the Scope of the account. This allows
to identify if the account is locked, has unpaid invoices, etc.
"""
ret = self.api_request('/auth/2fa', {
"TwoFactorCode": code
})
ret = self.api_request('/auth/2fa', {"TwoFactorCode": code})
self._session_data['Scope'] = ret['Scope']
return self.Scope
@ -444,12 +454,15 @@ class Session:
If the RefreshToken is invalid then the user will have to
re-authenticate.
"""
refresh_response = self.api_request('/auth/refresh', {
"ResponseType": "token",
"GrantType": "refresh_token",
"RefreshToken": self.RefreshToken,
"RedirectURI": "http://protonmail.ch"
})
refresh_response = self.api_request(
'/auth/refresh',
{
"ResponseType": "token",
"GrantType": "refresh_token",
"RefreshToken": self.RefreshToken,
"RedirectURI": "http://protonmail.ch"
}
)
self._session_data['AccessToken'] = refresh_response["AccessToken"]
self._session_data['RefreshToken'] = refresh_response["RefreshToken"]
self.s.headers['Authorization'] = 'Bearer ' + self.AccessToken
@ -578,6 +591,12 @@ class Session:
return routes
@property
def captcha_url(self):
return "{}/core/v4/captcha?Token={}".format(
self.__api_url, self.__captcha_token
)
@property
def enable_alternative_routing(self):
"""Alternative routing getter."""