/usr/lib/python2.7/dist-packages/pyramid/tests/test_scripts/test_pserve.py is in python-pyramid 1.6+dfsg-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import atexit
import os
import tempfile
import unittest
from pyramid.compat import PY3

# The builtins module has a different name on Python 2 and Python 3; bind
# whichever one exists to ``__builtin__`` so the tests below can patch
# ``__builtin__.open`` on either interpreter.
if PY3:
    import builtins as __builtin__
else:
    import __builtin__
class TestPServeCommand(unittest.TestCase):
    """Tests for ``pyramid.scripts.pserve.PServeCommand``.

    Text the command emits through its ``out`` hook is captured in
    ``self.out_`` so tests can compare it exactly.  The path of any PID file
    a test creates is stored in ``self.pid_file`` and ``tearDown`` removes it
    if it still exists.
    """

    def setUp(self):
        from pyramid.compat import NativeIO
        self.out_ = NativeIO()
        self.pid_file = None

    def tearDown(self):
        if self.pid_file and os.path.exists(self.pid_file):
            os.remove(self.pid_file)

    def out(self, msg):
        """Stand-in for the command's ``out``; appends ``msg`` to the buffer."""
        self.out_.write(msg)

    def _get_server(*args, **kwargs):
        # No explicit ``self``: this is installed as ``inst.loadserver`` via
        # the bound method, so the instance simply arrives in ``args``.
        def server(app):
            return ''
        return server

    def _getTargetClass(self):
        from pyramid.scripts.pserve import PServeCommand
        return PServeCommand

    def _makeOne(self, *args):
        """Build a command for ``['pserve'] + args`` with output captured."""
        effargs = ['pserve']
        effargs.extend(args)
        cmd = self._getTargetClass()(effargs)
        cmd.out = self.out
        return cmd

    def _make_temp_path(self):
        """Create an empty temporary file and return its path.

        ``tempfile.mkstemp`` creates the file atomically, unlike the
        deprecated ``tempfile.mktemp`` which only returns a name that another
        process could claim first.
        """
        fd, path = tempfile.mkstemp()
        os.close(fd)
        return path

    def _makeOneWithPidFile(self, pid):
        """Return a command and leave ``self.pid_file`` containing ``pid``."""
        self.pid_file = self._make_temp_path()
        inst = self._makeOne()
        with open(self.pid_file, 'w') as f:
            f.write(str(pid))
        return inst

    def test_remove_pid_file_verbose(self):
        inst = self._makeOneWithPidFile(os.getpid())
        inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        self._assert_pid_file_removed(verbose=True)

    def test_remove_pid_file_not_verbose(self):
        inst = self._makeOneWithPidFile(os.getpid())
        inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=0)
        self._assert_pid_file_removed(verbose=False)

    def test_remove_pid_not_a_number(self):
        inst = self._makeOneWithPidFile('not a number')
        inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        self._assert_pid_file_removed(verbose=True)

    def test_remove_pid_current_pid_is_not_written_pid(self):
        inst = self._makeOneWithPidFile(os.getpid())
        inst._remove_pid_file('99999', self.pid_file, verbosity=1)
        self._assert_pid_file_not_removed('')

    def test_remove_pid_current_pid_is_not_pid_in_file(self):
        inst = self._makeOneWithPidFile('99999')
        inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        msg = 'PID file %s contains 99999, not expected PID %s'
        self._assert_pid_file_not_removed(msg % (self.pid_file, os.getpid()))

    def test_remove_pid_no_pid_file(self):
        inst = self._makeOne()
        self.pid_file = 'some unknown path'
        inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        self._assert_pid_file_removed(verbose=False)

    def test_remove_pid_file_unlink_exception(self):
        """If unlinking fails the file stays, but it is expected to be emptied."""
        inst = self._makeOneWithPidFile(os.getpid())
        self._remove_pid_unlink_exception(inst)
        msg = [
            'Removing PID file %s' % (self.pid_file),
            'Cannot remove PID file: (Some OSError - unlink)',
            'Stale PID removed']
        self._assert_pid_file_not_removed(msg=''.join(msg))
        with open(self.pid_file) as f:
            self.assertEqual(f.read(), '')

    def test_remove_pid_file_stale_pid_write_exception(self):
        """If unlinking and re-opening both fail, the PID stays in the file."""
        inst = self._makeOneWithPidFile(os.getpid())
        self._remove_pid_unlink_and_write_exceptions(inst)
        msg = [
            'Removing PID file %s' % (self.pid_file),
            'Cannot remove PID file: (Some OSError - unlink)',
            'Stale PID left in file: %s ' % (self.pid_file),
            '(Some OSError - open)']
        self._assert_pid_file_not_removed(msg=''.join(msg))
        with open(self.pid_file) as f:
            self.assertEqual(int(f.read()), os.getpid())

    def test_record_pid_verbose(self):
        self._assert_record_pid(verbosity=2, msg='Writing PID %d to %s')

    def test_record_pid_not_verbose(self):
        self._assert_record_pid(verbosity=1, msg='')

    def _remove_pid_unlink_exception(self, inst):
        """Run ``_remove_pid_file`` while ``os.unlink`` always raises."""
        old_unlink = os.unlink

        def fake_unlink(filename):
            raise OSError('Some OSError - unlink')

        try:
            os.unlink = fake_unlink
            inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        finally:
            os.unlink = old_unlink

    def _remove_pid_unlink_and_write_exceptions(self, inst):
        """Run ``_remove_pid_file`` with ``os.unlink`` and ``open`` failing.

        The patched ``open`` delegates to the real one on its first call and
        raises ``OSError`` on every call after that.
        """
        old_unlink = os.unlink

        def fake_unlink(filename):
            raise OSError('Some OSError - unlink')

        run_already = []
        old_open = __builtin__.open

        def fake_open(*args, **kwargs):
            # Accept keyword arguments too, so the patch stays transparent
            # however the code under test happens to call ``open``.
            if not run_already:
                run_already.append(True)
                return old_open(*args, **kwargs)
            raise OSError('Some OSError - open')

        try:
            os.unlink = fake_unlink
            __builtin__.open = fake_open
            inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        finally:
            os.unlink = old_unlink
            __builtin__.open = old_open

    def _assert_pid_file_removed(self, verbose=False):
        self.assertFalse(os.path.exists(self.pid_file))
        msg = 'Removing PID file %s' % (self.pid_file) if verbose else ''
        self.assertEqual(self.out_.getvalue(), msg)

    def _assert_pid_file_not_removed(self, msg):
        self.assertTrue(os.path.exists(self.pid_file))
        self.assertEqual(self.out_.getvalue(), msg)

    def _assert_record_pid(self, verbosity, msg):
        """Call ``record_pid`` and check the output text and file contents.

        ``atexit.register`` is replaced with a no-op for the duration of the
        call so nothing stays registered after the test.
        """
        old_atexit = atexit.register

        def fake_atexit(*args):
            pass

        # The file already exists (empty) when record_pid runs.
        # NOTE(review): assumes record_pid overwrites an existing file - confirm.
        self.pid_file = self._make_temp_path()
        pid = os.getpid()
        inst = self._makeOne()
        inst.options.verbose = verbosity

        try:
            atexit.register = fake_atexit
            inst.record_pid(self.pid_file)
        finally:
            atexit.register = old_atexit

        msg = msg % (pid, self.pid_file) if msg else ''
        self.assertEqual(self.out_.getvalue(), msg)

        with open(self.pid_file) as f:
            self.assertEqual(int(f.read()), pid)

    def test_run_no_args(self):
        inst = self._makeOne()
        result = inst.run()
        self.assertEqual(result, 2)
        self.assertEqual(self.out_.getvalue(), 'You must give a config file')

    def test_run_stop_daemon_no_such_pid_file(self):
        path = os.path.join(os.path.dirname(__file__), 'wontexist.pid')
        inst = self._makeOne('--stop-daemon', '--pid-file=%s' % path)
        inst.run()
        msg = 'No PID file exists in %s' % path
        self.assertTrue(msg in self.out_.getvalue())

    def test_run_stop_daemon_bad_pid_file(self):
        # This source file is used as a PID file whose content is not a PID.
        path = __file__
        inst = self._makeOne('--stop-daemon', '--pid-file=%s' % path)
        inst.run()
        msg = 'Not a valid PID file in %s' % path
        self.assertTrue(msg in self.out_.getvalue())

    def test_run_stop_daemon_invalid_pid_in_file(self):
        fn = self._make_temp_path()
        # Track the file so tearDown removes it if the command leaves it
        # behind (for example when an assertion fails first).
        self.pid_file = fn
        with open(fn, 'wb') as tmp:
            tmp.write(b'9999999')
        inst = self._makeOne('--stop-daemon', '--pid-file=%s' % fn)
        inst.run()
        msg = 'PID in %s is not valid (deleting)' % fn
        self.assertTrue(msg in self.out_.getvalue())

    def test_get_options_with_command(self):
        inst = self._makeOne()
        inst.args = ['foo', 'stop', 'a=1', 'b=2']
        result = inst.get_options()
        self.assertEqual(result, {'a': '1', 'b': '2'})

    def test_get_options_no_command(self):
        inst = self._makeOne()
        inst.args = ['foo', 'a=1', 'b=2']
        result = inst.get_options()
        self.assertEqual(result, {'a': '1', 'b': '2'})

    def test_parse_vars_good(self):
        from pyramid.tests.test_scripts.dummy import DummyApp

        inst = self._makeOne('development.ini', 'a=1', 'b=2')
        inst.loadserver = self._get_server

        app = DummyApp()

        def get_app(*args, **kwargs):
            # Record the ``global_conf`` the command passes to ``loadapp``.
            app.global_conf = kwargs.get('global_conf', None)

        inst.loadapp = get_app
        inst.run()
        self.assertEqual(app.global_conf, {'a': '1', 'b': '2'})

    def test_parse_vars_bad(self):
        inst = self._makeOne('development.ini', 'a')
        inst.loadserver = self._get_server
        self.assertRaises(ValueError, inst.run)
