/usr/lib/python2.7/dist-packages/openstack/session.py is in python-openstacksdk 0.8.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The :class:`~openstack.session.Session` overrides
:class:`~keystoneauth1.session.Session` to provide end point filtering and
mapping KSA exceptions to SDK exceptions.
"""
import re
from keystoneauth1 import exceptions as _exceptions
from keystoneauth1 import session as _session
import openstack
from openstack import exceptions
from six.moves.urllib import parse
DEFAULT_USER_AGENT = "openstacksdk/%s" % openstack.__version__
VERSION_PATTERN = re.compile('/v\d[\d.]*')
def parse_url(filt, url):
result = parse.urlparse(url)
path = result.path
vstr = VERSION_PATTERN.search(path)
if not vstr:
return result.scheme + "://" + result.netloc + "/" + filt.get_path()
start, end = vstr.span()
prefix = path[:start]
version = '/' + filt.get_path(path[start + 1:end])
postfix = path[end:].rstrip('/') if path[end:] else ''
url = result.scheme + "://" + result.netloc + prefix + version + postfix
return url
def map_exceptions(func):
def map_exceptions_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except _exceptions.HttpError as e:
if e.http_status == 404:
raise exceptions.NotFoundException(
message=e.message, details=e.details,
response=e.response, request_id=e.request_id,
url=e.url, method=e.method,
http_status=e.http_status, cause=e)
else:
raise exceptions.HttpException(
message=e.message, details=e.details,
response=e.response, request_id=e.request_id,
url=e.url, method=e.method,
http_status=e.http_status, cause=e)
except _exceptions.ClientException as e:
raise exceptions.SDKException(message=e.message, cause=e)
return map_exceptions_wrapper
class Session(_session.Session):
def __init__(self, profile, user_agent=None, **kwargs):
"""Create a new Keystone auth session with a profile.
:param profile: If the user has any special profiles such as the
service name, region, version or interface, they may be provided
in the profile object. If no profiles are provided, the
services that appear first in the service catalog will be used.
:param user_agent: A User-Agent header string to use for the
request. If not provided, a default of
:attr:`~openstack.session.DEFAULT_USER_AGENT`
is used, which contains the openstacksdk version
When a non-None value is passed, it will be
prepended to the default.
:type profile: :class:`~openstack.profile.Profile`
"""
if user_agent is not None:
self.user_agent = "%s %s" % (user_agent, DEFAULT_USER_AGENT)
else:
self.user_agent = DEFAULT_USER_AGENT
super(Session, self).__init__(user_agent=self.user_agent, **kwargs)
self.profile = profile
def get_endpoint(self, auth=None, interface=None, **kwargs):
"""Override get endpoint to automate endpoint filtering"""
service_type = kwargs.get('service_type')
filt = self.profile.get_filter(service_type)
if filt.interface is None:
filt.interface = interface
url = super(Session, self).get_endpoint(auth, **filt.get_filter())
return parse_url(filt, url)
@map_exceptions
def request(self, *args, **kwargs):
return super(Session, self).request(*args, **kwargs)
|