This file is indexed.

/usr/lib/python2.7/dist-packages/pyramid/tests/test_scripts/test_pserve.py is in python-pyramid 1.6+dfsg-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
import atexit
import os
import tempfile
import unittest

from pyramid.compat import PY3
if PY3:
    import builtins as __builtin__
else:
    import __builtin__

class TestPServeCommand(unittest.TestCase):
    def setUp(self):
        from pyramid.compat import NativeIO
        self.out_ = NativeIO()
        self.pid_file = None

    def tearDown(self):
        if self.pid_file and os.path.exists(self.pid_file):
            os.remove(self.pid_file)

    def out(self, msg):
        self.out_.write(msg)

    def _get_server(*args, **kwargs):
        def server(app):
            return ''

        return server

    def _getTargetClass(self):
        from pyramid.scripts.pserve import PServeCommand
        return PServeCommand

    def _makeOne(self, *args):
        effargs = ['pserve']
        effargs.extend(args)
        cmd = self._getTargetClass()(effargs)
        cmd.out = self.out
        return cmd

    def _makeOneWithPidFile(self, pid):
        self.pid_file = tempfile.mktemp()
        inst = self._makeOne()
        with open(self.pid_file, 'w') as f:
            f.write(str(pid))
        return inst

    def test_remove_pid_file_verbose(self):
        inst = self._makeOneWithPidFile(os.getpid())
        inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        self._assert_pid_file_removed(verbose=True)

    def test_remove_pid_file_not_verbose(self):
        inst = self._makeOneWithPidFile(os.getpid())
        inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=0)
        self._assert_pid_file_removed(verbose=False)

    def test_remove_pid_not_a_number(self):
        inst = self._makeOneWithPidFile('not a number')
        inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        self._assert_pid_file_removed(verbose=True)

    def test_remove_pid_current_pid_is_not_written_pid(self):
        inst = self._makeOneWithPidFile(os.getpid())
        inst._remove_pid_file('99999', self.pid_file, verbosity=1)
        self._assert_pid_file_not_removed('')

    def test_remove_pid_current_pid_is_not_pid_in_file(self):
        inst = self._makeOneWithPidFile('99999')
        inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        msg = 'PID file %s contains 99999, not expected PID %s'
        self._assert_pid_file_not_removed(msg % (self.pid_file, os.getpid()))

    def test_remove_pid_no_pid_file(self):
        inst = self._makeOne()
        self.pid_file = 'some unknown path'
        inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        self._assert_pid_file_removed(verbose=False)

    def test_remove_pid_file_unlink_exception(self):
        inst = self._makeOneWithPidFile(os.getpid())
        self._remove_pid_unlink_exception(inst)
        msg = [
            'Removing PID file %s' % (self.pid_file),
            'Cannot remove PID file: (Some OSError - unlink)',
            'Stale PID removed']
        self._assert_pid_file_not_removed(msg=''.join(msg))
        with open(self.pid_file) as f:
            self.assertEqual(f.read(), '')

    def test_remove_pid_file_stale_pid_write_exception(self):
        inst = self._makeOneWithPidFile(os.getpid())
        self._remove_pid_unlink_and_write_exceptions(inst)
        msg = [
            'Removing PID file %s' % (self.pid_file),
            'Cannot remove PID file: (Some OSError - unlink)',
            'Stale PID left in file: %s ' % (self.pid_file),
            '(Some OSError - open)']
        self._assert_pid_file_not_removed(msg=''.join(msg))
        with open(self.pid_file) as f:
            self.assertEqual(int(f.read()), os.getpid())

    def test_record_pid_verbose(self):
        self._assert_record_pid(verbosity=2, msg='Writing PID %d to %s')

    def test_record_pid_not_verbose(self):
        self._assert_record_pid(verbosity=1, msg='')

    def _remove_pid_unlink_exception(self, inst):
        old_unlink = os.unlink
        def fake_unlink(filename):
            raise OSError('Some OSError - unlink')

        try:
            os.unlink = fake_unlink
            inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        finally:
            os.unlink = old_unlink

    def _remove_pid_unlink_and_write_exceptions(self, inst):
        old_unlink = os.unlink
        def fake_unlink(filename):
            raise OSError('Some OSError - unlink')

        run_already = []
        old_open = __builtin__.open
        def fake_open(*args):
            if not run_already:
                run_already.append(True)
                return old_open(*args)
            raise OSError('Some OSError - open')

        try:
            os.unlink = fake_unlink
            __builtin__.open = fake_open
            inst._remove_pid_file(os.getpid(), self.pid_file, verbosity=1)
        finally:
            os.unlink = old_unlink
            __builtin__.open = old_open

    def _assert_pid_file_removed(self, verbose=False):
        self.assertFalse(os.path.exists(self.pid_file))
        msg = 'Removing PID file %s' % (self.pid_file) if verbose else ''
        self.assertEqual(self.out_.getvalue(), msg)

    def _assert_pid_file_not_removed(self, msg):
        self.assertTrue(os.path.exists(self.pid_file))
        self.assertEqual(self.out_.getvalue(), msg)

    def _assert_record_pid(self, verbosity, msg):
        old_atexit = atexit.register
        def fake_atexit(*args):
            pass

        self.pid_file = tempfile.mktemp()
        pid = os.getpid()
        inst = self._makeOne()
        inst.options.verbose = verbosity

        try:
            atexit.register = fake_atexit
            inst.record_pid(self.pid_file)
        finally:
            atexit.register = old_atexit

        msg = msg % (pid, self.pid_file) if msg else ''
        self.assertEqual(self.out_.getvalue(), msg)
        with open(self.pid_file) as f:
            self.assertEqual(int(f.read()), pid)

    def test_run_no_args(self):
        inst = self._makeOne()
        result = inst.run()
        self.assertEqual(result, 2)
        self.assertEqual(self.out_.getvalue(), 'You must give a config file')

    def test_run_stop_daemon_no_such_pid_file(self):
        path = os.path.join(os.path.dirname(__file__), 'wontexist.pid')
        inst = self._makeOne('--stop-daemon', '--pid-file=%s' % path)
        inst.run()
        msg = 'No PID file exists in %s' % path
        self.assertTrue(msg in self.out_.getvalue())

    def test_run_stop_daemon_bad_pid_file(self):
        path = __file__
        inst = self._makeOne('--stop-daemon', '--pid-file=%s' % path)
        inst.run()
        msg = 'Not a valid PID file in %s' % path
        self.assertTrue(msg in self.out_.getvalue())

    def test_run_stop_daemon_invalid_pid_in_file(self):
        fn = tempfile.mktemp()
        with open(fn, 'wb') as tmp:
            tmp.write(b'9999999')
        tmp.close()
        inst = self._makeOne('--stop-daemon', '--pid-file=%s' % fn)
        inst.run()
        msg = 'PID in %s is not valid (deleting)' % fn
        self.assertTrue(msg in self.out_.getvalue())

    def test_get_options_with_command(self):
        inst = self._makeOne()
        inst.args = ['foo', 'stop', 'a=1', 'b=2']
        result = inst.get_options()
        self.assertEqual(result, {'a': '1', 'b': '2'})

    def test_get_options_no_command(self):
        inst = self._makeOne()
        inst.args = ['foo', 'a=1', 'b=2']
        result = inst.get_options()
        self.assertEqual(result, {'a': '1', 'b': '2'})

    def test_parse_vars_good(self):
        from pyramid.tests.test_scripts.dummy import DummyApp

        inst = self._makeOne('development.ini', 'a=1', 'b=2')
        inst.loadserver = self._get_server


        app = DummyApp()

        def get_app(*args, **kwargs):
            app.global_conf = kwargs.get('global_conf', None)

        inst.loadapp = get_app
        inst.run()

        self.assertEqual(app.global_conf, {'a': '1', 'b': '2'})

    def test_parse_vars_bad(self):
        inst = self._makeOne('development.ini', 'a')
        inst.loadserver = self._get_server
        self.assertRaises(ValueError, inst.run)

