/usr/lib/python2.7/dist-packages/repoze/who/api.py is in python-repoze.who 2.2-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 | from zope.interface import implementer
from repoze.who.interfaces import IAPI
from repoze.who.interfaces import IAPIFactory
from repoze.who.interfaces import IIdentifier
from repoze.who.interfaces import IAuthenticator
from repoze.who.interfaces import IChallenger
from repoze.who.interfaces import IMetadataProvider
def get_api(environ):
    """Return the object stored in *environ* under ``'repoze.who.api'``.

    Returns ``None`` when the key is absent.
    """
    api = environ.get('repoze.who.api', None)
    return api
@implementer(IAPIFactory)
class APIFactory(object):
    """Callable that hands out one ``API`` object per WSGI environ.

    The constructor only records the plugin sequences and settings; they
    are passed unchanged to ``API`` the first time the factory is called
    for a given environ.
    """

    def __init__(self, identifiers=(), authenticators=(), challengers=(),
                 mdproviders=(), request_classifier=None,
                 challenge_decider=None, remote_user_key='REMOTE_USER',
                 logger=None):
        # Settings shared by every API object this factory creates.
        self.logger = logger
        self.remote_user_key = remote_user_key
        self.request_classifier = request_classifier
        self.challenge_decider = challenge_decider
        # Plugin sequences, stored exactly as supplied.
        self.identifiers = identifiers
        self.authenticators = authenticators
        self.challengers = challengers
        self.mdproviders = mdproviders

    def __call__(self, environ):
        """Return the API cached in *environ*, creating it if needed.

        The API object is stored under ``'repoze.who.api'`` so later calls
        with the same environ get the same object back.
        """
        existing = environ.get('repoze.who.api')
        if existing is not None:
            return existing

        created = API(
            environ,
            self.identifiers,
            self.authenticators,
            self.challengers,
            self.mdproviders,
            self.request_classifier,
            self.challenge_decider,
            self.remote_user_key,
            self.logger,
        )
        environ['repoze.who.api'] = created
        return created
def verify(plugin, iface):
    """Check *plugin* against *iface* with zope.interface's ``verifyObject``.

    The check is run with ``tentative=True``.  Nothing is returned; any
    exception raised by ``verifyObject`` propagates to the caller.
    """
    # Imported lazily, inside the function, as the original code does.
    from zope.interface.verify import verifyObject as _verify_object
    _verify_object(iface, plugin, tentative=True)
def make_registries(identifiers, authenticators, challengers, mdproviders):
    """Build the by-interface and by-name plugin lookup tables.

    Each argument is an iterable of ``(name, plugin)`` pairs.  Every plugin
    is verified against the interface of its group before being recorded.

    Returns a 2-tuple ``(interface_registry, name_registry)``:

    - ``interface_registry`` maps an interface to the list of its plugins,
      in the order supplied;
    - ``name_registry`` maps each name to its plugin (a later pair with the
      same name overwrites an earlier one).

    Raises ``ValueError`` (message prefixed with the plugin name) when
    verification raises ``BrokenImplementation``.
    """
    from zope.interface.verify import BrokenImplementation

    by_interface = {}
    by_name = {}
    groups = (
        (IIdentifier, identifiers),
        (IAuthenticator, authenticators),
        (IChallenger, challengers),
        (IMetadataProvider, mdproviders),
    )
    for iface, named_plugins in groups:
        for name, plugin in named_plugins:
            try:
                verify(plugin, iface)
            except BrokenImplementation as why:
                # Re-raise with the plugin name so the broken entry is
                # identifiable in the error message.
                raise ValueError(str(name) + ': ' + str(why))
            by_interface.setdefault(iface, []).append(plugin)
            by_name[name] = plugin
    return by_interface, by_name
