chore: apply black formatting

Signed-off-by: Adrien Barreau <adrien.barreau@ovhcloud.com>
This commit is contained in:
Adrien Barreau 2023-03-06 14:49:45 +00:00
parent c5bd0bb62e
commit b24d52e619
13 changed files with 576 additions and 477 deletions

View file

@ -18,225 +18,219 @@ import os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))
# sys.path.insert(0, os.path.abspath('.'))
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.doctest',
'sphinx.ext.coverage',
'sphinx.ext.viewcode',
"sphinx.ext.autodoc",
"sphinx.ext.doctest",
"sphinx.ext.coverage",
"sphinx.ext.viewcode",
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
templates_path = ["_templates"]
# The suffix of source filenames.
source_suffix = '.rst'
source_suffix = ".rst"
# The encoding of source files.
#source_encoding = 'utf-8-sig'
# source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
master_doc = "index"
# General information about the project.
project = u'Python-OVH'
copyright = u'2013-2014, OVH SAS'
project = "Python-OVH"
copyright = "2013-2014, OVH SAS"
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '0.3'
version = "0.3"
# The full version, including alpha/beta/rc tags.
release = '0.5.0'
release = "0.5.0"
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = ['_build']
exclude_patterns = ["_build"]
# The reST default role (used for this markup: `text`) to use for all
# documents.
#default_role = None
# default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
pygments_style = "sphinx"
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# modindex_common_prefix = []
# If true, keep warnings as "system message" paragraphs in the built documents.
#keep_warnings = False
# keep_warnings = False
# -- Options for HTML output ----------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
html_theme = "default"
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
html_static_path = ["_static"]
# Add any extra paths that contain custom files (such as robots.txt or
# .htaccess) here, relative to this directory. These files are copied
# directly to the root of the documentation.
#html_extra_path = []
# html_extra_path = []
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# html_additional_pages = {}
# If false, no module index is generated.
#html_domain_indices = True
# html_domain_indices = True
# If false, no index is generated.
#html_use_index = True
# html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#html_show_sphinx = True
# html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#html_show_copyright = True
# html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = None
# html_file_suffix = None
# Output file base name for HTML help builder.
htmlhelp_basename = 'python-ovh-doc'
htmlhelp_basename = "python-ovh-doc"
# -- Options for LaTeX output ---------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#'preamble': '',
# The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#'preamble': '',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
('index', 'Python-OVH.tex', u'Python-OVH Documentation',
u'Jean-Tiare Le Bigot', 'manual'),
("index", "Python-OVH.tex", "Python-OVH Documentation", "Jean-Tiare Le Bigot", "manual"),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# latex_use_parts = False
# If true, show page references after internal links.
#latex_show_pagerefs = False
# latex_show_pagerefs = False
# If true, show URL addresses after external links.
#latex_show_urls = False
# latex_show_urls = False
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# latex_appendices = []
# If false, no module index is generated.
#latex_domain_indices = True
# latex_domain_indices = True
# -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
('index', 'python-ovh', u'Python-OVH Documentation',
[u'Jean-Tiare Le Bigot'], 1)
]
man_pages = [("index", "python-ovh", "Python-OVH Documentation", ["Jean-Tiare Le Bigot"], 1)]
# If true, show URL addresses after external links.
#man_show_urls = False
# man_show_urls = False
# -- Options for Texinfo output -------------------------------------------
@ -245,19 +239,25 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
('index', 'Python-OVH', u'Python-OVH Documentation',
u'Jean-Tiare Le Bigot', 'Python-OVH', 'OVH Rest API wrapper.',
'API'),
(
"index",
"Python-OVH",
"Python-OVH Documentation",
"Jean-Tiare Le Bigot",
"Python-OVH",
"OVH Rest API wrapper.",
"API",
),
]
# Documents to append as an appendix to all manuals.
#texinfo_appendices = []
# texinfo_appendices = []
# If false, no module index is generated.
#texinfo_domain_indices = True
# texinfo_domain_indices = True
# How to display URL addresses: 'footnote', 'no', or 'inline'.
#texinfo_show_urls = 'footnote'
# texinfo_show_urls = 'footnote'
# If true, do not generate a @detailmenu in the "Top" node's menu.
#texinfo_no_detailmenu = False
# texinfo_no_detailmenu = False

View file