class Test_read_pidfile(unittest.TestCase):
    def _callFUT(self, filename):
        from pyramid.scripts.pserve import read_pidfile
        return read_pidfile(filename)

    def test_read_pidfile(self):
        filename = tempfile.mktemp()
        try:
            with open(filename, 'w') as f:
                f.write('12345')
            result = self._callFUT(filename)
            self.assertEqual(result, 12345)
        finally:
            os.remove(filename)

    def test_read_pidfile_no_pid_file(self):
        result = self._callFUT('some unknown path')
        self.assertEqual(result, None)

    def test_read_pidfile_not_a_number(self):
        result = self._callFUT(__file__)
        self.assertEqual(result, None)

class Test_main(unittest.TestCase):
    def _callFUT(self, argv):
        from pyramid.scripts.pserve import main
        return main(argv, quiet=True)

    def test_it(self):
        result = self._callFUT(['pserve'])
        self.assertEqual(result, 2)

class TestLazyWriter(unittest.TestCase):
    def _makeOne(self, filename, mode='w'):
        from pyramid.scripts.pserve import LazyWriter
        return LazyWriter(filename, mode)

    def test_open(self):
        filename = tempfile.mktemp()
        try:
            inst = self._makeOne(filename)
            fp = inst.open()
            self.assertEqual(fp.name, filename)
        finally:
            fp.close()
            os.remove(filename)

    def test_write(self):
        filename = tempfile.mktemp()
        try:
            inst = self._makeOne(filename)
            inst.write('hello')
        finally:
            with open(filename) as f:
                data = f.read()
                self.assertEqual(data, 'hello')
            inst.close()
            os.remove(filename)

    def test_writeline(self):
        filename = tempfile.mktemp()
        try:
            inst = self._makeOne(filename)
            inst.writelines('hello')
        finally:
            with open(filename) as f:
                data = f.read()
                self.assertEqual(data, 'hello')
            inst.close()
            os.remove(filename)

    def test_flush(self):
        filename = tempfile.mktemp()
        try:
            inst = self._makeOne(filename)
            inst.flush()
            fp = inst.fileobj
            self.assertEqual(fp.name, filename)
        finally:
            fp.close()
            os.remove(filename)

class Test__methodwrapper(unittest.TestCase):
    def _makeOne(self, func, obj, type):
        from pyramid.scripts.pserve import _methodwrapper
        return _methodwrapper(func, obj, type)

    def test___call__succeed(self):
        def foo(self, cls, a=1): return 1
        class Bar(object): pass
        wrapper = self._makeOne(foo, Bar, None)
        result = wrapper(a=1)
        self.assertEqual(result, 1)

    def test___call__fail(self):
        def foo(self, cls, a=1): return 1
        class Bar(object): pass
        wrapper = self._makeOne(foo, Bar, None)
        self.assertRaises(AssertionError, wrapper, cls=1)