class Test_read_pidfile(unittest.TestCase):
    """Tests for ``pyramid.scripts.pserve.read_pidfile``."""

    def _callFUT(self, filename):
        from pyramid.scripts.pserve import read_pidfile
        return read_pidfile(filename)

    def test_read_pidfile(self):
        # mkstemp creates the file atomically; the deprecated mktemp only
        # returns a name that another process could claim first.
        fd, filename = tempfile.mkstemp()
        try:
            with os.fdopen(fd, 'w') as f:
                f.write('12345')
            result = self._callFUT(filename)
            self.assertEqual(result, 12345)
        finally:
            os.remove(filename)

    def test_read_pidfile_no_pid_file(self):
        result = self._callFUT('some unknown path')
        self.assertEqual(result, None)

    def test_read_pidfile_not_a_number(self):
        # This source file stands in for a PID file with non-numeric content.
        result = self._callFUT(__file__)
        self.assertEqual(result, None)
class Test_main(unittest.TestCase):
    """Smoke test for the ``pyramid.scripts.pserve.main`` entry point."""

    def _callFUT(self, argv):
        """Invoke ``main`` with ``argv`` and ``quiet=True``."""
        from pyramid.scripts import pserve
        return pserve.main(argv, quiet=True)

    def test_it(self):
        # Only the program name is supplied; the expected return value is 2.
        exit_code = self._callFUT(['pserve'])
        self.assertEqual(exit_code, 2)
