/usr/share/pyshared/repoze/who/middleware.py is in python-repoze.who 1.0.18-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 | import logging
from StringIO import StringIO
import sys
from repoze.who.interfaces import IIdentifier
from repoze.who.interfaces import IAuthenticator
from repoze.who.interfaces import IChallenger
from repoze.who.interfaces import IMetadataProvider
_STARTED = '-- repoze.who request started (%s) --'
_ENDED = '-- repoze.who request ended (%s) --'
class PluggableAuthenticationMiddleware(object):
def __init__(self, app,
identifiers,
authenticators,
challengers,
mdproviders,
classifier,
challenge_decider,
log_stream = None,
log_level = logging.INFO,
remote_user_key = 'REMOTE_USER',
):
iregistry, nregistry = make_registries(identifiers, authenticators,
challengers, mdproviders)
self.registry = iregistry
self.name_registry = nregistry
self.app = app
self.classifier = classifier
self.challenge_decider = challenge_decider
self.remote_user_key = remote_user_key
self.logger = None
if isinstance(log_stream, logging.Logger):
self.logger = log_stream
elif log_stream:
handler = logging.StreamHandler(log_stream)
fmt = '%(asctime)s %(message)s'
formatter = logging.Formatter(fmt)
handler.setFormatter(formatter)
self.logger = logging.Logger('repoze.who')
self.logger.addHandler(handler)
self.logger.setLevel(log_level)
def __call__(self, environ, start_response):
if self.remote_user_key in environ:
# act as a pass through if REMOTE_USER (or whatever) is
# already set
return self.app(environ, start_response)
path_info = environ.get('PATH_INFO', None)
environ['repoze.who.plugins'] = self.name_registry
environ['repoze.who.logger'] = self.logger
environ['repoze.who.application'] = self.app
logger = self.logger
logger and logger.info(_STARTED % path_info)
classification = self.classifier(environ)
logger and logger.info('request classification: %s' % classification)
userid = None
identity = None
identifier = None
ids = self.identify(environ, classification)
# ids will be list of tuples: [ (IIdentifier, identity) ]
if ids:
auth_ids = self.authenticate(environ, classification, ids)
# auth_ids will be a list of five-tuples in the form
# ( (auth_rank, id_rank), authenticator, identifier, identity,
# userid )
#
# When sorted, its first element will represent the "best"
# identity for this request.
if auth_ids:
auth_ids.sort()
best = auth_ids[0]
rank, authenticator, identifier, identity, userid = best
identity = Identity(identity) # dont show contents at print
# allow IMetadataProvider plugins to scribble on the identity
self.add_metadata(environ, classification, identity)
# add the identity to the environment; a downstream
# application can mutate it to do an 'identity reset'
# as necessary, e.g. identity['login'] = 'foo',
# identity['password'] = 'bar'
environ['repoze.who.identity'] = identity
# set the REMOTE_USER
environ[self.remote_user_key] = userid
else:
logger and logger.info('no identities found, not authenticating')
# allow identifier plugins to replace the downstream
# application (to do redirection and unauthorized themselves
# mostly)
app = environ.pop('repoze.who.application')
if app is not self.app:
logger and logger.info(
'static downstream application replaced with %s' % app)
wrapper = StartResponseWrapper(start_response)
app_iter = app(environ, wrapper.wrap_start_response)
# The challenge decider almost(?) always needs information from the
# response. The WSGI spec (PEP 333) states that a WSGI application
# must call start_response by the iterable's first iteration. If
# start_response hasn't been called, we'll wrap it in a way that
# triggers that call.
if not wrapper.called:
app_iter = wrap_generator(app_iter)
if self.challenge_decider(environ, wrapper.status, wrapper.headers):
logger and logger.info('challenge required')
challenge_app = self.challenge(
environ,
classification,
wrapper.status,
wrapper.headers,
identifier,
identity
)
if challenge_app is not None:
logger and logger.info('executing challenge app')
if app_iter:
list(app_iter) # unwind the original app iterator
# replace the downstream app with the challenge app
app_iter = challenge_app(environ, start_response)
else:
logger and logger.info('configuration error: no challengers')
raise RuntimeError('no challengers found')
else:
logger and logger.info('no challenge required')
remember_headers = []
if identifier:
remember_headers = identifier.remember(environ, identity)
if remember_headers:
logger and logger.info('remembering via headers from %s: %s'
% (identifier, remember_headers))
wrapper.finish_response(remember_headers)
logger and logger.info(_ENDED % path_info)
return app_iter
def identify(self, environ, classification):
logger = self.logger
candidates = self.registry.get(IIdentifier, ())
logger and self.logger.info('identifier plugins registered %s' %
(candidates,))
plugins = match_classification(IIdentifier, candidates, classification)
logger and self.logger.info(
'identifier plugins matched for '
'classification "%s": %s' % (classification, plugins))
results = []
for plugin in plugins:
identity = plugin.identify(environ)
if identity is not None:
logger and logger.debug(
'identity returned from %s: %s' % (plugin, identity))
results.append((plugin, identity))
else:
logger and logger.debug(
'no identity returned from %s (%s)' % (plugin, identity))
logger and logger.debug('identities found: %s' % (results,))
return results
def add_metadata(self, environ, classification, identity):
candidates = self.registry.get(IMetadataProvider, ())
plugins = match_classification(IMetadataProvider, candidates,
classification)
for plugin in plugins:
plugin.add_metadata(environ, identity)
def authenticate(self, environ, classification, identities):
logger = self.logger
candidates = self.registry.get(IAuthenticator, [])
logger and self.logger.info('authenticator plugins registered %s' %
candidates)
plugins = match_classification(IAuthenticator, candidates,
classification)
logger and self.logger.info(
'authenticator plugins matched for '
'classification "%s": %s' % (classification, plugins))
# 'preauthenticated' identities are considered best-ranking
identities, results, id_rank_start =self._filter_preauthenticated(
identities)
auth_rank = 0
for plugin in plugins:
identifier_rank = id_rank_start
for identifier, identity in identities:
userid = plugin.authenticate(environ, identity)
if userid is not None:
logger and logger.debug(
'userid returned from %s: "%s"' % (plugin, userid))
# stamp the identity with the userid
identity['repoze.who.userid'] = userid
rank = (auth_rank, identifier_rank)
results.append(
(rank, plugin, identifier, identity, userid)
)
else:
logger and logger.debug(
'no userid returned from %s: (%s)' % (
plugin, userid))
identifier_rank += 1
auth_rank += 1
logger and logger.debug('identities authenticated: %s' % (results,))
return results
def _filter_preauthenticated(self, identities):
logger = self.logger
results = []
new_identities = identities[:]
identifier_rank = 0
for thing in identities:
identifier, identity = thing
userid = identity.get('repoze.who.userid')
if userid is not None:
# the identifier plugin has already authenticated this
# user (domain auth, auth ticket, etc)
logger and logger.info(
'userid preauthenticated by %s: "%s" '
'(repoze.who.userid set)' % (identifier, userid)
)
rank = (0, identifier_rank)
results.append(
(rank, None, identifier, identity, userid)
)
identifier_rank += 1
new_identities.remove(thing)
return new_identities, results, identifier_rank
def challenge(self, environ, classification, status, app_headers,
identifier, identity):
# happens on egress
logger = self.logger
forget_headers = []
if identifier:
forget_headers = identifier.forget(environ, identity)
if forget_headers is None:
forget_headers = []
else:
logger and logger.info('forgetting via headers from %s: %s'
% (identifier, forget_headers))
candidates = self.registry.get(IChallenger, ())
logger and logger.info('challengers registered: %s' % candidates)
plugins = match_classification(IChallenger,
candidates, classification)
logger and logger.info('challengers matched for '
'classification "%s": %s' % (classification,
plugins))
for plugin in plugins:
app = plugin.challenge(environ, status, app_headers,
forget_headers)
if app is not None:
# new WSGI application
logger and logger.info(
'challenger plugin %s "challenge" returned an app' % (
plugin))
return app
# signifies no challenge
logger and logger.info('no challenge app returned')
return None
def wrap_generator(result):
"""\
This function returns a generator that behaves exactly the same as the
original. It's only difference is it pulls the first iteration off and
caches it to trigger any immediate side effects (in a WSGI world, this
ensures start_response is called).
"""
# Neat trick to pull the first iteration only. We need to do this outside
# of the generator function to ensure it is called.
for iter in result:
first = iter
break
# Wrapper yields the first iteration, then passes result's iterations
# directly up.
def wrapper():
yield first
for iter in result:
# We'll let result's StopIteration bubble up directly.
yield iter
return wrapper()
def match_classification(iface, plugins, classification):
result = []
for plugin in plugins:
plugin_classifications = getattr(plugin, 'classifications', {})
iface_classifications = plugin_classifications.get(iface)
if not iface_classifications: # good for any
result.append(plugin)
continue
if classification in iface_classifications:
result.append(plugin)
return result
class StartResponseWrapper(object):
def __init__(self, start_response):
self.start_response = start_response
self.status = None
self.headers = []
self.exc_info = None
self.buffer = StringIO()
# A WSGI app may delay calling start_response until the first iteration
# of its generator. We track this so we know whether or not we need to
# trigger an iteration before examining the response.
self.called = False
def wrap_start_response(self, status, headers, exc_info=None):
self.headers = headers
self.status = status
self.exc_info = exc_info
# The response has been initiated, so we have a valid code.
self.called = True
return self.buffer.write
def finish_response(self, extra_headers):
if not extra_headers:
extra_headers = []
headers = self.headers + extra_headers
write = self.start_response(self.status, headers, self.exc_info)
if write:
self.buffer.seek(0)
value = self.buffer.getvalue()
if value:
write(value)
if hasattr(write, 'close'):
write.close()
def make_test_middleware(app, global_conf):
""" Functionally equivalent to
[plugin:form]
use = repoze.who.plugins.form.FormPlugin
rememberer_name = cookie
login_form_qs=__do_login
[plugin:cookie]
use = repoze.who.plugins.cookie:InsecureCookiePlugin
cookie_name = oatmeal
[plugin:basicauth]
use = repoze.who.plugins.basicauth.BasicAuthPlugin
realm = repoze.who
[plugin:htpasswd]
use = repoze.who.plugins.htpasswd.HTPasswdPlugin
filename = <...>
check_fn = repoze.who.plugins.htpasswd:crypt_check
[general]
request_classifier = repoze.who.classifiers:default_request_classifier
challenge_decider = repoze.who.classifiers:default_challenge_decider
[identifiers]
plugins = form:browser cookie basicauth
[authenticators]
plugins = htpasswd
[challengers]
plugins = form:browser basicauth
"""
# be able to test without a config file
from repoze.who.plugins.basicauth import BasicAuthPlugin
from repoze.who.plugins.auth_tkt import AuthTktCookiePlugin
from repoze.who.plugins.cookie import InsecureCookiePlugin
from repoze.who.plugins.form import FormPlugin
from repoze.who.plugins.htpasswd import HTPasswdPlugin
io = StringIO()
salt = 'aa'
for name, password in [ ('admin', 'admin'), ('chris', 'chris') ]:
io.write('%s:%s\n' % (name, password))
io.seek(0)
def cleartext_check(password, hashed):
return password == hashed #pragma NO COVERAGE
htpasswd = HTPasswdPlugin(io, cleartext_check)
basicauth = BasicAuthPlugin('repoze.who')
auth_tkt = AuthTktCookiePlugin('secret', 'auth_tkt')
cookie = InsecureCookiePlugin('oatmeal')
form = FormPlugin('__do_login', rememberer_name='auth_tkt')
form.classifications = { IIdentifier:['browser'],
IChallenger:['browser'] } # only for browser
identifiers = [('form', form),('auth_tkt',auth_tkt),('basicauth',basicauth)]
authenticators = [('htpasswd', htpasswd)]
challengers = [('form',form), ('basicauth',basicauth)]
mdproviders = []
from repoze.who.classifiers import default_request_classifier
from repoze.who.classifiers import default_challenge_decider
log_stream = None
import os
if os.environ.get('WHO_LOG'):
log_stream = sys.stdout
middleware = PluggableAuthenticationMiddleware(
app,
identifiers,
authenticators,
challengers,
mdproviders,
default_request_classifier,
default_challenge_decider,
log_stream = log_stream,
log_level = logging.DEBUG
)
return middleware
def verify(plugin, iface):
from zope.interface.verify import verifyObject
verifyObject(iface, plugin, tentative=True)
def make_registries(identifiers, authenticators, challengers, mdproviders):
from zope.interface.verify import BrokenImplementation
interface_registry = {}
name_registry = {}
for supplied, iface in [ (identifiers, IIdentifier),
(authenticators, IAuthenticator),
(challengers, IChallenger),
(mdproviders, IMetadataProvider)]:
for name, value in supplied:
try:
verify(value, iface)
except BrokenImplementation, why:
why = str(why)
raise ValueError(str(name) + ': ' + why)
L = interface_registry.setdefault(iface, [])
L.append(value)
name_registry[name] = value
return interface_registry, name_registry
class Identity(dict):
""" dict subclass that prevents its members from being rendered
during print """
def __repr__(self):
return '<repoze.who identity (hidden, dict-like) at %s>' % id(self)
__str__ = __repr__
|