/usr/share/pyshared/boto/sts/connection.py is in python-boto 2.2.2-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright (c) 2011 Mitch Garnaat http://garnaat.org/
# Copyright (c) 2011, Eucalyptus Systems, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
from boto.connection import AWSQueryConnection
from boto.regioninfo import RegionInfo
from credentials import Credentials, FederationToken
import boto
import boto.utils
import datetime
import threading
# Module-level cache of session tokens, shared by every STSConnection in
# this process.  Keys are '<region name>:<access key>' strings as built in
# STSConnection.get_session_token; values are the token objects returned
# by the GetSessionToken request.
_session_token_cache = {}
class STSConnection(AWSQueryConnection):
    """
    Query connection to the Secure Token Service (STS).

    Session tokens obtained through :meth:`get_session_token` are kept in
    the module-level ``_session_token_cache`` so that repeated calls for
    the same region and access key can reuse a token that is still valid.
    """

    DefaultRegionName = 'us-east-1'
    DefaultRegionEndpoint = 'sts.amazonaws.com'
    APIVersion = '2011-06-15'

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None, debug=0,
                 https_connection_factory=None, region=None, path='/',
                 converter=None):
        """
        Create a connection to the STS endpoint.

        When ``region`` is not supplied, a RegionInfo is built from
        ``DefaultRegionName`` and ``DefaultRegionEndpoint``.  The
        remaining arguments are forwarded to
        ``AWSQueryConnection.__init__``.

        ``converter`` is accepted for signature compatibility but is not
        used by this constructor.
        """
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint,
                                connection_cls=STSConnection)
        self.region = region
        # Guards the fetch-and-store of a new session token.  It is created
        # per connection, so it only serialises refreshes made through this
        # particular connection object.
        self._mutex = threading.Semaphore()
        AWSQueryConnection.__init__(self, aws_access_key_id,
                                    aws_secret_access_key,
                                    is_secure, port, proxy, proxy_port,
                                    proxy_user, proxy_pass,
                                    self.region.endpoint, debug,
                                    https_connection_factory, path)

    def _required_auth_capability(self):
        """Return the auth capabilities this connection declares."""
        return ['sign-v2']

    def _check_token_cache(self, token_key, duration=None, window_seconds=60):
        """
        Look up ``token_key`` in the module-level token cache.

        :type token_key: str
        :param token_key: Cache key, as built by :meth:`get_session_token`.

        :type duration: int
        :param duration: Currently unused by this method.

        :type window_seconds: int
        :param window_seconds: A cached token with less than this many
            seconds left before its expiration is treated as expired.

        :return: The cached token, or None when nothing usable is cached.
        """
        token = _session_token_cache.get(token_key, None)
        if token:
            # NOTE(review): utcnow() is naive UTC; this presumes
            # boto.utils.parse_ts also yields a naive UTC datetime.
            now = datetime.datetime.utcnow()
            expires = boto.utils.parse_ts(token.expiration)
            delta = expires - now
            if delta < datetime.timedelta(seconds=window_seconds):
                msg = 'Cached session token %s is expired' % token_key
                boto.log.debug(msg)
                token = None
        return token

    def _get_session_token(self, duration=None):
        """
        Issue a ``GetSessionToken`` request (POST) and return the result
        of ``self.get_object`` called with the Credentials class.

        :type duration: int
        :param duration: When truthy, sent as ``DurationSeconds``.
        """
        params = {}
        if duration:
            params['DurationSeconds'] = duration
        return self.get_object('GetSessionToken', params,
                               Credentials, verb='POST')

    def get_session_token(self, duration=None, force_new=False):
        """
        Return a valid session token.  Because retrieving new tokens
        from the Secure Token Service is a fairly heavyweight operation
        this module caches previously retrieved tokens and returns
        them when appropriate.  Each token is cached with a key
        consisting of the region name of the STS endpoint
        concatenated with the requesting user's access id.  If there
        is a token in the cache matching this key, the session
        expiration is checked to make sure it is still valid and if
        so, the cached token is returned.  Otherwise, a new session
        token is requested from STS and it is placed into the cache
        and returned.

        :type duration: int
        :param duration: The number of seconds the credentials should
            remain valid.

        :type force_new: bool
        :param force_new: If this parameter is True, a new session token
            will be retrieved from the Secure Token Service regardless
            of whether there is a valid cached token or not.

        :return: The cached or newly fetched session token.
        """
        token_key = '%s:%s' % (self.region.name, self.provider.access_key)
        token = self._check_token_cache(token_key, duration)
        if force_new or not token:
            boto.log.debug('fetching a new token for %s' % token_key)
            self._mutex.acquire()
            try:
                token = self._get_session_token(duration)
                _session_token_cache[token_key] = token
            finally:
                # Always release, even when the request raises; otherwise
                # every later refresh on this connection would block
                # forever on acquire().
                self._mutex.release()
        return token

    def get_federation_token(self, name, duration=None, policy=None):
        """
        Issue a ``GetFederationToken`` request (POST) and return the
        result of ``self.get_object`` called with the FederationToken
        class.

        :type name: str
        :param name: The name of the Federated user associated with
            the credentials.

        :type duration: int
        :param duration: The number of seconds the credentials should
            remain valid.

        :type policy: str
        :param policy: A JSON policy to associate with these credentials.
        """
        params = {'Name': name}
        if duration:
            params['DurationSeconds'] = duration
        if policy:
            params['Policy'] = policy
        return self.get_object('GetFederationToken', params,
                               FederationToken, verb='POST')
|