class TestLazyWriter(unittest.TestCase):
    """Tests for ``pyramid.scripts.pserve.LazyWriter``."""

    def _makeOne(self, filename, mode='w'):
        from pyramid.scripts.pserve import LazyWriter
        return LazyWriter(filename, mode)

    def _make_temp_path(self):
        """Create an empty temporary file and return its path.

        ``tempfile.mkstemp`` creates the file atomically, unlike the
        deprecated ``tempfile.mktemp`` which only returns a name that another
        process could claim first.
        """
        fd, path = tempfile.mkstemp()
        os.close(fd)
        return path

    def test_open(self):
        filename = self._make_temp_path()
        # Pre-bind so cleanup cannot hit an unbound name (and hide the real
        # error) when construction or open() fails.
        fp = None
        try:
            inst = self._makeOne(filename)
            fp = inst.open()
            self.assertEqual(fp.name, filename)
        finally:
            if fp is not None:
                fp.close()
            os.remove(filename)

    def test_write(self):
        filename = self._make_temp_path()
        inst = None
        try:
            inst = self._makeOne(filename)
            inst.write('hello')
            # Read the file back before the writer is closed, as the
            # original test did, so the data must already be on disk.
            with open(filename) as f:
                data = f.read()
            self.assertEqual(data, 'hello')
        finally:
            if inst is not None:
                inst.close()
            os.remove(filename)

    def test_writeline(self):
        filename = self._make_temp_path()
        inst = None
        try:
            inst = self._makeOne(filename)
            inst.writelines('hello')
            # Read the file back before the writer is closed (see test_write).
            with open(filename) as f:
                data = f.read()
            self.assertEqual(data, 'hello')
        finally:
            if inst is not None:
                inst.close()
            os.remove(filename)

    def test_flush(self):
        filename = self._make_temp_path()
        fp = None
        try:
            inst = self._makeOne(filename)
            inst.flush()
            fp = inst.fileobj
            self.assertEqual(fp.name, filename)
        finally:
            if fp is not None:
                fp.close()
            os.remove(filename)
class Test__methodwrapper(unittest.TestCase):
    """Tests for calling ``pyramid.scripts.pserve._methodwrapper`` objects."""

    def _makeOne(self, func, obj, type):
        from pyramid.scripts import pserve
        return pserve._methodwrapper(func, obj, type)

    def test___call__succeed(self):
        def foo(self, cls, a=1):
            return 1

        class Bar(object):
            pass

        wrapped = self._makeOne(foo, Bar, None)
        # Calling with an ordinary keyword argument yields foo's return value.
        self.assertEqual(wrapped(a=1), 1)

    def test___call__fail(self):
        def foo(self, cls, a=1):
            return 1

        class Bar(object):
            pass

        wrapped = self._makeOne(foo, Bar, None)
        # Passing ``cls`` as a keyword is expected to raise AssertionError.
        self.assertRaises(AssertionError, wrapped, cls=1)
|