/usr/lib/python2.7/dist-packages/ubuntu-kylin-sso-client/ubuntu_kylin_sso/tests/test_account.py is in python-ubuntu-kylin-sso-client.tests 0.1.2.3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 | # -*- coding: utf-8 -*-
#
# Author: Natalia Bidart <natalia.bidart@canonical.com>
# Author: Alejandro J. Cura <alecu@canonical.com>
#
# Copyright 2010-2012 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the
# OpenSSL library under certain conditions as described in each
# individual source file, and distribute linked combinations
# including the two.
# You must obey the GNU General Public License in all respects
# for all of the code used other than OpenSSL. If you modify
# file(s) with this exception, you may extend this exception to your
# version of the file(s), but you are not obligated to do so. If you
# do not wish to do so, delete this exception statement from your
# version. If you delete this exception statement from all source
# files in the program, then also delete it here.
"""Tests for the SSO account code."""
from __future__ import unicode_literals
import os
# pylint: disable=W0621,F0401,E0611
try:
import urllib.request as url_lib
except ImportError:
import urllib2 as url_lib
# pylint: enable=W0621,F0401,E0611
from twisted.trial.unittest import TestCase
from twisted.internet import defer
from ubuntu_kylin_sso import account
from ubuntu_kylin_sso.account import (
Account,
AuthenticationError,
EmailTokenError,
InvalidEmailError,
InvalidPasswordError,
NewPasswordError,
SERVICE_URL,
RegistrationError,
ResetPasswordTokenError,
SSO_STATUS_OK,
SSO_STATUS_ERROR,
)
from ubuntu_kylin_sso.tests import (
APP_NAME,
CAPTCHA_ID,
CAPTCHA_PATH,
CAPTCHA_SOLUTION,
EMAIL,
EMAIL_TOKEN,
NAME,
PASSWORD,
RESET_PASSWORD_TOKEN,
TOKEN,
TOKEN_NAME,
)
from ubuntu_kylin_sso.utils import compat
# Error bodies that FakeRestfulClient attaches to the WebClientError it
# raises.  NOTE: these are runtime strings and are kept exactly as spelled.
CANT_RESET_PASSWORD_CONTENT = (
    "CanNotResetPassowrdError: "
    "Can't reset password for this account"
)
RESET_TOKEN_INVALID_CONTENT = "AuthToken matching query does not exist."

# Email for which the fake registration/validation reports an error.
EMAIL_ALREADY_REGISTERED = 'a@example.com'

# Canned replies returned by the fake registration calls.
STATUS_UNKNOWN = {'status': 'yadda-yadda'}
STATUS_ERROR = {
    'status': SSO_STATUS_ERROR,
    'errors': {'something': ['Bla', 'Ble']},
}
STATUS_OK = {'status': SSO_STATUS_OK}

# Canned replies returned by the fake email validation call.
STATUS_EMAIL_UNKNOWN = {'status': 'yadda-yadda'}
STATUS_EMAIL_ERROR = {'errors': {'email_token': ['Error1', 'Error2']}}
STATUS_EMAIL_OK = {'email': EMAIL}

# Reply of the fake "captchas.new" call: the image lives on the local disk.
FAKE_NEW_CAPTCHA = {
    'image_url': "file://" + compat.text_type(CAPTCHA_PATH),
    'captcha_id': CAPTCHA_ID,
}
class FakeWebClientResponse(object):
    """A fake webclient.Response.

    The ``content`` class attribute holds the raw bytes of the captcha
    fixture found at CAPTCHA_PATH, read once when the class is created.
    """

    # Use a context manager so the fixture file is closed as soon as it has
    # been read, instead of leaking the handle until garbage collection.
    with open(CAPTCHA_PATH, "rb") as _captcha_file:
        content = _captcha_file.read()
    # Do not leave the (closed) file object behind as a class attribute.
    del _captcha_file
class FakeWebClient(object):
    """Stand-in for a webclient that never touches the network.

    ``started`` is True from construction until ``shutdown`` is called, so
    tests can check that the client was turned off.
    """

    def __init__(self):
        self.started = True

    def request(self, url):
        """Ignore `url` and hand back a new FakeWebClientResponse."""
        response = FakeWebClientResponse()
        return response

    def shutdown(self):
        """Mark this webclient as stopped."""
        self.started = False
