This file is indexed.

/usr/lib/python2.7/dist-packages/requests_toolbelt/auth/guess.py is in python-requests-toolbelt 0.7.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# -*- coding: utf-8 -*-
"""The module containing the code for GuessAuth."""
from requests import auth
from requests import cookies

from . import _digest_auth_compat as auth_compat, http_proxy_digest


class GuessAuth(auth.AuthBase):
    """Guesses the auth type by the WWW-Authentication header."""
    def __init__(self, username, password):
        self.username = username
        self.password = password
        self.auth = None
        self.pos = None

    def _handle_basic_auth_401(self, r, kwargs):
        if self.pos is not None:
            r.request.body.seek(self.pos)

        # Consume content and release the original connection
        # to allow our new request to reuse the same one.
        r.content
        r.raw.release_conn()
        prep = r.request.copy()
        if not hasattr(prep, '_cookies'):
            prep._cookies = cookies.RequestsCookieJar()
        cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
        prep.prepare_cookies(prep._cookies)

        self.auth = auth.HTTPBasicAuth(self.username, self.password)
        prep = self.auth(prep)
        _r = r.connection.send(prep, **kwargs)
        _r.history.append(r)
        _r.request = prep

        return _r

    def _handle_digest_auth_401(self, r, kwargs):
        self.auth = auth_compat.HTTPDigestAuth(self.username, self.password)
        try:
            self.auth.init_per_thread_state()
        except AttributeError:
            # If we're not on requests 2.8.0+ this method does not exist and
            # is not relevant.
            pass

        # Check that the attr exists because much older versions of requests
        # set this attribute lazily. For example:
        # https://github.com/kennethreitz/requests/blob/33735480f77891754304e7f13e3cdf83aaaa76aa/requests/auth.py#L59
        if (hasattr(self.auth, 'num_401_calls') and
                self.auth.num_401_calls is None):
            self.auth.num_401_calls = 1
        # Digest auth would resend the request by itself. We can take a
        # shortcut here.
        return self.auth.handle_401(r, **kwargs)

    def handle_401(self, r, **kwargs):
        """Resends a request with auth headers, if needed."""

        www_authenticate = r.headers.get('www-authenticate', '').lower()

        if 'basic' in www_authenticate:
            return self._handle_basic_auth_401(r, kwargs)

        if 'digest' in www_authenticate:
            return self._handle_digest_auth_401(r, kwargs)

    def __call__(self, request):
        if self.auth is not None:
            return self.auth(request)

        try:
            self.pos = request.body.tell()
        except AttributeError:
            pass

        request.register_hook('response', self.handle_401)
        return request


class GuessProxyAuth(GuessAuth):
    """
    Guesses the auth type by WWW-Authentication and Proxy-Authentication
    headers
    """
    def __init__(self, username=None, password=None,
                 proxy_username=None, proxy_password=None):
        super(GuessProxyAuth, self).__init__(username, password)
        self.proxy_username = proxy_username
        self.proxy_password = proxy_password
        self.proxy_auth = None

    def _handle_basic_auth_407(self, r, kwargs):
        if self.pos is not None:
            r.request.body.seek(self.pos)

        r.content
        r.raw.release_conn()
        prep = r.request.copy()
        if not hasattr(prep, '_cookies'):
            prep._cookies = cookies.RequestsCookieJar()
        cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
        prep.prepare_cookies(prep._cookies)

        self.proxy_auth = auth.HTTPProxyAuth(self.proxy_username,
                                             self.proxy_password)
        prep = self.proxy_auth(prep)
        _r = r.connection.send(prep, **kwargs)
        _r.history.append(r)
        _r.request = prep

        return _r

    def _handle_digest_auth_407(self, r, kwargs):
        self.proxy_auth = http_proxy_digest.HTTPProxyDigestAuth(
            username=self.proxy_username,
            password=self.proxy_password)

        try:
            self.auth.init_per_thread_state()
        except AttributeError:
            pass

        return self.proxy_auth.handle_407(r, **kwargs)

    def handle_407(self, r, **kwargs):
        proxy_authenticate = r.headers.get('Proxy-Authenticate', '').lower()

        if 'basic' in proxy_authenticate:
            return self._handle_basic_auth_407(r, kwargs)

        if 'digest' in proxy_authenticate:
            return self._handle_digest_auth_407(r, kwargs)

    def __call__(self, request):
        if self.proxy_auth is not None:
            request = self.proxy_auth(request)

        try:
            self.pos = request.body.tell()
        except AttributeError:
            pass

        request.register_hook('response', self.handle_407)
        return super(GuessProxyAuth, self).__call__(request)