/usr/lib/python2.7/dist-packages/requests_toolbelt/auth/handler.py is in python-requests-toolbelt 0.7.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | # -*- coding: utf-8 -*-
"""
requests_toolbelt.auth.handler
==============================
This holds all of the implementation details of the Authentication Handler.
"""
from requests.auth import AuthBase, HTTPBasicAuth
from requests.compat import urlparse, urlunparse
class AuthHandler(AuthBase):
"""
The ``AuthHandler`` object takes a dictionary of domains paired with
authentication strategies and will use this to determine which credentials
to use when making a request. For example, you could do the following:
.. code-block:: python
from requests import HTTPDigestAuth
from requests_toolbelt.auth.handler import AuthHandler
import requests
auth = AuthHandler({
'https://api.github.com': ('sigmavirus24', 'fakepassword'),
'https://example.com': HTTPDigestAuth('username', 'password')
})
r = requests.get('https://api.github.com/user', auth=auth)
# => <Response [200]>
r = requests.get('https://example.com/some/path', auth=auth)
# => <Response [200]>
s = requests.Session()
s.auth = auth
r = s.get('https://api.github.com/user')
# => <Response [200]>
.. warning::
:class:`requests.auth.HTTPDigestAuth` is not yet thread-safe. If you
use :class:`AuthHandler` across multiple threads you should
instantiate a new AuthHandler for each thread with a new
HTTPDigestAuth instance for each thread.
"""
def __init__(self, strategies):
self.strategies = dict(strategies)
self._make_uniform()
def __call__(self, request):
auth = self.get_strategy_for(request.url)
return auth(request)
def __repr__(self):
return '<AuthHandler({0!r})>'.format(self.strategies)
def _make_uniform(self):
existing_strategies = list(self.strategies.items())
self.strategies = {}
for (k, v) in existing_strategies:
self.add_strategy(k, v)
@staticmethod
def _key_from_url(url):
parsed = urlparse(url)
return urlunparse((parsed.scheme.lower(),
parsed.netloc.lower(),
'', '', '', ''))
def add_strategy(self, domain, strategy):
"""Add a new domain and authentication strategy.
:param str domain: The domain you wish to match against. For example:
``'https://api.github.com'``
:param str strategy: The authentication strategy you wish to use for
that domain. For example: ``('username', 'password')`` or
``requests.HTTPDigestAuth('username', 'password')``
.. code-block:: python
a = AuthHandler({})
a.add_strategy('https://api.github.com', ('username', 'password'))
"""
# Turn tuples into Basic Authentication objects
if isinstance(strategy, tuple):
strategy = HTTPBasicAuth(*strategy)
key = self._key_from_url(domain)
self.strategies[key] = strategy
def get_strategy_for(self, url):
"""Retrieve the authentication strategy for a specified URL.
:param str url: The full URL you will be making a request against. For
example, ``'https://api.github.com/user'``
:returns: Callable that adds authentication to a request.
.. code-block:: python
import requests
a = AuthHandler({'example.com', ('foo', 'bar')})
strategy = a.get_strategy_for('http://example.com/example')
assert isinstance(strategy, requests.auth.HTTPBasicAuth)
"""
key = self._key_from_url(url)
return self.strategies.get(key, NullAuthStrategy())
def remove_strategy(self, domain):
"""Remove the domain and strategy from the collection of strategies.
:param str domain: The domain you wish remove. For example,
``'https://api.github.com'``.
.. code-block:: python
a = AuthHandler({'example.com', ('foo', 'bar')})
a.remove_strategy('example.com')
assert a.strategies == {}
"""
key = self._key_from_url(domain)
if key in self.strategies:
del self.strategies[key]
class NullAuthStrategy(AuthBase):
def __repr__(self):
return '<NullAuthStrategy>'
def __call__(self, r):
return r
|