/usr/lib/python2.7/dist-packages/etcd/tests/test_auth.py is in python-etcd 0.4.3-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 | from etcd.tests.integration.test_simple import EtcdIntegrationTest
from etcd import auth
import etcd
class TestEtcdAuthBase(EtcdIntegrationTest):
    """Shared fixture for the auth integration tests.

    Before every test a password is set on the ``root`` user and auth is
    switched on; after every test the scratch user/role are removed and
    auth is switched off again.
    """

    # NOTE(review): presumably the number of cluster nodes the parent
    # fixture starts -- confirm against EtcdIntegrationTest.
    cl_size = 1

    def setUp(self):
        """Give ``root`` a password, build the clients and enable auth."""
        # At this point self.client is whatever the parent fixture provides;
        # it is used once to set the root password before being replaced.
        u = auth.EtcdUser(self.client, 'root')
        u.password = 'testpass'
        u.write()
        # From here on self.client authenticates as root, while
        # unauth_client carries no credentials at all.
        self.client = etcd.Client(port=6001, username='root',
                                  password='testpass')
        self.unauth_client = etcd.Client(port=6001)
        a = auth.Auth(self.client)
        a.active = True

    def tearDown(self):
        """Remove ``test_user``/``test_role`` (best effort), disable auth."""
        u = auth.EtcdUser(self.client, 'test_user')
        r = auth.EtcdRole(self.client, 'test_role')
        # Cleanup is deliberately best-effort: the user or role may never
        # have been created, or a test may already have deleted it.
        # ``Exception`` (rather than a bare except) keeps KeyboardInterrupt
        # and SystemExit propagating.
        try:
            u.delete()
        except Exception:
            pass
        try:
            r.delete()
        except Exception:
            pass
        a = auth.Auth(self.client)
        a.active = False
class EtcdUserTest(TestEtcdAuthBase):
    """Integration tests for ``auth.EtcdUser`` (list, read, write, delete)."""

    def test_names(self):
        """The user listing contains only ``root``."""
        u = auth.EtcdUser(self.client, 'test_user')
        self.assertEqual(u.names, ['root'])

    def test_read(self):
        """Reading users: success, missing user, no credentials, bad port."""
        u = auth.EtcdUser(self.client, 'root')
        # Reading an existing user succeeds
        try:
            u.read()
        except Exception:
            self.fail("reading the root user raised an exception")
        # roles for said user are fetched
        self.assertEqual(u.roles, set(['root']))
        # The user is correctly rendered out
        self.assertEqual(u._to_net(), [{'user': 'root', 'password': None,
                                        'roles': ['root']}])
        # A nonexistent user raises the appropriate exception
        u = auth.EtcdUser(self.client, 'user.does.not.exist')
        self.assertRaises(etcd.EtcdKeyNotFound, u.read)
        # Reading with an unauthenticated client raises an exception
        u = auth.EtcdUser(self.unauth_client, 'root')
        self.assertRaises(etcd.EtcdInsufficientPermissions, u.read)
        # Generic errors are caught (presumably nothing listens on port
        # 9999 -- confirm for the test environment)
        c = etcd.Client(port=9999)
        u = auth.EtcdUser(c, 'root')
        self.assertRaises(etcd.EtcdException, u.read)

    def test_write_and_delete(self):
        """Create, update and delete a user, including permission errors."""
        # Create a user
        u = auth.EtcdUser(self.client, 'test_user')
        u.roles.add('guest')
        u.roles.add('root')
        # deliberately trivial throwaway password
        u.password = '123456'
        # ``Exception`` rather than a bare except, so that Ctrl-C is not
        # reported as a test failure.
        try:
            u.write()
        except Exception:
            self.fail("creating a user doesn't work")
        # Password gets wiped
        self.assertEqual(u.password, None)
        u.read()
        # Verify we can log in as this user and access the auth (it has the
        # root role)
        cl = etcd.Client(port=6001, username='test_user',
                         password='123456')
        ul = auth.EtcdUser(cl, 'root')
        try:
            ul.read()
        except etcd.EtcdInsufficientPermissions:
            self.fail("Reading auth with the new user is not possible")
        self.assertEqual(u.name, "test_user")
        self.assertEqual(u.roles, set(['guest', 'root']))
        # set roles as a list, it works!
        u.roles = ['guest', 'test_group']
        try:
            u.write()
        except Exception:
            self.fail("updating a user you previously created fails")
        u.read()
        self.assertIn('test_group', u.roles)
        # Unauthorized access is properly handled
        ua = auth.EtcdUser(self.unauth_client, 'test_user')
        self.assertRaises(etcd.EtcdInsufficientPermissions, ua.write)
        # now let's test deletion
        du = auth.EtcdUser(self.client, 'user.does.not.exist')
        self.assertRaises(etcd.EtcdKeyNotFound, du.delete)
        # Delete test_user
        u.delete()
        self.assertRaises(etcd.EtcdKeyNotFound, u.read)
        # Permissions are properly handled
        self.assertRaises(etcd.EtcdInsufficientPermissions, ua.delete)
class EtcdRoleTest(TestEtcdAuthBase):
    """Integration tests for ``auth.EtcdRole`` (list, read, write, delete)."""

    def test_names(self):
        """The role listing contains ``guest`` and ``root``."""
        r = auth.EtcdRole(self.client, 'guest')
        self.assertListEqual(r.names, [u'guest', u'root'])

    def test_read(self):
        """Reading the ``guest`` role yields read/write on everything."""
        r = auth.EtcdRole(self.client, 'guest')
        # ``Exception`` rather than a bare except, so that Ctrl-C is not
        # reported as a test failure.
        try:
            r.read()
        except Exception:
            self.fail('Reading an existing role failed')
        self.assertEqual(r.acls, {'*': 'RW'})
        # We can actually skip most other read tests as they are common
        # with EtcdUser

    def test_write_and_delete(self):
        """Create a role, revoke/grant ACLs on it, then delete it."""
        r = auth.EtcdRole(self.client, 'test_role')
        r.acls = {'*': 'R', '/test/*': 'RW'}
        try:
            r.write()
        except Exception:
            self.fail("Writing a simple groups should not fail")
        # r1 is a second handle on the same role, used to re-read what was
        # actually stored after each write through r.
        r1 = auth.EtcdRole(self.client, 'test_role')
        r1.read()
        self.assertEqual(r1.acls, r.acls)
        r.revoke('/test/*', 'W')
        r.write()
        r1.read()
        self.assertEqual(r1.acls, {'*': 'R', '/test/*': 'R'})
        r.grant('/pub/*', 'RW')
        r.write()
        r1.read()
        self.assertEqual(r1.acls['/pub/*'], 'RW')
        # All other exceptions are tested by the user tests
        r1.name = None
        self.assertRaises(etcd.EtcdException, r1.write)
        # ditto for delete
        try:
            r.delete()
        except Exception:
            self.fail("A normal delete should not fail")
        self.assertRaises(etcd.EtcdKeyNotFound, r.read)
|