class FakeRestfulClient(object):
    """A fake restfulclient.

    `restcall` maps a dotted method name such as "registration.register"
    onto the matching ``fake_registration_register`` method of this class
    and wraps its outcome in an already-fired Deferred.
    """

    # Value reported as 'preferred_email' by fake_accounts_me; tests may
    # override it on the instance.
    preferred_email = EMAIL

    def __init__(self):
        self.started = True

    def shutdown(self):
        """Mark this restfulclient as stopped."""
        self.started = False

    def fake_captchas_new(self):
        """Return the canned captcha that points to a local file."""
        return FAKE_NEW_CAPTCHA

    def fake_registration_register(self, email, password, displayname,
                                   captcha_id, captcha_solution):
        """Fake registration. Return a fixed result.

        The already-registered email gives an error status with a plain
        string message, a missing captcha gives the unknown status, a wrong
        captcha gives the error status, anything else gives the ok status.
        """
        if email == EMAIL_ALREADY_REGISTERED:
            return {'status': SSO_STATUS_ERROR,
                    'errors': {'email': 'Email already registered'}}
        if captcha_id is None and captcha_solution is None:
            return STATUS_UNKNOWN
        captcha_matches = (captcha_id == CAPTCHA_ID and
                           captcha_solution == CAPTCHA_SOLUTION)
        if captcha_matches:
            return STATUS_OK
        return STATUS_ERROR

    def fake_registration_request_password_reset_token(self, email):
        """Fake the password reset token request. Return a fixed result.

        Raise account.WebClientError for any email other than EMAIL (None
        yields the unknown status instead).
        """
        if email is None:
            return STATUS_UNKNOWN
        if email == EMAIL:
            return STATUS_OK
        raise account.WebClientError("Misc error",
                                     CANT_RESET_PASSWORD_CONTENT)

    def fake_registration_set_new_password(self, email, token, new_password):
        """Fake the setting of a new password. Return a fixed result.

        Raise account.WebClientError when the email or the reset token do
        not match the expected ones (all-None arguments yield the unknown
        status instead).
        """
        nothing_given = (email is None and token is None and
                         new_password is None)
        if nothing_given:
            return STATUS_UNKNOWN
        if email == EMAIL and token == RESET_PASSWORD_TOKEN:
            return STATUS_OK
        raise account.WebClientError("Misc error",
                                     RESET_TOKEN_INVALID_CONTENT)

    def fake_authentications_authenticate(self, token_name):
        """Fake authenticate. Return a fixed result.

        Raise account.WebClientError unless `token_name` starts with
        TOKEN_NAME.
        """
        if token_name.startswith(TOKEN_NAME):
            return TOKEN
        raise account.WebClientError()

    def fake_accounts_validate_email(self, email_token):
        """Fake the email validation. Return a fixed result."""
        if email_token is None:
            return STATUS_EMAIL_UNKNOWN
        if email_token == EMAIL_ALREADY_REGISTERED:
            return {
                'status': SSO_STATUS_ERROR,
                'errors': {'email': 'Email already registered'}
            }
        if email_token == EMAIL_TOKEN:
            return STATUS_EMAIL_OK
        return STATUS_EMAIL_ERROR

    def fake_accounts_me(self):
        """Fake the 'me' information."""
        me_info = {'username': 'Wh46bKY',
                   'preferred_email': self.preferred_email,
                   'displayname': '',
                   'unverified_emails': ['aaaaaa@example.com'],
                   'verified_emails': [],
                   'openid_identifier': 'Wh46bKY'}
        return me_info

    def check_all_kwargs_unicode(self, **kwargs):
        """Raise AssertionError if any keyword argument value is bytes."""
        for name in kwargs:
            if isinstance(kwargs[name], compat.binary_type):
                raise AssertionError(
                    "Error: kwarg '%s' is non-unicode." % name)

    def restcall(self, method_name, **kwargs):
        """Fake an async restcall.

        Return a Deferred already fired with the fake method's result, or
        already failed with whatever exception the fake method raised.
        """
        self.check_all_kwargs_unicode(**kwargs)
        fake_name = "fake_" + method_name.replace(".", "_")
        handler = getattr(self, fake_name)
        try:
            outcome = handler(**kwargs)
            return defer.succeed(outcome)
        # pylint: disable=W0703
        except Exception as error:
            return defer.fail(error)