@ -5,46 +5,46 @@ from tabulate import tabulate
import ovh
# Services type desired to mine. To speed up the script, delete service type you don't use!
service_types = [
'allDom',
'cdn/dedicated',
'cdn/website',
'cdn/webstorage',
'cloud/project',
'cluster/hadoop',
'dedicated/housing',
'dedicated/nas',
'dedicated/nasha',
'dedicated/server',
'dedicatedCloud',
'domain/zone',
'email/domain',
'email/exchange',
'freefax',
'hosting/privateDatabase',
'hosting/web',
'hosting/windows',
'hpcspot',
'license/cloudLinux',
'license/cpanel',
'license/directadmin',
'license/office',
'license/plesk',
'license/sqlserver',
'license/virtuozzo',
'license/windows',
'license/worklight',
'overTheBox',
'pack/xdsl',
'partner',
'router',
'sms',
'telephony',
'telephony/spare',
'veeamCloudConnect',
'vps',
'xdsl',
'xdsl/spare',
service_types = [
"allDom",
"cdn/dedicated",
"cdn/website",
"cdn/webstorage",
"cloud/project",
"cluster/hadoop",
"dedicated/housing",
"dedicated/nas",
"dedicated/nasha",
"dedicated/server",
"dedicatedCloud",
"domain/zone",
"email/domain",
"email/exchange",
"freefax",
"hosting/privateDatabase",
"hosting/web",
"hosting/windows",
"hpcspot",
"license/cloudLinux",
"license/cpanel",
"license/directadmin",
"license/office",
"license/plesk",
"license/sqlserver",
"license/virtuozzo",
"license/windows",
"license/worklight",
"overTheBox",
"pack/xdsl",
"partner",
"router",
"sms",
"telephony",
"telephony/spare",
"veeamCloudConnect",
"vps",
"xdsl",
"xdsl/spare",
]
# Delay before expiration in days
delay = 60
@ -59,16 +59,16 @@ services_will_expired = []
# Check all OVH product (service type)
for service_type in service_types:
service_list = client.get('/%s' % service_type )
service_list = client.get("/%s" % service_type)
# If we found you have this one or more of this product, we get these information
for service in service_list:
service_infos = client.get('/%s/%s/serviceInfos' % (service_type, service) )
service_expiration_date = datetime.datetime.strptime(service_infos['expiration'], '%Y-%m-%d')
# If we found you have this one or more of this product, we get these information
for service in service_list:
service_infos = client.get("/%s/%s/serviceInfos" % (service_type, service))
service_expiration_date = datetime.datetime.strptime(service_infos["expiration"], "%Y-%m-%d")
# If the expiration date is before (now + delay) date, we add it into our listing
if service_expiration_date < delay_date:
services_will_expired.append([service_type, service, service_infos["status"], service_infos["expiration"]])
# If the expiration date is before (now + delay) date, we add it into our listing
if service_expiration_date < delay_date:
services_will_expired.append( [ service_type, service, service_infos['status'], service_infos['expiration'] ] )
# At the end, we show service expired or that will expire (in a table with tabulate)
print(tabulate(services_will_expired, headers=['Type', 'ID', 'status', 'expiration date']))
print(tabulate(services_will_expired, headers=["Type", "ID", "status", "expiration date"]))

View file

@ -5,46 +5,46 @@ from tabulate import tabulate
import ovh
# Services type desired to mine. To speed up the script, delete service type you don't use!
service_types = [
'allDom',
'cdn/dedicated',
'cdn/website',
'cdn/webstorage',
'cloud/project',
'cluster/hadoop',
'dedicated/housing',
'dedicated/nas',
'dedicated/nasha',
'dedicated/server',
'dedicatedCloud',
'domain/zone',
'email/domain',
'email/exchange',
'freefax',
'hosting/privateDatabase',
'hosting/web',
'hosting/windows',
'hpcspot',
'license/cloudLinux',
'license/cpanel',
'license/directadmin',
'license/office',
'license/plesk',
'license/sqlserver',
'license/virtuozzo',
'license/windows',
'license/worklight',
'overTheBox',
'pack/xdsl',
'partner',
'router',
'sms',
'telephony',
'telephony/spare',
'veeamCloudConnect',
'vps',
'xdsl',
'xdsl/spare',
service_types = [
"allDom",
"cdn/dedicated",
"cdn/website",
"cdn/webstorage",
"cloud/project",
"cluster/hadoop",
"dedicated/housing",
"dedicated/nas",
"dedicated/nasha",
"dedicated/server",
"dedicatedCloud",
"domain/zone",
"email/domain",
"email/exchange",
"freefax",
"hosting/privateDatabase",
"hosting/web",
"hosting/windows",
"hpcspot",
"license/cloudLinux",
"license/cpanel",
"license/directadmin",
"license/office",
"license/plesk",
"license/sqlserver",
"license/virtuozzo",
"license/windows",
"license/worklight",
"overTheBox",
"pack/xdsl",
"partner",
"router",
"sms",
"telephony",
"telephony/spare",
"veeamCloudConnect",
"vps",
"xdsl",
"xdsl/spare",
]
# Create a client using ovh.conf
@ -54,13 +54,13 @@ services_will_expired = []
# Check all OVH product (service type)
for service_type in service_types:
service_list = client.get('/%s' % service_type )
service_list = client.get("/%s" % service_type)
# If we found you have this one or more of this product, we get these information
for service in service_list:
service_infos = client.get("/%s/%s/serviceInfos" % (service_type, service))
service_expiration_date = datetime.datetime.strptime(service_infos["expiration"], "%Y-%m-%d")
services_will_expired.append([service_type, service, service_infos["status"], service_infos["expiration"]])
# If we found you have this one or more of this product, we get these information
for service in service_list:
service_infos = client.get('/%s/%s/serviceInfos' % (service_type, service) )
service_expiration_date = datetime.datetime.strptime(service_infos['expiration'], '%Y-%m-%d')
services_will_expired.append( [ service_type, service, service_infos['status'], service_infos['expiration'] ] )
# At the end, we show service expired or that will expire (in a table with tabulate)
print(tabulate(services_will_expired, headers=['Type', 'ID', 'status', 'expiration date']))
print(tabulate(services_will_expired, headers=["Type", "ID", "status", "expiration date"]))

View file

@ -28,9 +28,20 @@
from .client import Client
from .exceptions import (
APIError, NetworkError, InvalidResponse, InvalidRegion, ReadOnlyError,
ResourceNotFoundError, BadParametersError, ResourceConflictError, HTTPError,
InvalidKey, InvalidCredential, NotGrantedCall, NotCredential, Forbidden,
APIError,
NetworkError,
InvalidResponse,
InvalidRegion,
ReadOnlyError,
ResourceNotFoundError,
BadParametersError,
ResourceConflictError,
HTTPError,
InvalidKey,
InvalidCredential,
NotGrantedCall,
NotCredential,
Forbidden,
)
from .consumer_key import (
ConsumerKeyRequest,

View file

@ -42,7 +42,7 @@ import json
try:
from urllib import urlencode
except ImportError: # pragma: no cover
except ImportError: # pragma: no cover
# Python 3
from urllib.parse import urlencode
@ -53,21 +53,31 @@ from requests.exceptions import RequestException
from .config import config
from .consumer_key import ConsumerKeyRequest
from .exceptions import (
APIError, NetworkError, InvalidResponse, InvalidRegion, InvalidKey,
ResourceNotFoundError, BadParametersError, ResourceConflictError, HTTPError,
NotGrantedCall, NotCredential, Forbidden, InvalidCredential,
APIError,
NetworkError,
InvalidResponse,
InvalidRegion,
InvalidKey,
ResourceNotFoundError,
BadParametersError,
ResourceConflictError,
HTTPError,
NotGrantedCall,
NotCredential,
Forbidden,
InvalidCredential,
ResourceExpiredError,
)
#: Mapping between OVH API region names and corresponding endpoints
ENDPOINTS = {
'ovh-eu': 'https://eu.api.ovh.com/1.0',
'ovh-us': 'https://api.us.ovhcloud.com/1.0',
'ovh-ca': 'https://ca.api.ovh.com/1.0',
'kimsufi-eu': 'https://eu.api.kimsufi.com/1.0',
'kimsufi-ca': 'https://ca.api.kimsufi.com/1.0',
'soyoustart-eu': 'https://eu.api.soyoustart.com/1.0',
'soyoustart-ca': 'https://ca.api.soyoustart.com/1.0',
"ovh-eu": "https://eu.api.ovh.com/1.0",
"ovh-us": "https://api.us.ovhcloud.com/1.0",
"ovh-ca": "https://ca.api.ovh.com/1.0",
"kimsufi-eu": "https://eu.api.kimsufi.com/1.0",
"kimsufi-ca": "https://ca.api.kimsufi.com/1.0",
"soyoustart-eu": "https://eu.api.soyoustart.com/1.0",
"soyoustart-ca": "https://ca.api.soyoustart.com/1.0",
}
#: Default timeout for each request. 180 seconds connect, 180 seconds read.
@ -106,9 +116,15 @@ class Client(object):
"""
def __init__(self, endpoint=None, application_key=None,
application_secret=None, consumer_key=None, timeout=TIMEOUT,
config_file=None):
def __init__(
self,
endpoint=None,
application_key=None,
application_secret=None,
consumer_key=None,
timeout=TIMEOUT,
config_file=None,
):
"""
Creates a new Client. No credential check is done at this point.
@ -144,25 +160,24 @@ class Client(object):
# load endpoint
if endpoint is None:
endpoint = config.get('default', 'endpoint')
endpoint = config.get("default", "endpoint")
try:
self._endpoint = ENDPOINTS[endpoint]
except KeyError:
raise InvalidRegion("Unknown endpoint %s. Valid endpoints: %s",
endpoint, ENDPOINTS.keys())
raise InvalidRegion("Unknown endpoint %s. Valid endpoints: %s", endpoint, ENDPOINTS.keys())
# load keys
if application_key is None:
application_key = config.get(endpoint, 'application_key')
application_key = config.get(endpoint, "application_key")
self._application_key = application_key
if application_secret is None:
application_secret = config.get(endpoint, 'application_secret')
application_secret = config.get(endpoint, "application_secret")
self._application_secret = application_secret
if consumer_key is None:
consumer_key = config.get(endpoint, 'consumer_key')
consumer_key = config.get(endpoint, "consumer_key")
self._consumer_key = consumer_key
# lazy load time delta
@ -193,7 +208,7 @@ class Client(object):
:rtype: int
"""
if self._time_delta is None:
server_time = self.get('/auth/time', _need_auth=False)
server_time = self.get("/auth/time", _need_auth=False)
self._time_delta = server_time - int(time.time())
return self._time_delta
@ -275,9 +290,8 @@ class Client(object):
:returns: dict with ``consumerKey`` and ``validationUrl`` keys
:rtype: dict
"""
res = self.post('/auth/credential', _need_auth=False,
accessRules=access_rules, redirection=redirect_url)
self._consumer_key = res['consumerKey']
res = self.post("/auth/credential", _need_auth=False, accessRules=access_rules, redirection=redirect_url)
self._consumer_key = res["consumerKey"]
return res
## API shortcuts
@ -294,7 +308,7 @@ class Client(object):
arguments = {}
for k, v in kwargs.items():
if k[0] == '_' and k[1:] in keyword.kwlist:
if k[0] == "_" and k[1:] in keyword.kwlist:
k = k[1:]
arguments[k] = v
@ -335,12 +349,12 @@ class Client(object):
kwargs = self._canonicalize_kwargs(kwargs)
query_string = self._prepare_query_string(kwargs)
if query_string != "":
if '?' in _target:
_target = '%s&%s' % (_target, query_string)
if "?" in _target:
_target = "%s&%s" % (_target, query_string)
else:
_target = '%s?%s' % (_target, query_string)
_target = "%s?%s" % (_target, query_string)
return self.call('GET', _target, None, _need_auth)
return self.call("GET", _target, None, _need_auth)
def put(self, _target, _need_auth=True, **kwargs):
"""
@ -357,7 +371,7 @@ class Client(object):
kwargs = self._canonicalize_kwargs(kwargs)
if not kwargs:
kwargs = None
return self.call('PUT', _target, kwargs, _need_auth)
return self.call("PUT", _target, kwargs, _need_auth)
def post(self, _target, _need_auth=True, **kwargs):
"""
@ -374,7 +388,7 @@ class Client(object):
kwargs = self._canonicalize_kwargs(kwargs)
if not kwargs:
kwargs = None
return self.call('POST', _target, kwargs, _need_auth)
return self.call("POST", _target, kwargs, _need_auth)
def delete(self, _target, _need_auth=True, **kwargs):
"""
@ -392,12 +406,12 @@ class Client(object):
kwargs = self._canonicalize_kwargs(kwargs)
query_string = self._prepare_query_string(kwargs)
if query_string != "":
if '?' in _target:
_target = '%s&%s' % (_target, query_string)
if "?" in _target:
_target = "%s&%s" % (_target, query_string)
else:
_target = '%s?%s' % (_target, query_string)
_target = "%s?%s" % (_target, query_string)
return self.call('DELETE', _target, None, _need_auth)
return self.call("DELETE", _target, None, _need_auth)
## low level helpers
@ -441,35 +455,28 @@ class Client(object):
# error check
if status >= 100 and status < 300:
return json_result
elif status == 403 and json_result.get('errorCode') == 'NOT_GRANTED_CALL':
raise NotGrantedCall(json_result.get('message'),
response=result)
elif status == 403 and json_result.get('errorCode') == 'NOT_CREDENTIAL':
raise NotCredential(json_result.get('message'),
response=result)
elif status == 403 and json_result.get('errorCode') == 'INVALID_KEY':
raise InvalidKey(json_result.get('message'), response=result)
elif status == 403 and json_result.get('errorCode') == 'INVALID_CREDENTIAL':
raise InvalidCredential(json_result.get('message'),
response=result)
elif status == 403 and json_result.get('errorCode') == 'FORBIDDEN':
raise Forbidden(json_result.get('message'), response=result)
elif status == 403 and json_result.get("errorCode") == "NOT_GRANTED_CALL":
raise NotGrantedCall(json_result.get("message"), response=result)
elif status == 403 and json_result.get("errorCode") == "NOT_CREDENTIAL":
raise NotCredential(json_result.get("message"), response=result)
elif status == 403 and json_result.get("errorCode") == "INVALID_KEY":
raise InvalidKey(json_result.get("message"), response=result)
elif status == 403 and json_result.get("errorCode") == "INVALID_CREDENTIAL":
raise InvalidCredential(json_result.get("message"), response=result)
elif status == 403 and json_result.get("errorCode") == "FORBIDDEN":
raise Forbidden(json_result.get("message"), response=result)
elif status == 404:
raise ResourceNotFoundError(json_result.get('message'),
response=result)
raise ResourceNotFoundError(json_result.get("message"), response=result)
elif status == 400:
raise BadParametersError(json_result.get('message'),
response=result)
raise BadParametersError(json_result.get("message"), response=result)
elif status == 409:
raise ResourceConflictError(json_result.get('message'),
response=result)
raise ResourceConflictError(json_result.get("message"), response=result)
elif status == 460:
raise ResourceExpiredError(json_result.get('message'),
response=result)
raise ResourceExpiredError(json_result.get("message"), response=result)
elif status == 0:
raise NetworkError()
else:
raise APIError(json_result.get('message'), response=result)
raise APIError(json_result.get("message"), response=result)
def raw_call(self, method, path, data=None, need_auth=True, headers=None):
"""
@ -495,40 +502,36 @@ class Client(object):
OVH API authentication headers, as well as
the Content-Type header.
"""
body = ''
body = ""
target = self._endpoint + path
if headers is None:
headers = {}
headers['X-Ovh-Application'] = self._application_key
headers["X-Ovh-Application"] = self._application_key
# include payload
if data is not None:
headers['Content-type'] = 'application/json'
headers["Content-type"] = "application/json"
body = json.dumps(data)
# sign request. Never sign 'time' or will recurse infinitely
if need_auth:
if not self._application_secret:
raise InvalidKey("Invalid ApplicationSecret '%s'" %
self._application_secret)
raise InvalidKey("Invalid ApplicationSecret '%s'" % self._application_secret)
if not self._consumer_key:
raise InvalidKey("Invalid ConsumerKey '%s'" %
self._consumer_key)
raise InvalidKey("Invalid ConsumerKey '%s'" % self._consumer_key)
now = str(int(time.time()) + self.time_delta)
signature = hashlib.sha1()
signature.update("+".join([
self._application_secret, self._consumer_key,
method.upper(), target,
body,
now
]).encode('utf-8'))
signature.update(
"+".join([self._application_secret, self._consumer_key, method.upper(), target, body, now]).encode(
"utf-8"
)
)
headers['X-Ovh-Consumer'] = self._consumer_key
headers['X-Ovh-Timestamp'] = now
headers['X-Ovh-Signature'] = "$1$" + signature.hexdigest()
headers["X-Ovh-Consumer"] = self._consumer_key
headers["X-Ovh-Timestamp"] = now
headers["X-Ovh-Signature"] = "$1$" + signature.hexdigest()
return self._session.request(method, target, headers=headers,
data=body, timeout=self._timeout)
return self._session.request(method, target, headers=headers, data=body, timeout=self._timeout)

View file

@ -66,33 +66,35 @@ import os
try:
from ConfigParser import RawConfigParser, NoSectionError, NoOptionError
except ImportError: # pragma: no cover
except ImportError: # pragma: no cover
# Python 3
from configparser import RawConfigParser, NoSectionError, NoOptionError
__all__ = ['config']
__all__ = ["config"]
#: Locations where to look for configuration file by *increasing* priority
CONFIG_PATH = [
'/etc/ovh.conf',
os.path.expanduser('~/.ovh.conf'),
os.path.realpath('./ovh.conf'),
"/etc/ovh.conf",
os.path.expanduser("~/.ovh.conf"),
os.path.realpath("./ovh.conf"),
]
class ConfigurationManager(object):
'''
"""
Application wide configuration manager
'''
"""
def __init__(self):
'''
"""
Create a config parser and load config from environment.
'''
"""
# create config parser
self.config = RawConfigParser()
self.config.read(CONFIG_PATH)
def get(self, section, name):
'''
"""
Load parameter ``name`` from configuration, respecting priority order.
Most of the time, ``section`` will correspond to the current api
``endpoint``. ``default`` section only contains ``endpoint`` and general
@ -101,10 +103,10 @@ class ConfigurationManager(object):
:param str section: configuration section or region name. Ignored when
looking in environment
:param str name: configuration parameter to lookup
'''
"""
# 1/ try env
try:
return os.environ['OVH_'+name.upper()]
return os.environ["OVH_" + name.upper()]
except KeyError:
pass
@ -121,5 +123,6 @@ class ConfigurationManager(object):
# Read an other config file
self.config.read(config_file)
#: System wide instance :py:class:`ConfigurationManager` instance
config = ConfigurationManager()

View file

@ -40,30 +40,31 @@ Hence this module
"""
# Common authorization patterns
API_READ_ONLY = ["GET"]
API_READ_WRITE = ["GET", "POST", "PUT", "DELETE"]
API_READ_ONLY = ["GET"]
API_READ_WRITE = ["GET", "POST", "PUT", "DELETE"]
API_READ_WRITE_SAFE = ["GET", "POST", "PUT"]
class ConsumerKeyRequest(object):
'''
"""
ConsumerKey request. The generated consumer key will be linked to the
client's ``application_key``. When performing the request, the
``consumer_key`` will automatically be registered in the client.
It is recommended to save the generated key as soon as it validated to avoid
requesting a new one on each API access.
'''
"""
def __init__(self, client):
'''
"""
Create a new consumer key helper on API ``client``. The keys will be
tied to the ``application_key`` defined in the client.
'''
"""
self._client = client
self._access_rules = []
def request(self, redirect_url=None):
'''
"""
Create the consumer key with the configures autorizations. The user will
need to validate it before it can be used with the API
@ -73,41 +74,40 @@ class ConsumerKeyRequest(object):
'consumerKey': 'TnpZAd5pYNqxk4RhlPiSRfJ4WrkmII2i',
'validationUrl': 'https://eu.api.ovh.com/auth/?credentialToken=now2OOAVO4Wp6t7bemyN9DMWIobhGjFNZSHmixtVJM4S7mzjkN2L5VBfG96Iy1i0'
}
'''
"""
return self._client.request_consumerkey(self._access_rules, redirect_url)
def add_rule(self, method, path):
'''
"""
Add a new rule to the request. Will grant the ``(method, path)`` tuple.
Path can be any API route pattern like ``/sms/*`` or ``/me``. For example,
to grant RO access on personal data:
>>> ck.add_rule("GET", "/me")
'''
self._access_rules.append({'method': method.upper(), 'path': path})
"""
self._access_rules.append({"method": method.upper(), "path": path})
def add_rules(self, methods, path):
'''
"""
Add rules for ``path`` pattern, for each methods in ``methods``. This is
a convenient helper over ``add_rule``. For example, this could be used
to grant all access on the API at once:
>>> ck.add_rules(["GET", "POST", "PUT", "DELETE"], "/*")
'''
"""
for method in methods:
self.add_rule(method, path)
def add_recursive_rules(self, methods, path):
'''
"""
Use this method to grant access on a full API tree. This is the
recommended way to grant access in the API. It will take care of granted
the root call *AND* sub-calls for you. Which is commonly forgotten...
For example, to grant a full access on ``/sms``:
>>> ck.add_recursive_rules(["GET", "POST", "PUT", "DELETE"], "/sms")
'''
path = path.rstrip('*/ ')
"""
path = path.rstrip("*/ ")
if path:
self.add_rules(methods, path)
self.add_rules(methods, path+'/*')
self.add_rules(methods, path + "/*")

View file

@ -30,10 +30,12 @@
All exceptions used in OVH SDK derives from `APIError`
"""
class APIError(Exception):
"""Base OVH API exception, all specific exceptions inherits from it."""
def __init__(self, *args, **kwargs):
self.response = kwargs.pop('response', None)
self.response = kwargs.pop("response", None)
if self.response is not None:
self.query_id = self.response.headers.get("X-OVH-QUERYID")
else:
@ -41,49 +43,63 @@ class APIError(Exception):
super(APIError, self).__init__(*args, **kwargs)
def __str__(self):
if self.query_id: # pragma: no cover
if self.query_id: # pragma: no cover
return "{} \nOVH-Query-ID: {}".format(super(APIError, self).__str__(), self.query_id)
else: # pragma: no cover
else: # pragma: no cover
return super(APIError, self).__str__()
class HTTPError(APIError):
"""Raised when the request fails at a low level (DNS, network, ...)"""
class InvalidKey(APIError):
"""Raised when trying to sign request with invalid key"""
class InvalidCredential(APIError):
"""Raised when trying to sign request with invalid consumer key"""
class InvalidResponse(APIError):
"""Raised when api response is not valid json"""
class InvalidRegion(APIError):
"""Raised when region is not in `REGIONS`."""
class ReadOnlyError(APIError):
"""Raised when attempting to modify readonly data."""
class ResourceNotFoundError(APIError):
"""Raised when requested resource does not exist."""
class BadParametersError(APIError):
"""Raised when request contains bad parameters."""
class ResourceConflictError(APIError):
"""Raised when trying to create an already existing resource."""
class NetworkError(APIError):
"""Raised when there is an error from network layer."""
class NotGrantedCall(APIError):
"""Raised when there is an error from network layer."""
class NotCredential(APIError):
"""Raised when there is an error from network layer."""
class Forbidden(APIError):
"""Raised when there is an error from network layer."""
class ResourceExpiredError(APIError):
"""Raised when requested resource expired."""

21
pyproject.toml Normal file
View file

@ -0,0 +1,21 @@
[tool.black]
line-length = 120
target-version = ['py310']
include='''
^(
\/[^\/]*
| \/docs\/conf
| \/examples\/.*
| \/ovh\/.*
| \/tests\/.*
).py$
'''
[tool.isort]
profile = "black"
line_length = 120
multi_line_output = 3
forced_separate = ["tests"]
no_lines_before = "LOCALFOLDER"
known_first_party = ["ovh"]
force_sort_within_sections = true

View file

@ -5,6 +5,7 @@ try:
from setuptools import setup
except ImportError:
from distribute_setup import use_setuptools
use_setuptools()
from setuptools import setup

View file

@ -35,38 +35,50 @@ import requests
from ovh.client import Client, ENDPOINTS
from ovh.exceptions import (
APIError, NetworkError, InvalidResponse, InvalidRegion, ReadOnlyError,
ResourceNotFoundError, BadParametersError, ResourceConflictError, HTTPError,
InvalidKey, InvalidCredential, NotGrantedCall, NotCredential, Forbidden,
APIError,
NetworkError,
InvalidResponse,
InvalidRegion,
ReadOnlyError,
ResourceNotFoundError,
BadParametersError,
ResourceConflictError,
HTTPError,
InvalidKey,
InvalidCredential,
NotGrantedCall,
NotCredential,
Forbidden,
ResourceExpiredError,
)
M_ENVIRON = {
'OVH_ENDPOINT': 'soyoustart-ca',
'OVH_APPLICATION_KEY': 'application key from environ',
'OVH_APPLICATION_SECRET': 'application secret from environ',
'OVH_CONSUMER_KEY': 'consumer key from from environ',
"OVH_ENDPOINT": "soyoustart-ca",
"OVH_APPLICATION_KEY": "application key from environ",
"OVH_APPLICATION_SECRET": "application secret from environ",
"OVH_CONSUMER_KEY": "consumer key from from environ",
}
M_CUSTOM_CONFIG_PATH = 'tests/fixtures/custom_ovh.conf'
M_CUSTOM_CONFIG_PATH = "tests/fixtures/custom_ovh.conf"
APPLICATION_KEY = 'fake application key'
APPLICATION_SECRET = 'fake application secret'
CONSUMER_KEY = 'fake consumer key'
ENDPOINT = 'ovh-eu'
ENDPOINT_BAD = 'laponie'
BASE_URL = 'https://eu.api.ovh.com/1.0'
FAKE_URL = 'http://gopher.ovh.net/'
APPLICATION_KEY = "fake application key"
APPLICATION_SECRET = "fake application secret"
CONSUMER_KEY = "fake consumer key"
ENDPOINT = "ovh-eu"
ENDPOINT_BAD = "laponie"
BASE_URL = "https://eu.api.ovh.com/1.0"
FAKE_URL = "http://gopher.ovh.net/"
FAKE_TIME = 1404395889.467238
FAKE_METHOD = 'MeThOd'
FAKE_PATH = '/unit/test'
FAKE_METHOD = "MeThOd"
FAKE_PATH = "/unit/test"
TIMEOUT = 180
class testClient(unittest.TestCase):
def setUp(self):
self.time_patch = mock.patch('time.time', return_value=FAKE_TIME)
self.time_patch = mock.patch("time.time", return_value=FAKE_TIME)
self.time_patch.start()
def tearDown(self):
@ -85,32 +97,31 @@ class testClient(unittest.TestCase):
# override default timeout
timeout = (1, 1)
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET,
CONSUMER_KEY, timeout=timeout)
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY, timeout=timeout)
self.assertEqual(timeout, api._timeout)
# invalid region
self.assertRaises(InvalidRegion, Client, ENDPOINT_BAD, '', '', '')
self.assertRaises(InvalidRegion, Client, ENDPOINT_BAD, "", "", "")
def test_init_from_config(self):
with mock.patch.dict('os.environ', M_ENVIRON):
with mock.patch.dict("os.environ", M_ENVIRON):
api = Client()
self.assertEqual('https://ca.api.soyoustart.com/1.0', api._endpoint)
self.assertEqual(M_ENVIRON['OVH_APPLICATION_KEY'], api._application_key)
self.assertEqual(M_ENVIRON['OVH_APPLICATION_SECRET'], api._application_secret)
self.assertEqual(M_ENVIRON['OVH_CONSUMER_KEY'], api._consumer_key)
self.assertEqual("https://ca.api.soyoustart.com/1.0", api._endpoint)
self.assertEqual(M_ENVIRON["OVH_APPLICATION_KEY"], api._application_key)
self.assertEqual(M_ENVIRON["OVH_APPLICATION_SECRET"], api._application_secret)
self.assertEqual(M_ENVIRON["OVH_CONSUMER_KEY"], api._consumer_key)
def test_init_from_custom_config(self):
# custom config file
api = Client(config_file=M_CUSTOM_CONFIG_PATH)
self.assertEqual('https://ca.api.ovh.com/1.0', api._endpoint)
self.assertEqual('This is a fake custom application key', api._application_key)
self.assertEqual('This is a *real* custom application key', api._application_secret)
self.assertEqual('I am customingly kidding', api._consumer_key)
self.assertEqual("https://ca.api.ovh.com/1.0", api._endpoint)
self.assertEqual("This is a fake custom application key", api._application_key)
self.assertEqual("This is a *real* custom application key", api._application_secret)
self.assertEqual("I am customingly kidding", api._consumer_key)
@mock.patch.object(Client, 'call')
@mock.patch.object(Client, "call")
def test_time_delta(self, m_call):
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
m_call.return_value = 1404395895
@ -118,7 +129,7 @@ class testClient(unittest.TestCase):
# nominal
time_delta = api.time_delta
m_call.assert_called_once_with('GET', '/auth/time', None, False)
m_call.assert_called_once_with("GET", "/auth/time", None, False)
self.assertEqual(time_delta, 6)
self.assertEqual(api._time_delta, 6)
@ -128,23 +139,28 @@ class testClient(unittest.TestCase):
self.assertEqual(api.time_delta, 6)
self.assertFalse(m_call.called)
@mock.patch.object(Client, 'call')
@mock.patch.object(Client, "call")
def test_request_consumerkey(self, m_call):
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
# nominal
FAKE_RULES = object()
FAKE_CK = object()
RET = {'consumerKey': FAKE_CK}
RET = {"consumerKey": FAKE_CK}
m_call.return_value = RET
ret = api.request_consumerkey(FAKE_RULES, FAKE_URL)
self.assertEqual(RET, ret)
m_call.assert_called_once_with('POST', '/auth/credential', {
'redirection': FAKE_URL,
'accessRules': FAKE_RULES,
}, False)
m_call.assert_called_once_with(
"POST",
"/auth/credential",
{
"redirection": FAKE_URL,
"accessRules": FAKE_RULES,
},
False,
)
def test_new_consumer_key_request(self):
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
@ -158,121 +174,119 @@ class testClient(unittest.TestCase):
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual({}, api._canonicalize_kwargs({}))
self.assertEqual({'from': 'value'}, api._canonicalize_kwargs({'from': 'value'}))
self.assertEqual({'_to': 'value'}, api._canonicalize_kwargs({'_to': 'value'}))
self.assertEqual({'from': 'value'}, api._canonicalize_kwargs({'_from': 'value'}))
self.assertEqual({"from": "value"}, api._canonicalize_kwargs({"from": "value"}))
self.assertEqual({"_to": "value"}, api._canonicalize_kwargs({"_to": "value"}))
self.assertEqual({"from": "value"}, api._canonicalize_kwargs({"_from": "value"}))
@mock.patch.object(Client, 'call')
@mock.patch.object(Client, "call")
def test_get(self, m_call):
# basic test
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.get(FAKE_URL))
m_call.assert_called_once_with('GET', FAKE_URL, None, True)
m_call.assert_called_once_with("GET", FAKE_URL, None, True)
# append query string
m_call.reset_mock()
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.get(FAKE_URL, param="test"))
m_call.assert_called_once_with('GET', FAKE_URL+'?param=test', None, True)
m_call.assert_called_once_with("GET", FAKE_URL + "?param=test", None, True)
# append to existing query string
m_call.reset_mock()
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.get(FAKE_URL+'?query=string', param="test"))
m_call.assert_called_once_with('GET', FAKE_URL+'?query=string&param=test', None, True)
self.assertEqual(m_call.return_value, api.get(FAKE_URL + "?query=string", param="test"))
m_call.assert_called_once_with("GET", FAKE_URL + "?query=string&param=test", None, True)
# boolean arguments
m_call.reset_mock()
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.get(FAKE_URL+'?query=string', checkbox=True))
m_call.assert_called_once_with('GET', FAKE_URL+'?query=string&checkbox=true', None, True)
self.assertEqual(m_call.return_value, api.get(FAKE_URL + "?query=string", checkbox=True))
m_call.assert_called_once_with("GET", FAKE_URL + "?query=string&checkbox=true", None, True)
# keyword calling convention
m_call.reset_mock()
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.get(FAKE_URL, _from="start", to="end"))
try:
m_call.assert_called_once_with('GET', FAKE_URL+'?to=end&from=start', None, True)
m_call.assert_called_once_with("GET", FAKE_URL + "?to=end&from=start", None, True)
except:
m_call.assert_called_once_with('GET', FAKE_URL+'?from=start&to=end', None, True)
m_call.assert_called_once_with("GET", FAKE_URL + "?from=start&to=end", None, True)
@mock.patch.object(Client, 'call')
@mock.patch.object(Client, "call")
def test_delete(self, m_call):
# basic test
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.delete(FAKE_URL))
m_call.assert_called_once_with('DELETE', FAKE_URL, None, True)
m_call.assert_called_once_with("DELETE", FAKE_URL, None, True)
# append query string
m_call.reset_mock()
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.delete(FAKE_URL, param="test"))
m_call.assert_called_once_with('DELETE', FAKE_URL+'?param=test', None, True)
m_call.assert_called_once_with("DELETE", FAKE_URL + "?param=test", None, True)
# append to existing query string
m_call.reset_mock()
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.delete(FAKE_URL+'?query=string', param="test"))
m_call.assert_called_once_with('DELETE', FAKE_URL+'?query=string&param=test', None, True)
self.assertEqual(m_call.return_value, api.delete(FAKE_URL + "?query=string", param="test"))
m_call.assert_called_once_with("DELETE", FAKE_URL + "?query=string&param=test", None, True)
# boolean arguments
m_call.reset_mock()
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.delete(FAKE_URL+'?query=string', checkbox=True))
m_call.assert_called_once_with('DELETE', FAKE_URL+'?query=string&checkbox=true', None, True)
self.assertEqual(m_call.return_value, api.delete(FAKE_URL + "?query=string", checkbox=True))
m_call.assert_called_once_with("DELETE", FAKE_URL + "?query=string&checkbox=true", None, True)
# keyword calling convention
m_call.reset_mock()
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.delete(FAKE_URL, _from="start", to="end"))
try:
m_call.assert_called_once_with('DELETE', FAKE_URL+'?to=end&from=start', None, True)
m_call.assert_called_once_with("DELETE", FAKE_URL + "?to=end&from=start", None, True)
except:
m_call.assert_called_once_with('DELETE', FAKE_URL+'?from=start&to=end', None, True)
m_call.assert_called_once_with("DELETE", FAKE_URL + "?from=start&to=end", None, True)
@mock.patch.object(Client, 'call')
@mock.patch.object(Client, "call")
def test_post(self, m_call):
PAYLOAD = {
'arg1': object(),
'arg2': object(),
'arg3': object(),
'arg4': False,
"arg1": object(),
"arg2": object(),
"arg3": object(),
"arg4": False,
}
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.post(FAKE_URL, **PAYLOAD))
m_call.assert_called_once_with('POST', FAKE_URL, PAYLOAD, True)
m_call.assert_called_once_with("POST", FAKE_URL, PAYLOAD, True)
@mock.patch.object(Client, 'call')
@mock.patch.object(Client, "call")
def test_post_no_body(self, m_call):
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.post(FAKE_URL))
m_call.assert_called_once_with('POST', FAKE_URL, None, True)
m_call.assert_called_once_with("POST", FAKE_URL, None, True)
@mock.patch.object(Client, 'call')
@mock.patch.object(Client, "call")
def test_put(self, m_call):
PAYLOAD = {
'arg1': object(),
'arg2': object(),
'arg3': object(),
'arg4': False,
"arg1": object(),
"arg2": object(),
"arg3": object(),
"arg4": False,
}
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.put(FAKE_URL, **PAYLOAD))
m_call.assert_called_once_with('PUT', FAKE_URL, PAYLOAD, True)
m_call.assert_called_once_with("PUT", FAKE_URL, PAYLOAD, True)
@mock.patch.object(Client, 'call')
@mock.patch.object(Client, "call")
def test_put_no_body(self, m_call):
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET, CONSUMER_KEY)
self.assertEqual(m_call.return_value, api.put(FAKE_URL))
m_call.assert_called_once_with('PUT', FAKE_URL, None, True)
m_call.assert_called_once_with("PUT", FAKE_URL, None, True)
## test core function
@mock.patch('ovh.client.Session.request')
@mock.patch("ovh.client.Session.request")
def test_call_no_sign(self, m_req):
m_res = m_req.return_value
m_json = m_res.json.return_value
@ -283,23 +297,28 @@ class testClient(unittest.TestCase):
m_res.status_code = 200
self.assertEqual(m_json, api.call(FAKE_METHOD, FAKE_PATH, None, False))
m_req.assert_called_once_with(
FAKE_METHOD, BASE_URL+'/unit/test',
headers={'X-Ovh-Application': APPLICATION_KEY}, data='',
timeout=TIMEOUT
FAKE_METHOD,
BASE_URL + "/unit/test",
headers={"X-Ovh-Application": APPLICATION_KEY},
data="",
timeout=TIMEOUT,
)
m_req.reset_mock()
# data, nominal
m_res.status_code = 200
data = {'key': 'value'}
data = {"key": "value"}
j_data = json.dumps(data)
self.assertEqual(m_json, api.call(FAKE_METHOD, FAKE_PATH, data, False))
m_req.assert_called_once_with(
FAKE_METHOD, BASE_URL+'/unit/test',
FAKE_METHOD,
BASE_URL + "/unit/test",
headers={
'X-Ovh-Application': APPLICATION_KEY,
'Content-type': 'application/json',
}, data=j_data, timeout=TIMEOUT
"X-Ovh-Application": APPLICATION_KEY,
"Content-type": "application/json",
},
data=j_data,
timeout=TIMEOUT,
)
m_req.reset_mock()
@ -317,19 +336,19 @@ class testClient(unittest.TestCase):
m_res.status_code = 404
self.assertRaises(ResourceNotFoundError, api.call, FAKE_METHOD, FAKE_PATH, None, False)
m_res.status_code = 403
m_res.json.return_value = {'errorCode': "NOT_GRANTED_CALL"}
m_res.json.return_value = {"errorCode": "NOT_GRANTED_CALL"}
self.assertRaises(NotGrantedCall, api.call, FAKE_METHOD, FAKE_PATH, None, False)
m_res.status_code = 403
m_res.json.return_value = {'errorCode': "NOT_CREDENTIAL"}
m_res.json.return_value = {"errorCode": "NOT_CREDENTIAL"}
self.assertRaises(NotCredential, api.call, FAKE_METHOD, FAKE_PATH, None, False)
m_res.status_code = 403
m_res.json.return_value = {'errorCode': "INVALID_KEY"}
m_res.json.return_value = {"errorCode": "INVALID_KEY"}
self.assertRaises(InvalidKey, api.call, FAKE_METHOD, FAKE_PATH, None, False)
m_res.status_code = 403
m_res.json.return_value = {'errorCode': "INVALID_CREDENTIAL"}
m_res.json.return_value = {"errorCode": "INVALID_CREDENTIAL"}
self.assertRaises(InvalidCredential, api.call, FAKE_METHOD, FAKE_PATH, None, False)
m_res.status_code = 403
m_res.json.return_value = {'errorCode': "FORBIDDEN"}
m_res.json.return_value = {"errorCode": "FORBIDDEN"}
self.assertRaises(Forbidden, api.call, FAKE_METHOD, FAKE_PATH, None, False)
m_res.status_code = 400
self.assertRaises(BadParametersError, api.call, FAKE_METHOD, FAKE_PATH, None, False)
@ -344,8 +363,7 @@ class testClient(unittest.TestCase):
m_res.status_code = 306
self.assertRaises(APIError, api.call, FAKE_METHOD, FAKE_PATH, None, False)
@mock.patch('ovh.client.Session.request')
@mock.patch("ovh.client.Session.request")
def test_call_query_id(self, m_req):
m_res = m_req.return_value
m_json = m_res.json.return_value
@ -357,13 +375,12 @@ class testClient(unittest.TestCase):
self.assertRaises(APIError, api.call, FAKE_METHOD, FAKE_PATH, None, False)
try:
api.call(FAKE_METHOD, FAKE_PATH, None, False)
self.assertEqual(0, 1) # should fail as method have to raise APIError
self.assertEqual(0, 1) # should fail as method have to raise APIError
except APIError as e:
self.assertEqual(e.query_id, "FR.test1")
@mock.patch('ovh.client.Session.request')
@mock.patch('ovh.client.Client.time_delta', new_callable=mock.PropertyMock)
@mock.patch("ovh.client.Session.request")
@mock.patch("ovh.client.Client.time_delta", new_callable=mock.PropertyMock)
def test_call_signature(self, m_time_delta, m_req):
m_res = m_req.return_value
m_json = m_res.json.return_value
@ -376,38 +393,44 @@ class testClient(unittest.TestCase):
self.assertEqual(m_json, api.call(FAKE_METHOD, FAKE_PATH, None, True))
m_time_delta.assert_called_once_with()
m_req.assert_called_once_with(
FAKE_METHOD, BASE_URL+'/unit/test',
FAKE_METHOD,
BASE_URL + "/unit/test",
headers={
'X-Ovh-Consumer': CONSUMER_KEY,
'X-Ovh-Application': APPLICATION_KEY,
'X-Ovh-Signature': '$1$16ae5ba8c63841b1951575be905867991d5f49dc',
'X-Ovh-Timestamp': '1404395931',
}, data='', timeout=TIMEOUT
"X-Ovh-Consumer": CONSUMER_KEY,
"X-Ovh-Application": APPLICATION_KEY,
"X-Ovh-Signature": "$1$16ae5ba8c63841b1951575be905867991d5f49dc",
"X-Ovh-Timestamp": "1404395931",
},
data="",
timeout=TIMEOUT,
)
m_time_delta.reset_mock()
m_req.reset_mock()
# data, nominal
data = OrderedDict([('some', 'random'), ('data', 42)])
data = OrderedDict([("some", "random"), ("data", 42)])
m_res.status_code = 200
self.assertEqual(m_json, api.call(FAKE_METHOD, FAKE_PATH, data, True))
m_time_delta.assert_called_once_with()
m_req.assert_called_once_with(
FAKE_METHOD, BASE_URL+'/unit/test',
FAKE_METHOD,
BASE_URL + "/unit/test",
headers={
'X-Ovh-Consumer': CONSUMER_KEY,
'X-Ovh-Application': APPLICATION_KEY,
'Content-type': 'application/json',
'X-Ovh-Timestamp': '1404395931',
'X-Ovh-Signature': '$1$9acb1ac0120006d16261a635aed788e83ab172d2',
}, data=json.dumps(data), timeout=TIMEOUT
"X-Ovh-Consumer": CONSUMER_KEY,
"X-Ovh-Application": APPLICATION_KEY,
"Content-type": "application/json",
"X-Ovh-Timestamp": "1404395931",
"X-Ovh-Signature": "$1$9acb1ac0120006d16261a635aed788e83ab172d2",
},
data=json.dumps(data),
timeout=TIMEOUT,
)
m_time_delta.reset_mock()
m_req.reset_mock()
# Overwrite configuration to avoid interfering with any local config
from ovh.client import config
try:
from ConfigParser import RawConfigParser
except ImportError:
@ -427,32 +450,38 @@ class testClient(unittest.TestCase):
# Restore configuration
config.config = self._orig_config
@mock.patch('ovh.client.Session.request', return_value="Let's assume requests will return this")
@mock.patch("ovh.client.Session.request", return_value="Let's assume requests will return this")
def test_raw_call(self, m_req):
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET)
r = api.raw_call(FAKE_METHOD, FAKE_PATH, None, False)
self.assertEqual(r, "Let's assume requests will return this")
@mock.patch('ovh.client.Session.request', return_value="Let's assume requests will return this")
@mock.patch("ovh.client.Session.request", return_value="Let's assume requests will return this")
def test_raw_call_with_headers(self, m_req):
api = Client(ENDPOINT, APPLICATION_KEY, APPLICATION_SECRET)
r = api.raw_call(FAKE_METHOD, FAKE_PATH, None, False, headers={
'Custom-Header': "1",
})
r = api.raw_call(
FAKE_METHOD,
FAKE_PATH,
None,
False,
headers={
"Custom-Header": "1",
},
)
self.assertEqual(r, "Let's assume requests will return this")
m_req.assert_called_once_with(
FAKE_METHOD, BASE_URL+FAKE_PATH,
FAKE_METHOD,
BASE_URL + FAKE_PATH,
headers={
'Custom-Header': "1",
'X-Ovh-Application': APPLICATION_KEY,
"Custom-Header": "1",
"X-Ovh-Application": APPLICATION_KEY,
},
data='',
timeout=TIMEOUT
data="",
timeout=TIMEOUT,
)
# Perform real API tests.
def test_endpoints(self):
for endpoint in ENDPOINTS.keys():
auth_time = Client(endpoint).get('/auth/time', _need_auth=False)
auth_time = Client(endpoint).get("/auth/time", _need_auth=False)
self.assertTrue(auth_time > 0)

View file

@ -32,20 +32,21 @@ from unittest import mock
import os
M_CONFIG_PATH = [
'tests/fixtures/etc_ovh.conf',
'tests/fixtures/home_ovh.conf',
'tests/fixtures/pwd_ovh.conf',
"tests/fixtures/etc_ovh.conf",
"tests/fixtures/home_ovh.conf",
"tests/fixtures/pwd_ovh.conf",
]
M_CUSTOM_CONFIG_PATH = 'tests/fixtures/custom_ovh.conf'
M_CUSTOM_CONFIG_PATH = "tests/fixtures/custom_ovh.conf"
M_ENVIRON = {
'OVH_ENDPOINT': 'endpoint from environ',
'OVH_APPLICATION_KEY': 'application key from environ',
'OVH_APPLICATION_SECRET': 'application secret from environ',
'OVH_CONSUMER_KEY': 'consumer key from from environ',
"OVH_ENDPOINT": "endpoint from environ",
"OVH_APPLICATION_KEY": "application key from environ",
"OVH_APPLICATION_SECRET": "application secret from environ",
"OVH_CONSUMER_KEY": "consumer key from from environ",
}
class testConfig(unittest.TestCase):
def setUp(self):
"""Overload configuration lookup path"""
@ -57,48 +58,50 @@ class testConfig(unittest.TestCase):
config.CONFIG_PATH = self._orig_CONFIG_PATH
def test_real_lookup_path(self):
home = os.environ['HOME']
pwd = os.environ['PWD']
home = os.environ["HOME"]
pwd = os.environ["PWD"]
self.assertEqual([
'/etc/ovh.conf',
home+'/.ovh.conf',
pwd+'/ovh.conf',
], self._orig_CONFIG_PATH)
self.assertEqual(
[
"/etc/ovh.conf",
home + "/.ovh.conf",
pwd + "/ovh.conf",
],
self._orig_CONFIG_PATH,
)
def test_config_get_conf(self):
conf = config.ConfigurationManager()
self.assertEqual('soyoustart-ca', conf.get('default', 'endpoint'))
self.assertEqual('This is a *fake* global application key', conf.get('ovh-eu', 'application_key'))
self.assertEqual('This is a *real* global application secret', conf.get('ovh-eu', 'application_secret'))
self.assertEqual('I am kidding at home', conf.get('ovh-eu', 'consumer_key'))
self.assertEqual('This is a fake local application key', conf.get('soyoustart-ca', 'application_key'))
self.assertEqual('This is a *real* local application key', conf.get('soyoustart-ca', 'application_secret'))
self.assertEqual('I am locally kidding', conf.get('soyoustart-ca', 'consumer_key'))
self.assertEqual("soyoustart-ca", conf.get("default", "endpoint"))
self.assertEqual("This is a *fake* global application key", conf.get("ovh-eu", "application_key"))
self.assertEqual("This is a *real* global application secret", conf.get("ovh-eu", "application_secret"))
self.assertEqual("I am kidding at home", conf.get("ovh-eu", "consumer_key"))
self.assertEqual("This is a fake local application key", conf.get("soyoustart-ca", "application_key"))
self.assertEqual("This is a *real* local application key", conf.get("soyoustart-ca", "application_secret"))
self.assertEqual("I am locally kidding", conf.get("soyoustart-ca", "consumer_key"))
self.assertTrue(conf.get('ovh-eu', 'non-existent') is None)
self.assertTrue(conf.get('ovh-ca', 'application_key') is None)
self.assertTrue(conf.get('ovh-laponie', 'application_key') is None)
self.assertTrue(conf.get("ovh-eu", "non-existent") is None)
self.assertTrue(conf.get("ovh-ca", "application_key") is None)
self.assertTrue(conf.get("ovh-laponie", "application_key") is None)
def test_config_get_conf_env_rules_them_all(self):
conf = config.ConfigurationManager()
with mock.patch.dict('os.environ', M_ENVIRON):
self.assertEqual(M_ENVIRON['OVH_ENDPOINT'], conf.get('wathever', 'endpoint'))
self.assertEqual(M_ENVIRON['OVH_APPLICATION_KEY'], conf.get('wathever', 'application_key'))
self.assertEqual(M_ENVIRON['OVH_APPLICATION_SECRET'], conf.get('wathever', 'application_secret'))
self.assertEqual(M_ENVIRON['OVH_CONSUMER_KEY'], conf.get('wathever', 'consumer_key'))
with mock.patch.dict("os.environ", M_ENVIRON):
self.assertEqual(M_ENVIRON["OVH_ENDPOINT"], conf.get("wathever", "endpoint"))
self.assertEqual(M_ENVIRON["OVH_APPLICATION_KEY"], conf.get("wathever", "application_key"))
self.assertEqual(M_ENVIRON["OVH_APPLICATION_SECRET"], conf.get("wathever", "application_secret"))
self.assertEqual(M_ENVIRON["OVH_CONSUMER_KEY"], conf.get("wathever", "consumer_key"))
self.assertTrue(conf.get('ovh-eu', 'non-existent') is None)
self.assertTrue(conf.get("ovh-eu", "non-existent") is None)
def test_config_get_custom_conf(self):
conf = config.ConfigurationManager()
conf.read(M_CUSTOM_CONFIG_PATH)
self.assertEqual('ovh-ca', conf.get('default', 'endpoint'))
self.assertEqual('This is a fake custom application key', conf.get('ovh-ca', 'application_key'))
self.assertEqual('This is a *real* custom application key', conf.get('ovh-ca', 'application_secret'))
self.assertEqual('I am customingly kidding', conf.get('ovh-ca', 'consumer_key'))
self.assertTrue(conf.get('ovh-eu', 'non-existent') is None)
self.assertEqual("ovh-ca", conf.get("default", "endpoint"))
self.assertEqual("This is a fake custom application key", conf.get("ovh-ca", "application_key"))
self.assertEqual("This is a *real* custom application key", conf.get("ovh-ca", "application_secret"))
self.assertEqual("I am customingly kidding", conf.get("ovh-ca", "consumer_key"))
self.assertTrue(conf.get("ovh-eu", "non-existent") is None)

View file

@ -29,10 +29,12 @@
import unittest
from unittest import mock
class testConsumerKeyRequest(unittest.TestCase):
def test_add_rules(self):
# Prepare
import ovh
m_client = mock.Mock()
ck = ovh.ConsumerKeyRequest(m_client)
@ -41,48 +43,58 @@ class testConsumerKeyRequest(unittest.TestCase):
ck._access_rules = []
# Test: allow one
ck.add_rule("GET", '/me')
self.assertEqual([
{'method': 'GET', 'path': '/me'},
], ck._access_rules)
ck.add_rule("GET", "/me")
self.assertEqual(
[
{"method": "GET", "path": "/me"},
],
ck._access_rules,
)
ck._access_rules = []
# Test: allow safe methods on domain
ck.add_rules(ovh.API_READ_WRITE_SAFE, '/domains/test.com')
self.assertEqual([
{'method': 'GET', 'path': '/domains/test.com'},
{'method': 'POST', 'path': '/domains/test.com'},
{'method': 'PUT', 'path': '/domains/test.com'},
], ck._access_rules)
ck.add_rules(ovh.API_READ_WRITE_SAFE, "/domains/test.com")
self.assertEqual(
[
{"method": "GET", "path": "/domains/test.com"},
{"method": "POST", "path": "/domains/test.com"},
{"method": "PUT", "path": "/domains/test.com"},
],
ck._access_rules,
)
ck._access_rules = []
# Test: allow all sms, strips suffix
ck.add_recursive_rules(ovh.API_READ_WRITE, '/sms/*')
self.assertEqual([
{'method': 'GET', 'path': '/sms'},
{'method': 'POST', 'path': '/sms'},
{'method': 'PUT', 'path': '/sms'},
{'method': 'DELETE', 'path': '/sms'},
{'method': 'GET', 'path': '/sms/*'},
{'method': 'POST', 'path': '/sms/*'},
{'method': 'PUT', 'path': '/sms/*'},
{'method': 'DELETE', 'path': '/sms/*'},
], ck._access_rules)
ck.add_recursive_rules(ovh.API_READ_WRITE, "/sms/*")
self.assertEqual(
[
{"method": "GET", "path": "/sms"},
{"method": "POST", "path": "/sms"},
{"method": "PUT", "path": "/sms"},
{"method": "DELETE", "path": "/sms"},
{"method": "GET", "path": "/sms/*"},
{"method": "POST", "path": "/sms/*"},
{"method": "PUT", "path": "/sms/*"},
{"method": "DELETE", "path": "/sms/*"},
],
ck._access_rules,
)
ck._access_rules = []
# Test: allow all, does not insert the empty rule
ck.add_recursive_rules(ovh.API_READ_WRITE, '/')
self.assertEqual([
{'method': 'GET', 'path': '/*'},
{'method': 'POST', 'path': '/*'},
{'method': 'PUT', 'path': '/*'},
{'method': 'DELETE', 'path': '/*'},
], ck._access_rules)
ck.add_recursive_rules(ovh.API_READ_WRITE, "/")
self.assertEqual(
[
{"method": "GET", "path": "/*"},
{"method": "POST", "path": "/*"},
{"method": "PUT", "path": "/*"},
{"method": "DELETE", "path": "/*"},
],
ck._access_rules,
)
ck._access_rules = []
# Test launch request
ck.add_recursive_rules(ovh.API_READ_WRITE, '/')
ck.add_recursive_rules(ovh.API_READ_WRITE, "/")
self.assertEqual(m_client.request_consumerkey.return_value, ck.request())
m_client.request_consumerkey.assert_called_once_with(ck._access_rules, None)