/usr/lib/python3/dist-packages/social/tests/backends/open_id.py is in python3-social-auth 1:0.2.21+dfsg-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 | # -*- coding: utf-8 -*-
import datetime
import json
import sys
from calendar import timegm
import jwt
import requests
from openid import oidutil
PY3 = sys.version_info[0] == 3
if PY3:
from html.parser import HTMLParser
HTMLParser # placate pyflakes
else:
from HTMLParser import HTMLParser
from httpretty import HTTPretty
sys.path.insert(0, '..')
from social.utils import parse_qs, module_member
from social.backends.utils import load_backends
from social.exceptions import AuthTokenError
from social.tests.backends.base import BaseBackendTest
from social.tests.models import TestStorage, User, TestUserSocialAuth, \
TestNonce, TestAssociation
from social.tests.strategy import TestStrategy
# Patch to remove the too-verbose output until a new version is released
oidutil.log = lambda *args, **kwargs: None
class FormHTMLParser(HTMLParser):
form = {}
inputs = {}
def handle_starttag(self, tag, attrs):
attrs = dict(attrs)
if tag == 'form':
self.form.update(attrs)
elif tag == 'input' and 'name' in attrs:
self.inputs[attrs['name']] = attrs['value']
class OpenIdTest(BaseBackendTest):
backend_path = None
backend = None
access_token_body = None
user_data_body = None
user_data_url = ''
expected_username = ''
settings = None
partial_login_settings = None
raw_complete_url = '/complete/{0}/'
def setUp(self):
HTTPretty.enable()
Backend = module_member(self.backend_path)
self.strategy = TestStrategy(TestStorage)
self.complete_url = self.raw_complete_url.format(Backend.name)
self.backend = Backend(self.strategy, redirect_uri=self.complete_url)
self.strategy.set_settings({
'SOCIAL_AUTH_AUTHENTICATION_BACKENDS': (
self.backend_path,
'social.tests.backends.test_broken.BrokenBackendAuth'
)
})
# Force backends loading to trash PSA cache
load_backends(
self.strategy.get_setting('SOCIAL_AUTH_AUTHENTICATION_BACKENDS'),
force_load=True
)
def tearDown(self):
self.strategy = None
User.reset_cache()
TestUserSocialAuth.reset_cache()
TestNonce.reset_cache()
TestAssociation.reset_cache()
HTTPretty.disable()
def get_form_data(self, html):
parser = FormHTMLParser()
parser.feed(html)
return parser.form, parser.inputs
def openid_url(self):
return self.backend.openid_url()
def post_start(self):
pass
def do_start(self):
HTTPretty.register_uri(HTTPretty.GET,
self.openid_url(),
status=200,
body=self.discovery_body,
content_type='application/xrds+xml')
start = self.backend.start()
self.post_start()
form, inputs = self.get_form_data(start)
HTTPretty.register_uri(HTTPretty.POST,
form.get('action'),
status=200,
body=self.server_response)
response = requests.post(form.get('action'), data=inputs)
self.strategy.set_request_data(parse_qs(response.content),
self.backend)
HTTPretty.register_uri(HTTPretty.POST,
form.get('action'),
status=200,
body='is_valid:true\n')
return self.backend.complete()
class OpenIdConnectTestMixin(object):
"""
Mixin to test OpenID Connect consumers. Inheriting classes should also
inherit OAuth2Test.
"""
client_key = 'a-key'
client_secret = 'a-secret-key'
issuer = None # id_token issuer
id_token_max_age = 600 # seconds
def extra_settings(self):
settings = super(OpenIdConnectTestMixin, self).extra_settings()
settings.update({
'SOCIAL_AUTH_{0}_KEY'.format(self.name): self.client_key,
'SOCIAL_AUTH_{0}_SECRET'.format(self.name): self.client_secret,
'SOCIAL_AUTH_{0}_ID_TOKEN_DECRYPTION_KEY'.format(self.name):
self.client_secret,
'SOCIAL_AUTH_{0}_ID_TOKEN_MAX_AGE'.format(self.name):
self.id_token_max_age
})
return settings
def access_token_body(self, request, _url, headers):
"""
Get the nonce from the request parameters, add it to the id_token, and
return the complete response.
"""
nonce = parse_qs(request.body).get('nonce')
body = self.prepare_access_token_body(nonce=nonce)
return 200, headers, body
def get_id_token(self, client_key=None, expiration_datetime=None,
issue_datetime=None, nonce=None, issuer=None):
"""
Return the id_token to be added to the access token body.
"""
id_token = {
'iss': issuer,
'nonce': nonce,
'aud': client_key,
'azp': client_key,
'exp': expiration_datetime,
'iat': issue_datetime,
'sub': '1234'
}
return id_token
def prepare_access_token_body(self, client_key=None, client_secret=None,
expiration_datetime=None,
issue_datetime=None, nonce=None,
issuer=None):
"""
Prepares a provider access token response. Arguments:
client_id -- (str) OAuth ID for the client that requested
authentication.
client_secret -- (str) OAuth secret for the client that requested
authentication.
expiration_time -- (datetime) Date and time after which the response
should be considered invalid.
"""
body = {'access_token': 'foobar', 'token_type': 'bearer'}
client_key = client_key or self.client_key
client_secret = client_secret or self.client_secret
now = datetime.datetime.utcnow()
expiration_datetime = expiration_datetime or \
(now + datetime.timedelta(seconds=30))
issue_datetime = issue_datetime or now
nonce = nonce or 'a-nonce'
issuer = issuer or self.issuer
id_token = self.get_id_token(
client_key, timegm(expiration_datetime.utctimetuple()),
timegm(issue_datetime.utctimetuple()), nonce, issuer)
body['id_token'] = jwt.encode(id_token, client_secret,
algorithm='HS256').decode('utf-8')
return json.dumps(body)
def authtoken_raised(self, expected_message_regexp, **access_token_kwargs):
self.access_token_body = self.prepare_access_token_body(
**access_token_kwargs
)
with self.assertRaisesRegexp(AuthTokenError, expected_message_regexp):
self.do_login()
def test_invalid_secret(self):
self.authtoken_raised(
'Token error: Signature verification failed',
client_secret='wrong!'
)
def test_expired_signature(self):
expiration_datetime = datetime.datetime.utcnow() - \
datetime.timedelta(seconds=30)
self.authtoken_raised('Token error: Signature has expired',
expiration_datetime=expiration_datetime)
def test_invalid_issuer(self):
self.authtoken_raised('Token error: Invalid issuer',
issuer='someone-else')
def test_invalid_audience(self):
self.authtoken_raised('Token error: Invalid audience',
client_key='someone-else')
def test_invalid_issue_time(self):
issue_datetime = datetime.datetime.utcnow() - \
datetime.timedelta(seconds=self.id_token_max_age + 1)
self.authtoken_raised('Token error: Incorrect id_token: iat',
issue_datetime=issue_datetime)
def test_invalid_nonce(self):
self.authtoken_raised(
'Token error: Incorrect id_token: nonce',
nonce='something-wrong'
)
|