/usr/share/pyshared/sprox/test/base.py is in python-sprox 0.6.4-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | import os, re
from copy import copy
from difflib import unified_diff
from sqlalchemy import *
from sqlalchemy.orm import *
from model import *
from cStringIO import StringIO
from cgi import FieldStorage
from tw import framework
framework.default_view = 'mako'
#try:
import xml.etree.ElementTree as etree
#except ImportError:
# import cElementTree as etree
session = None
engine = None
connect = None
sorted_user_columns = ['_password', 'created', 'display_name', 'email_address',
'groups', 'password', 'sprox_id', 'town',
'town_id', 'user_id', 'user_name']
def remove_whitespace_nodes(node):
new_node = copy(node)
new_node._children = []
if new_node.text and new_node.text.strip() == '':
new_node.text = ''
if new_node.tail and new_node.tail.strip() == '':
new_node.tail = ''
for child in node.getchildren():
if child is not None:
child = remove_whitespace_nodes(child)
new_node.append(child)
return new_node
def remove_namespace(doc):
"""Remove namespace in the passed document in place."""
for elem in doc.getiterator():
match = re.match('(\{.*\})(.*)', elem.tag)
if match:
elem.tag = match.group(2)
def replace_escape_chars(needle):
needle = needle.replace(' ', ' ')
return needle
def fix_xml(needle):
needle = replace_escape_chars(needle)
needle_node = etree.fromstring(needle)
needle_node = remove_whitespace_nodes(needle_node)
remove_namespace(needle_node)
needle_s = etree.tostring(needle_node)
return needle_s
def in_xml(needle, haystack):
needle_s, haystack_s = map(fix_xml, (needle, haystack))
return needle_s in haystack_s
def eq_xml(needle, haystack):
needle_s, haystack_s = map(fix_xml, (needle, haystack))
return needle_s == haystack_s
def assert_in_xml(needle, haystack):
assert in_xml(needle, haystack), "%s not found in %s"%(needle, haystack)
def assert_eq_xml(needle, haystack):
assert eq_xml(needle, haystack), "%s does not equal %s"%(needle, haystack)
database_setup=False
def setup_database():
global session, engine, database_setup, connect, metadata
#singletonizes things
if not database_setup:
engine = create_engine(os.environ.get('DBURL', 'sqlite://'), strategy="threadlocal")
connect = engine.connect()
# print 'testing on', engine
metadata.bind = engine
metadata.drop_all()
metadata.create_all()
Session = sessionmaker(bind=engine, autoflush=True, autocommit=False)
session = Session()
database_setup = True
return session, engine, metadata
records_setup = None
def setup_records(session):
session.expunge_all()
user = User()
user.user_name = u'asdf'
user.email_address = u"asdf@asdf.com"
user.password = u"asdf"
session.add(user)
arvada = Town(name=u'Arvada')
session.add(arvada)
session.flush()
user.town = arvada
session.add(Town(name=u'Denver'))
session.add(Town(name=u'Golden'))
session.add(Town(name=u'Boulder'))
#test_table.insert(values=dict(BLOB=FieldStorage('asdf', StringIO()).value)).execute()
#user_reference_table.insert(values=dict(user_id=user.user_id)).execute()
# print user.user_id
for i in range (5):
group = Group(group_name=unicode(i))
session.add(group)
user.groups.append(group)
session.flush()
return user
def teardown_database():
pass
#metadata.drop_all()
def _reassign_from_metadata():
global visits_table, visit_identity_table, groups_table, town_table
global users_table, permissions_table, user_group_table
global group_permission_table, test_table
visits_table = metadata.tables['visit']
visit_identity_table = metadata.tables['visit_identity']
groups_table = metadata.tables['tg_group']
town_table = metadata.tables['town_table']
users_table = metadata.tables['tg_user']
permissions_table = metadata.tables['permission']
user_group_table = metadata.tables['user_group']
group_permission_table = metadata.tables['group_permission']
test_table = metadata.tables['test_table']
def setup_reflection():
#if os.environ.get('AUTOLOAD', False):
metadata.clear()
metadata.reflect()
_reassign_from_metadata()
clear_mappers()
tables = metadata.tables
mapper(Town, tables['town_table'])
mapper(Example, tables['test_table'])
mapper(Visit, tables['visit'])
mapper(VisitIdentity, tables['visit_identity'],
properties=dict(users=relation(User, backref='visit_identity')))
mapper(User, tables['tg_user'])
mapper(Group, tables['tg_group'],
properties=dict(users=relation(User,
secondary=tables['user_group'],
backref='groups')))
mapper(Permission, tables['permission'],
properties=dict(groups=relation(Group,
secondary=tables['group_permission'],
backref='permissions')))
class SproxTest(object):
def setup(self):
self.session = session
self.engine = engine
try:
self.user = setup_records(session)
except:
self.session.rollback()
def teardown(self):
self.session.rollback()
if __name__ == '__main__':
setupDatabase()
|