This file is indexed.

/usr/share/pyshared/xdist/remote.py is in python-pytest-xdist 1.4-1.1build1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""
    LooponfailingSession and Helpers. 

    NOTE: avoid loading or depending on application modules
    within the controlling process (the one that repeatedly
    starts test processes); otherwise changes to the source
    code can crash the controlling process, which should
    never happen.
"""
import py
import sys
import execnet
from py._test.session import Session
from xdist import util

class LooponfailingSession(Session):
    """Controlling-side session that reruns tests in a fresh slave process.

    Every iteration asks ``RemoteControl`` to set up a new slave, runs the
    items that failed last time (or everything when there are none) and
    then blocks in ``statrecorder.waitonchange()`` before looping again.
    """

    def __init__(self, config):
        super(LooponfailingSession, self).__init__(config=config)
        self.rootdirs = [self.config.topdir] # xxx dist_rsync_roots?
        self.statrecorder = util.StatRecorder(self.rootdirs)
        self.remotecontrol = RemoteControl(self.config)
        self.out = py.io.TerminalWriter()

    def main(self, initialitems):
        """Run the looponfail loop until interrupted with Ctrl-C.

        ``initialitems`` is accepted for interface compatibility but is
        not used; the loop always starts with an empty ``LoopState``.
        """
        try:
            self.loopstate = loopstate = LoopState([])
            while 1:
                self.loop_once(loopstate)
                if not loopstate.colitems and loopstate.wasfailing:
                    continue # the last failures passed, let's rerun all
                self.statrecorder.waitonchange(checkinterval=2.0)
        except KeyboardInterrupt:
            # A bare ``print`` only emits the newline as a Python 2
            # statement; on Python 3 it is a no-op expression.  Writing
            # the newline explicitly behaves the same on both.
            sys.stdout.write("\n")

    def loop_once(self, loopstate):
        """Run one slave session and record its failures in ``loopstate``.

        ``loopstate.wasfailing`` is set to a truthy value only when there
        were items to rerun; ``loopstate.colitems`` is replaced by whatever
        ``RemoteControl.runsession()`` returns.
        """
        self.remotecontrol.setup()
        colitems = loopstate.colitems
        loopstate.wasfailing = colitems and len(colitems)
        loopstate.colitems = self.remotecontrol.runsession(colitems or ())

class LoopState:
    """Mutable state carried from one looponfail iteration to the next."""

    def __init__(self, colitems=None):
        # Items to rerun on the next iteration (empty/None means "run all").
        self.colitems = colitems
        # Whether the previous iteration started with failing items.
        # Initialised here so the attribute exists even before the first
        # loop_once() call assigns it.
        self.wasfailing = False

class RemoteControl(object):
    """Drive a single test run in a freshly spawned slave process.

    The expected life cycle is ``setup()`` followed by ``runsession()``.
    ``runsession()`` always tears the gateway down again, so ``setup()``
    has to be called before every run.
    """

    def __init__(self, config):
        self.config = config

    def trace(self, *args):
        """Print a debug message when ``config.option.debug`` is set."""
        if self.config.option.debug:
            msg = " ".join([str(x) for x in args])
            py.builtin.print_("RemoteControl:", msg)

    def initgateway(self):
        """Create the execnet gateway (``"popen"`` spec) for the slave."""
        return execnet.makegateway("popen")

    def setup(self, out=None):
        """Create the gateway and bootstrap the slave session on it.

        ``out`` is the terminal writer the slave's output is forwarded to;
        a new ``py.io.TerminalWriter`` is created when it is omitted.

        Raises ``ValueError`` if a gateway from an earlier ``setup()`` is
        still present.  If bootstrapping fails, the gateway and channel
        are torn down before the exception propagates, so that a later
        ``setup()`` is not blocked by a stale gateway.
        """
        if out is None:
            out = py.io.TerminalWriter()
        if hasattr(self, 'gateway'):
            raise ValueError("already have gateway %r" % self.gateway)
        self.trace("setting up slave session")
        self.gateway = self.initgateway()
        # A flag plus try/finally (rather than a bare except) cleans up on
        # any failure, including KeyboardInterrupt, without swallowing it.
        ready = False
        try:
            self._start_slave(out)
            ready = True
        finally:
            if not ready:
                self.ensure_teardown()
        self.trace("set up of slave session complete")

    def _start_slave(self, out):
        """Start the remote bootstrap code and send it the configuration."""
        self.channel = channel = self.gateway.remote_exec("""
            import os, sys
            import py
            chdir = channel.receive()
            outchannel = channel.gateway.newchannel()
            channel.send(outchannel)
            # prune sys.path to not contain relative paths
            newpaths = []
            for p in sys.path:
                if p:
                    if not os.path.isabs(p):
                        p = os.path.abspath(p)
                    newpaths.append(p)
            sys.path[:] = newpaths     
            os.chdir(chdir) # unpickling config uses cwd as topdir
            
            config_state = channel.receive()
            fullwidth, hasmarkup = channel.receive()
            py.test.config.__setstate__(config_state)

            import sys
            sys.stdout = sys.stderr = outchannel.makefile('w')

            from xdist.remote import slave_runsession
            slave_runsession(channel, py.test.config, fullwidth, hasmarkup) 
        """)
        channel.send(str(self.config.topdir))
        remote_outchannel = channel.receive()
        def write(s):
            # forward everything the slave prints to the local terminal
            out._file.write(s)
            out._file.flush()
        remote_outchannel.setcallback(write)
        channel.send(self.config.__getstate__())
        channel.send((out.fullwidth, out.hasmarkup))

    def ensure_teardown(self):
        """Close the channel and exit the gateway, if they exist.

        Safe to call repeatedly: attributes are deleted once released.
        """
        if hasattr(self, 'channel'):
            if not self.channel.isclosed():
                self.trace("closing", self.channel)
                self.channel.close()
            del self.channel
        if hasattr(self, 'gateway'):
            self.trace("exiting", self.gateway)
            self.gateway.exit()
            del self.gateway

    def runsession(self, colitems=()):
        """Send ``colitems`` to the slave and return its answer.

        The answer is whatever the slave sends back on the channel
        (``slave_runsession`` sends the trails of the failing reports).
        A ``RemoteError`` from the slave is traced and re-raised.  The
        channel and gateway are torn down in every case.
        """
        try:
            self.trace("sending", colitems)
            self.channel.send(colitems)
            try:
                return self.channel.receive()
            except self.channel.RemoteError:
                e = sys.exc_info()[1]
                self.trace("ERROR", e)
                raise
        finally:
            self.ensure_teardown()

def slave_runsession(channel, config, fullwidth, hasmarkup):
    """Run one test session inside the slave process.

    Receives the trails to rerun over ``channel``, runs them (or the
    initial nodes when nothing was requested), prints the looponfail
    summary and finally sends the trails of all failing reports back
    over ``channel``.  ``fullwidth`` and ``hasmarkup`` are stored on
    ``config`` for the terminal reporting.
    """
    debug_enabled = bool(config.option.debug)

    def DEBUG(*args):
        # only chatty when the debug option was set at entry
        if debug_enabled:
            print(" ".join(map(str, args)))

    DEBUG("SLAVE: received configuration, using topdir:", config.topdir)
    #config.option.session = None
    config.option.looponfail = False
    config.option.usepdb = False
    try:
        trails = channel.receive()
    except KeyboardInterrupt:
        # nothing sensible can be done about an interrupt in the slave
        return
    config.pluginmanager.do_configure(config)
    DEBUG("SLAVE: initsession()")
    session = config.initsession()
    # XXX configure the reporter object's terminal writer more directly
    # XXX and write a test for this remote-terminal setting logic
    config.pytest_terminal_hasmarkup = hasmarkup
    config.pytest_terminal_fullwidth = fullwidth

    if not trails:
        colitems = config.getinitialnodes()
    else:
        colitems = []
        for trail in trails:
            try:
                node = config._rootcol.fromtrail(trail)
            except ValueError:
                #XXX send info for "test disappeared" or so
                continue
            else:
                colitems.append(node)
    session.shouldclose = channel.isclosed

    class Failures(list):
        """Plugin that accumulates every failed report it is handed."""
        def pytest_runtest_logreport(self, report):
            if report.failed:
                self.append(report)
        # collection reports are recorded the same way as test reports
        pytest_collectreport = pytest_runtest_logreport

    failreports = Failures()
    session.pluginmanager.register(failreports)

    DEBUG("SLAVE: starting session.main()")
    session.main(colitems)
    repr_pytest_looponfailinfo(
        failreports=list(failreports),
        rootdirs=[config.topdir])
    rootcol = session.config._rootcol
    failed_trails = []
    for rep in failreports:
        failed_trails.append(rootcol.totrail(rep.getnode()))
    channel.send(failed_trails)


def repr_pytest_looponfailinfo(failreports, rootdirs):
    """Write the looponfail summary to a new terminal writer.

    When ``failreports`` is non-empty, a red "LOOPONFAILING" separator is
    written, followed by the crash line of each report that has one.
    A "waiting for changes" separator and one line per entry of
    ``rootdirs`` are always written.
    """
    writer = py.io.TerminalWriter()
    if failreports:
        writer.sep("#", "LOOPONFAILING", red=True)
        for rep in failreports:
            crashline = rep._getcrashline()
            if not crashline:
                continue
            writer.line(crashline, red=True)
    writer.sep("#", "waiting for changes")
    for watched in rootdirs:
        writer.line("### Watching:   %s" % (watched,), bold=True)