class AccountTestCase(TestCase):
    """Test suite for the SSO account processor.

    Every test runs against an Account whose restful client is replaced by
    a FakeRestfulClient, so no network access takes place.
    """

    @defer.inlineCallbacks
    def setUp(self):
        """Create the processor and patch in the fake restful client."""
        yield super(AccountTestCase, self).setUp()

        def fake_urlopen(url):
            """Fake an urlopen which will read from the disk."""
            f = open(url)
            self.addCleanup(f.close)
            return f

        self.patch(url_lib, 'urlopen', fake_urlopen)  # fd to the path
        self.processor = Account()
        self.register_kwargs = dict(email=EMAIL, password=PASSWORD,
                                    displayname=NAME,
                                    captcha_id=CAPTCHA_ID,
                                    captcha_solution=CAPTCHA_SOLUTION)
        self.login_kwargs = dict(email=EMAIL, password=PASSWORD,
                                 token_name=TOKEN_NAME)
        self.frc = FakeRestfulClient()
        self.patch(account.restful, "RestfulClient",
                   lambda *args, **kwargs: self.frc)
        self.addCleanup(self.verify_frc_shutdown)

    def verify_frc_shutdown(self):
        """Verify that the FakeRestfulClient was stopped."""
        assert self.frc.started is False, "Restfulclient must be shut down."

    @defer.inlineCallbacks
    def test_generate_captcha(self):
        """Captcha can be generated."""
        filename = self.mktemp()
        self.addCleanup(lambda: os.remove(filename)
                        if os.path.exists(filename) else None)
        wc = FakeWebClient()
        self.patch(account.webclient, "webclient_factory", lambda: wc)
        captcha_id = yield self.processor.generate_captcha(filename)
        self.assertEqual(CAPTCHA_ID, captcha_id, 'captcha id must be correct.')
        self.assertTrue(os.path.isfile(filename), '%s must exist.' % filename)
        # The fake response content is read as bytes ("rb"), so both files
        # are compared as bytes too; text mode may fail to decode the data
        # on Python 3.
        with open(CAPTCHA_PATH, "rb") as f:
            expected = f.read()
        with open(filename, "rb") as f:
            actual = f.read()
        self.assertEqual(expected, actual, 'captcha image must be correct.')
        self.assertFalse(wc.started, "Webclient must be shut down.")

    @defer.inlineCallbacks
    def test_register_user_checks_valid_email(self):
        """Email is validated."""
        self.register_kwargs['email'] = 'notavalidemail'
        d = self.processor.register_user(**self.register_kwargs)
        yield self.assertFailure(d, InvalidEmailError)

    @defer.inlineCallbacks
    def test_register_user_checks_valid_password(self):
        """Password is validated."""
        self.register_kwargs['password'] = ''
        d = self.processor.register_user(**self.register_kwargs)
        yield self.assertFailure(d, InvalidPasswordError)

        # 7 chars, one less than expected
        self.register_kwargs['password'] = 'tesT3it'
        d = self.processor.register_user(**self.register_kwargs)
        yield self.assertFailure(d, InvalidPasswordError)

        self.register_kwargs['password'] = 'test3it!'  # no upper case
        d = self.processor.register_user(**self.register_kwargs)
        yield self.assertFailure(d, InvalidPasswordError)

        self.register_kwargs['password'] = 'testIt!!'  # no number
        d = self.processor.register_user(**self.register_kwargs)
        yield self.assertFailure(d, InvalidPasswordError)

    # register

    @defer.inlineCallbacks
    def test_register_user_if_status_ok(self):
        """A user is successfully registered into the SSO server."""
        result = yield self.processor.register_user(**self.register_kwargs)
        self.assertEqual(EMAIL, result, 'registration was successful.')

    @defer.inlineCallbacks
    def test_register_user_if_status_error(self):
        """Proper error is raised if register fails."""
        self.register_kwargs['captcha_id'] = CAPTCHA_ID * 2  # incorrect
        d = self.processor.register_user(**self.register_kwargs)
        failure = yield self.assertFailure(d, RegistrationError)
        for k, val in failure.args[0].items():
            self.assertIn(k, STATUS_ERROR['errors'])
            self.assertEqual(val, "\n".join(STATUS_ERROR['errors'][k]))

    @defer.inlineCallbacks
    def test_register_user_if_status_error_with_string_message(self):
        """Proper error is raised if register fails with a string message."""
        self.register_kwargs['email'] = EMAIL_ALREADY_REGISTERED
        d = self.processor.register_user(**self.register_kwargs)
        failure = yield self.assertFailure(d, RegistrationError)
        for k, val in failure.args[0].items():
            self.assertIn(k, {'email': 'Email already registered'})
            self.assertEqual(val, 'Email already registered')

    @defer.inlineCallbacks
    def test_register_user_if_status_unknown(self):
        """Proper error is raised if register returns an unknown status."""
        self.register_kwargs['captcha_id'] = None
        self.register_kwargs['captcha_solution'] = None
        d = self.processor.register_user(**self.register_kwargs)
        failure = yield self.assertFailure(d, RegistrationError)
        # Look the message up in .args: exception instances are not
        # containers on Python 3, so `msg in exception` raises TypeError.
        self.assertIn('Received unknown status: %s' % STATUS_UNKNOWN,
                      failure.args)

    # login

    @defer.inlineCallbacks
    def test_login_if_http_error(self):
        """Proper error is raised if authentication fails."""
        # use an invalid token name
        self.login_kwargs['token_name'] = APP_NAME * 2
        d = self.processor.login(**self.login_kwargs)
        yield self.assertFailure(d, AuthenticationError)

    @defer.inlineCallbacks
    def test_login_if_no_error(self):
        """A user can be successfully logged in into the SSO service."""
        result = yield self.processor.login(**self.login_kwargs)
        self.assertEqual(TOKEN, result, 'authentication was successful.')

    # is_validated

    @defer.inlineCallbacks
    def test_is_validated(self):
        """If preferred email is not None, user is validated."""
        result = yield self.processor.is_validated(token=TOKEN)
        self.assertTrue(result, 'user must be validated.')

    @defer.inlineCallbacks
    def test_is_not_validated(self):
        """If preferred email is None, user is not validated."""
        self.frc.preferred_email = None
        result = yield self.processor.is_validated(token=TOKEN)
        self.assertFalse(result, 'user must not be validated.')

    @defer.inlineCallbacks
    def test_is_not_validated_empty_result(self):
        """If the 'me' information is empty, user is not validated."""
        self.patch(self.frc, "fake_accounts_me", lambda *args: {})
        result = yield self.processor.is_validated(token=TOKEN)
        self.assertFalse(result, 'user must not be validated.')

    # validate_email

    @defer.inlineCallbacks
    def test_validate_email_if_status_ok(self):
        """An email is successfully validated in the SSO server."""
        self.login_kwargs['email_token'] = EMAIL_TOKEN  # valid email token
        result = yield self.processor.validate_email(**self.login_kwargs)
        self.assertEqual(TOKEN, result, 'email validation was successful.')

    @defer.inlineCallbacks
    def test_validate_email_if_status_error(self):
        """Proper error is raised if email validation fails."""
        self.login_kwargs['email_token'] = EMAIL_TOKEN * 2  # invalid token
        d = self.processor.validate_email(**self.login_kwargs)
        failure = yield self.assertFailure(d, EmailTokenError)
        for k, val in failure.args[0].items():
            self.assertIn(k, STATUS_EMAIL_ERROR['errors'])
            self.assertEqual(val, "\n".join(STATUS_EMAIL_ERROR['errors'][k]))

    @defer.inlineCallbacks
    def test_validate_email_if_status_error_with_string_message(self):
        """Proper error is raised if validation fails with a string message."""
        self.login_kwargs['email_token'] = EMAIL_ALREADY_REGISTERED
        d = self.processor.validate_email(**self.login_kwargs)
        failure = yield self.assertFailure(d, EmailTokenError)
        for k, val in failure.args[0].items():
            self.assertIn(k, {'email': 'Email already registered'})
            self.assertEqual(val, 'Email already registered')

    @defer.inlineCallbacks
    def test_validate_email_if_status_unknown(self):
        """Proper error is raised if email validation returns unknown."""
        self.login_kwargs['email_token'] = None
        d = self.processor.validate_email(**self.login_kwargs)
        failure = yield self.assertFailure(d, EmailTokenError)
        # STATUS_EMAIL_UNKNOWN is the reply the fake validation returns for
        # a None token (it is equal in value to STATUS_UNKNOWN).
        self.assertIn('Received invalid reply: %s' % STATUS_EMAIL_UNKNOWN,
                      failure.args)

    # reset_password

    @defer.inlineCallbacks
    def test_request_password_reset_token_if_status_ok(self):
        """A reset password token is successfully sent."""
        result = yield self.processor.request_password_reset_token(email=EMAIL)
        self.assertEqual(EMAIL, result,
                         'password reset token must be successful.')

    @defer.inlineCallbacks
    def test_request_password_reset_token_if_http_error(self):
        """Proper error is raised if password token request fails."""
        d = self.processor.request_password_reset_token(email=EMAIL * 2)
        exc = yield self.assertFailure(d, ResetPasswordTokenError)
        self.assertIn(CANT_RESET_PASSWORD_CONTENT, exc.args)

    @defer.inlineCallbacks
    def test_request_password_reset_token_if_status_unknown(self):
        """Proper error is raised if password token request returns unknown."""
        d = self.processor.request_password_reset_token(email=None)
        exc = yield self.assertFailure(d, ResetPasswordTokenError)
        self.assertIn('Received invalid reply: %s' % STATUS_UNKNOWN, exc.args)

    @defer.inlineCallbacks
    def test_set_new_password_if_status_ok(self):
        """A new password is successfully set."""
        result = yield self.processor.set_new_password(
            email=EMAIL, token=RESET_PASSWORD_TOKEN, new_password=PASSWORD)
        self.assertEqual(EMAIL, result,
                         'new password must be set successfully.')

    @defer.inlineCallbacks
    def test_set_new_password_if_http_error(self):
        """Proper error is raised if setting a new password fails."""
        d = self.processor.set_new_password(email=EMAIL * 2,
                                            token=RESET_PASSWORD_TOKEN * 2,
                                            new_password=PASSWORD)
        exc = yield self.assertFailure(d, NewPasswordError)
        self.assertIn(RESET_TOKEN_INVALID_CONTENT, exc.args)

    @defer.inlineCallbacks
    def test_set_new_password_if_status_unknown(self):
        """Proper error is raised if setting a new password returns unknown."""
        d = self.processor.set_new_password(email=None, token=None,
                                            new_password=None)
        exc = yield self.assertFailure(d, NewPasswordError)
        self.assertIn('Received invalid reply: %s' % STATUS_UNKNOWN, exc.args)
class EnvironOverridesTestCase(TestCase):
    """Some URLs can be set from the environment for testing/QA purposes."""

    def test_override_service_url(self):
        """The service url can be set from the env var USSOC_SERVICE_URL."""
        fake_url = 'this is not really a URL, but ends with slash: /'
        old_url = os.environ.get('USSOC_SERVICE_URL')
        os.environ['USSOC_SERVICE_URL'] = fake_url
        try:
            proc = Account()
            self.assertEqual(proc.service_url, fake_url)
        finally:
            # Compare against None rather than truthiness, so a variable
            # that was set to an empty string is restored, not deleted.
            if old_url is not None:
                os.environ['USSOC_SERVICE_URL'] = old_url
            else:
                del os.environ['USSOC_SERVICE_URL']

    def test_no_override_service_url(self):
        """If the environ is unset, the default service url is used."""
        proc = Account()
        self.assertEqual(proc.service_url, SERVICE_URL)

    def test_service_url_as_parameter(self):
        """If the service url parameter is given, it is used."""
        expected = 'http://foo/bar/baz/'
        proc = Account(service_url=expected)
        self.assertEqual(proc.service_url, expected)
|