def match_classification(iface, plugins, classification):
    """Return the plugins from *plugins* that apply to *classification*.

    A plugin may carry a ``classifications`` mapping keyed by interface.
    A plugin is kept when it has no such mapping, when the mapping has no
    (or an empty) entry for *iface*, or when *classification* is contained
    in that entry.  Input order is preserved.
    """
    def _applies(plugin):
        declared = getattr(plugin, 'classifications', {}).get(iface)
        # A missing or empty declaration means "good for any".
        return (not declared) or (classification in declared)

    return [plugin for plugin in plugins if _applies(plugin)]
@implementer(IAPI)
class API(object):
    """Per-request entry point to the configured plugins.

    An instance is bound to one WSGI ``environ``.  On construction the
    supplied ``(name, plugin)`` pairs are verified and indexed through
    ``make_registries``, and the request is classified with
    ``request_classifier`` (when one is given).  The classification is then
    used to select which plugins take part in each operation.
    """

    def __init__(self, environ, identifiers, authenticators, challengers,
                 mdproviders, request_classifier, challenge_decider,
                 remote_user_key, logger):
        self.environ = environ
        (self.interface_registry,
         self.name_registry) = make_registries(identifiers, authenticators,
                                               challengers, mdproviders)
        self.identifiers = identifiers
        self.authenticators = authenticators
        self.challengers = challengers
        self.mdproviders = mdproviders
        self.challenge_decider = challenge_decider
        self.remote_user_key = remote_user_key
        self.logger = logger
        # A falsy classifier yields a falsy classification (the classifier
        # value itself), exactly as the short-circuit expression dictates.
        classification = self.classification = (request_classifier and
                                                request_classifier(environ))
        if logger:
            logger.info('request classification: %s' % classification)

    def authenticate(self):
        """Identify and authenticate the current request.

        On success the best-ranked identity is wrapped in ``Identity``,
        stamped with its ``'authenticator'`` and ``'identifier'``, passed to
        the metadata providers, stored in the environ under
        ``'repoze.who.identity'``, and the userid is stored under
        ``self.remote_user_key``.  Returns that identity.

        Returns ``None`` when no identity was found or none authenticated.
        """
        ids = self._identify()

        # ids will be list of tuples: [ (IIdentifier, identity) ]
        if ids:
            auth_ids = self._authenticate(ids)

            # auth_ids will be a list of five-tuples in the form
            #  ( (auth_rank, id_rank), authenticator, identifier, identity,
            #    userid )
            #
            # When sorted, its first element will represent the "best"
            # identity for this request.
            if auth_ids:
                auth_ids.sort()
                best = auth_ids[0]
                rank, authenticator, identifier, identity, userid = best
                identity = Identity(identity)  # dont show contents at print
                identity['authenticator'] = authenticator
                identity['identifier'] = identifier

                # allow IMetadataProvider plugins to scribble on the identity
                self._add_metadata(identity)

                # add the identity to the environment; a downstream
                # application can mutate it to do an 'identity reset'
                # as necessary, e.g. identity['login'] = 'foo',
                # identity['password'] = 'bar'
                self.environ['repoze.who.identity'] = identity
                # set the REMOTE_USER
                self.environ[self.remote_user_key] = userid
                return identity

        if self.logger:
            self.logger.info('no identities found, not authenticating')

    def challenge(self, status='403 Forbidden', app_headers=()):
        """Return a challenge app from the first challenger that offers one.

        If the environ holds an identity with an ``'identifier'``, that
        identifier's ``forget`` headers are collected first and handed to
        each challenger.  Challengers matching the request classification
        are tried in order; the first non-``None`` result is returned.
        Returns ``None`` when no challenger produces an app.
        """
        identity = self.environ.get('repoze.who.identity', {})
        identifier = identity.get('identifier')

        logger = self.logger

        forget_headers = []

        if identifier:
            id_forget_headers = identifier.forget(self.environ, identity)
            if id_forget_headers is not None:
                forget_headers.extend(id_forget_headers)
            if logger:
                logger.info('forgetting via headers from %s: %s'
                            % (identifier, forget_headers))

        candidates = self.interface_registry.get(IChallenger, ())
        if logger:
            logger.debug('challengers registered: %s' % repr(candidates))
        plugins = match_classification(IChallenger, candidates,
                                       self.classification)
        if logger:
            logger.debug('challengers matched for '
                         'classification "%s": %s'
                         % (self.classification, plugins))
        for plugin in plugins:
            app = plugin.challenge(self.environ, status, app_headers,
                                   forget_headers)
            if app is not None:
                # new WSGI application
                if logger:
                    # One-element tuple keeps the formatting safe even if
                    # the plugin object happens to be a tuple.
                    logger.info(
                        'challenger plugin %s "challenge" returned an app'
                        % (plugin,))
                return app

        # signifies no challenge
        if logger:
            logger.info('no challenge app returned')
        return None

    def remember(self, identity=None):
        """Return the "remember" headers for *identity*.

        When *identity* is ``None`` the one stored in the environ under
        ``'repoze.who.identity'`` is used (an empty dict if absent).  The
        headers come from the identity's ``'identifier'`` plugin; an empty
        tuple is returned when there is no identifier or it yields nothing.
        """
        headers = ()
        if identity is None:
            identity = self.environ.get('repoze.who.identity', {})

        identifier = identity.get('identifier')
        if identifier:
            got_headers = identifier.remember(self.environ, identity)
            if got_headers:
                headers = got_headers

        logger = self.logger
        if logger:
            logger.info('remembering via headers from %s: %s'
                        % (identifier, headers))
        return headers

    def forget(self, identity=None):
        """Return the "forget" headers for *identity*.

        When *identity* is ``None`` the one stored in the environ under
        ``'repoze.who.identity'`` is used (an empty dict if absent).  The
        headers come from the identity's ``'identifier'`` plugin; an empty
        tuple is returned when there is no identifier or it yields nothing.
        """
        headers = ()
        if identity is None:
            identity = self.environ.get('repoze.who.identity', {})

        identifier = identity.get('identifier')
        if identifier:
            got_headers = identifier.forget(self.environ, identity)
            if got_headers:
                headers = got_headers

        logger = self.logger
        if logger:
            logger.info('forgetting via headers from %s: %s'
                        % (identifier, headers))
        return headers

    def login(self, credentials, identifier_name=None):
        """Authenticate *credentials* and collect remember/forget headers.

        Only the identifier named *identifier_name* takes part when a name
        is given; otherwise all configured identifiers do.

        Returns ``(identity, headers)``.  ``identity`` is the first
        successfully authenticated identity, or ``None`` on failure.
        ``headers`` holds every participating identifier's ``remember``
        headers on success, or its ``forget`` headers on failure.
        """
        identity = None
        headers = []
        identifiers = self._select_identifiers(identifier_name)

        # First pass:  for each identifier, pretend that it was the source
        # of the credentials, and try to authenticate.
        for _name, identifier in identifiers:
            authenticated = self._authenticate([(identifier, credentials)])
            if authenticated:  # and therefore can remember it
                # five-tuple: (rank, plugin, identifier, identity, userid)
                identity = authenticated[0][3]
                break

        # Second pass to allow identifiers which passed on auth to participate
        # in remember / forget.
        for _name, identifier in identifiers:
            if identity is not None:
                i_headers = identifier.remember(self.environ, identity)
            else:
                i_headers = identifier.forget(self.environ, None)
            if i_headers is not None:
                headers.extend(i_headers)

        return identity, headers

    def logout(self, identifier_name=None):
        """Collect "forget" headers and drop the identity from the environ.

        Only the identifier named *identifier_name* takes part when a name
        is given; otherwise all configured identifiers do.  Returns the
        list of collected headers.
        """
        headers = []

        for _name, identifier in self._select_identifiers(identifier_name):
            i_headers = identifier.forget(self.environ, None)
            # An identifier may return None instead of a header list;
            # ``login`` and ``challenge`` tolerate that, so do the same
            # here rather than failing inside ``extend``.
            if i_headers is not None:
                headers.extend(i_headers)

        # we need to remove the identity for hybrid middleware/api usages to
        # work correctly: middleware calls ``remember`` unconditionally "on
        # the way out", and if an identity is found, competing login headers
        # will be set.
        self.environ.pop('repoze.who.identity', None)

        return headers

    def _select_identifiers(self, identifier_name):
        """Return the ``(name, plugin)`` identifier pairs to use.

        All configured identifiers when *identifier_name* is ``None``,
        otherwise only the pairs whose name equals it.
        """
        if identifier_name is None:
            return self.identifiers
        return [(name, identifier) for name, identifier in self.identifiers
                if name == identifier_name]

    def _identify(self):
        """Ask each matching identifier plugin for an identity.

        Returns a list of ``(plugin, identity)`` pairs for the plugins
        whose ``identify`` returned something other than ``None``.
        """
        logger = self.logger
        candidates = self.interface_registry.get(IIdentifier, ())
        if logger:
            logger.debug('identifier plugins registered: %s' %
                         (candidates,))
        plugins = match_classification(IIdentifier, candidates,
                                       self.classification)
        if logger:
            logger.debug(
                'identifier plugins matched for '
                'classification "%s": %s' % (self.classification, plugins))

        results = []
        for plugin in plugins:
            identity = plugin.identify(self.environ)
            if identity is not None:
                if logger:
                    logger.debug(
                        'identity returned from %s: %s' % (plugin, identity))
                results.append((plugin, identity))
            elif logger:
                logger.debug(
                    'no identity returned from %s (%s)' % (plugin, identity))

        if logger:
            logger.debug('identities found: %s' % (results,))
        return results

    def _authenticate(self, identities):
        """Run every matching authenticator over every given identity.

        *identities* is a sequence of ``(identifier, identity)`` pairs.
        Each identity that authenticates is stamped with
        ``'repoze.who.userid'`` (the identity mapping is mutated in place).

        Returns a list of five-tuples
        ``((auth_rank, identifier_rank), plugin, identifier, identity,
        userid)``, where the ranks are the positions of the authenticator
        and of the identity in their respective sequences.
        """
        logger = self.logger
        candidates = self.interface_registry.get(IAuthenticator, [])
        if logger:
            logger.debug('authenticator plugins registered: %s' %
                         (candidates,))
        plugins = match_classification(IAuthenticator, candidates,
                                       self.classification)
        if logger:
            logger.debug(
                'authenticator plugins matched for '
                'classification "%s": %s' % (self.classification, plugins))

        results = []
        for auth_rank, plugin in enumerate(plugins):
            for identifier_rank, (identifier, identity) in enumerate(
                    identities):
                userid = plugin.authenticate(self.environ, identity)
                if userid is not None:
                    if logger:
                        logger.debug(
                            'userid returned from %s: "%s"'
                            % (plugin, userid))

                    # stamp the identity with the userid
                    identity['repoze.who.userid'] = userid
                    rank = (auth_rank, identifier_rank)
                    results.append(
                        (rank, plugin, identifier, identity, userid)
                    )
                elif logger:
                    logger.debug(
                        'no userid returned from %s: (%s)' % (
                            plugin, userid))

        if logger:
            logger.debug('identities authenticated: %s' % (results,))
        return results

    def _add_metadata(self, identity):
        """Let each matching metadata provider update *identity* in place."""
        candidates = self.interface_registry.get(IMetadataProvider, ())
        plugins = match_classification(IMetadataProvider, candidates,
                                       self.classification)
        for plugin in plugins:
            plugin.add_metadata(self.environ, identity)
class Identity(dict):
    """Dictionary whose printed form never shows its contents.

    Both ``repr()`` and ``str()`` return a fixed placeholder containing
    only the object's ``id()``, so stored values do not appear when the
    identity is printed or formatted.
    """

    def __repr__(self):
        address = id(self)
        return '<repoze.who identity (hidden, dict-like) at %s>' % address

    # str() shares the very same function so it hides the contents too.
    __str__ = __repr__
|