This file is indexed.

/usr/lib/python2.7/dist-packages/celery/tests/contrib/test_rdb.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
from __future__ import absolute_import

import errno
import socket

from celery.contrib.rdb import (
    Rdb,
    debugger,
    set_trace,
)
from celery.tests.case import AppCase, Mock, WhateverIO, patch, skip_if_pypy


class SockErr(socket.error):
    """socket.error subclass used by these tests as a stand-in failure.

    The tests below assign the ``errno`` value they want the code under
    test to observe before the error is raised.
    """

    # No error code by default; each test sets the code it needs.
    errno = None


class test_Rdb(AppCase):
    """Tests for celery.contrib.rdb with sockets and Pdb hooks mocked out."""

    @patch('celery.contrib.rdb.Rdb')
    def test_debugger(self, Rdb):
        """debugger() returns a truthy object, and the same one twice."""
        x = debugger()
        self.assertTrue(x)
        self.assertIs(x, debugger())

    @patch('celery.contrib.rdb.debugger')
    @patch('celery.contrib.rdb._frame')
    def test_set_trace(self, _frame, debugger):
        """set_trace() works with or without a frame and uses debugger()."""
        self.assertTrue(set_trace(Mock()))
        self.assertTrue(set_trace())
        self.assertTrue(debugger.return_value.set_trace.called)

    @patch('celery.contrib.rdb.Rdb.get_avail_port')
    @skip_if_pypy
    def test_rdb(self, get_avail_port):
        """Walk an Rdb session through its commands on a mocked socket."""
        sock = Mock()
        get_avail_port.return_value = (sock, 8000)
        sock.accept.return_value = (Mock(), ['helu'])
        out = WhateverIO()
        with Rdb(out=out) as rdb:
            self.assertTrue(get_avail_port.called)
            # The address returned by the mocked accept() must be reported
            # on the output stream.
            self.assertIn('helu', out.getvalue())

            # set_quit
            with patch('sys.settrace') as settrace:
                rdb.set_quit()
                settrace.assert_called_with(None)

            # set_trace
            with patch('celery.contrib.rdb.Pdb.set_trace') as pset:
                with patch('celery.contrib.rdb._frame'):
                    rdb.set_trace()
                    rdb.set_trace(Mock())
                    # Raise a dedicated instance instead of assigning
                    # errno on the SockErr class: a class-level assignment
                    # would outlive this test and leak the error code
                    # into every later use of SockErr.
                    err = SockErr()
                    err.errno = errno.ENOENT
                    pset.side_effect = err
                    with self.assertRaises(SockErr):
                        rdb.set_trace()

            # _close_session
            rdb._close_session()

            # do_continue
            rdb.set_continue = Mock()
            rdb.do_continue(Mock())
            rdb.set_continue.assert_called_with()

            # do_quit
            rdb.set_quit = Mock()
            rdb.do_quit(Mock())
            rdb.set_quit.assert_called_with()

    @patch('socket.socket')
    @skip_if_pypy
    def test_get_avail_port(self, sock):
        """Start Rdb against bind() calls that succeed and that fail."""
        out = WhateverIO()
        sock.return_value.accept.return_value = (Mock(), ['helu'])
        with Rdb(out=out):
            pass

        # Same start-up, with the current process named 'PoolWorker-10'.
        with patch('celery.contrib.rdb.current_process') as curproc:
            curproc.return_value.name = 'PoolWorker-10'
            with Rdb(out=out):
                pass

        # ENOENT raised by bind() must propagate to the caller unchanged.
        err = SockErr()
        err.errno = errno.ENOENT
        sock.return_value.bind.side_effect = err
        with self.assertRaises(SockErr):
            with Rdb(out=out):
                pass

        # bind() failing with EADDRINUSE on every call must end in an
        # Exception being raised.
        err.errno = errno.EADDRINUSE
        with self.assertRaises(Exception):
            with Rdb(out=out):
                pass

        # bind() failing with EADDRINUSE for its first 51 calls and then
        # succeeding must still let Rdb start.
        # A one-element list is used as the counter because the closure
        # needs to rebind it without relying on ``nonlocal``.
        called = [0]

        def effect(*a, **kw):
            attempt = called[0]
            called[0] += 1
            if attempt > 50:
                return True
            raise err
        sock.return_value.bind.side_effect = effect
        with Rdb(out=out):
            pass