/usr/share/pyshared/xdist/txnode.py is in python-pytest-xdist 1.4-1.1build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Manage setup, running and local representation of remote nodes/processes.
"""
import py
from xdist.mypickle import PickleChannel
from py._test.session import Session
class TXNode(object):
    """ Represents a Test Execution environment in the controlling process.

    - sets up a slave node through an execnet gateway
    - manages sending of test-items and receival of results and events
    - creates events when the remote side crashes
    """
    # Marker registered with the channel callback; ``callback`` receives
    # it in place of an event when the channel closes.
    ENDMARK = -1

    def __init__(self, nodemanager, gateway, config, putevent):
        """ Store the collaborators, start the slave and hook up the channel.

        ``putevent`` is called with ``(eventname, args, kwargs)`` tuples
        for every event this node produces (see ``notify``).
        """
        self.nodemanager = nodemanager
        self.config = config
        self.putevent = putevent
        self.gateway = gateway
        self.slaveinput = {}
        # ``_down`` must exist before the slave is installed and before
        # the callback is registered: ``install_slave`` hands this node to
        # hooks (which may repr() it), and ``callback`` reads and sets
        # ``_down``.  Initialising it afterwards could raise
        # AttributeError or overwrite a "down" state that had already
        # been recorded.
        self._down = False
        self.channel = install_slave(self)
        self.channel.setcallback(self.callback, endmarker=self.ENDMARK)

    def __repr__(self):
        status = 'false'
        if self._down:
            status = 'true'
        return "<TXNode %r down=%s>" % (self.gateway.id, status)

    def notify(self, eventname, *args, **kwargs):
        """ Forward an event to ``putevent`` as ``(eventname, args, kwargs)``.

        Only keyword arguments are supported; passing positional
        arguments trips the assertion.
        """
        assert not args
        self.putevent((eventname, args, kwargs))

    def callback(self, eventcall):
        """ this gets called for each object we receive from
            the other side and if the channel closes.

            Note that channel callbacks run in the receiver
            thread of execnet gateways - we need to
            avoid raising exceptions or doing heavy work.
        """
        try:
            if eventcall == self.ENDMARK:
                # The channel was closed.
                err = self.channel._getremoteerror()
                if not self._down:
                    # No "slavefinished" event was seen before the close.
                    if not err or isinstance(err, EOFError):
                        err = "Not properly terminated"  # lost connection?
                    self.notify("pytest_testnodedown", node=self, error=err)
                    self._down = True
                return
            eventname, args, kwargs = eventcall
            if eventname == "slaveready":
                self.notify("pytest_testnodeready", node=self)
            elif eventname == "slavefinished":
                self._down = True
                self.slaveoutput = kwargs['slaveoutput']
                error = kwargs['error']
                self.notify("pytest_testnodedown", error=error, node=self)
            else:
                if eventname in ("pytest_runtest_logreport",
                                 "pytest__teardown_final_logerror"):
                    # Attach the originating node to the report before
                    # passing it on.
                    kwargs['report'].node = self
                self.notify(eventname, **kwargs)
        except KeyboardInterrupt:
            # should not land in receiver-thread
            raise
        except:
            # Deliberately broad: nothing may propagate out of the
            # callback, so the failure is printed and reported through
            # the plugin manager instead.
            excinfo = py.code.ExceptionInfo()
            py.builtin.print_("!" * 20, excinfo)
            self.config.pluginmanager.notify_exception(excinfo)

    def send(self, item):
        """ Send a single item (must not be None) over the channel. """
        # None is what ``shutdown`` sends, so it is not allowed here.
        assert item is not None
        self.channel.send(item)

    def sendlist(self, itemlist):
        """ Send a whole list of items over the channel as one message. """
        self.channel.send(itemlist)

    def shutdown(self, kill=False):
        """ Stop the node.

        With ``kill`` true the gateway is exited; otherwise ``None`` is
        sent over the channel.
        """
        if kill:
            self.gateway.exit()
        else:
            self.channel.send(None)
# configuring and setting up slave node
def install_slave(node):
    """ Start the slave bootstrap code on ``node.gateway`` and configure it.

    Steps, in order:

    - run the bootstrap source remotely and wait for its first message
      (the remote side sends "basicimport" once its imports succeeded)
    - wrap the raw channel in a ``PickleChannel``
    - call the ``pytest_configure_node`` hook with the node
    - for popen gateways, create a numbered "slave-" directory below
      ``config.ensuretemp("popen")`` to be used as the slave's basetemp
    - send ``(config, slaveinput, basetemp, gateway id)`` to the slave

    Returns the ``PickleChannel`` connected to the slave.
    """
    # The remote source is kept flush-left so that it is valid Python
    # as written; the body of ``if basetemp:`` has to be indented.
    channel = node.gateway.remote_exec(source="""
import os, sys
sys.path.insert(0, os.getcwd())
from xdist.mypickle import PickleChannel
from xdist.txnode import SlaveSession
channel.send("basicimport")
channel = PickleChannel(channel)
import py
config, slaveinput, basetemp, nodeid = channel.receive()
config.slaveinput = slaveinput
config.slaveoutput = {}
if basetemp:
    config.basetemp = py.path.local(basetemp)
config.nodeid = nodeid
config.pluginmanager.do_configure(config)
session = SlaveSession(config, channel, nodeid)
session.dist_main()
""")
    # Consume the "basicimport" handshake message; its value is not used.
    channel.receive()
    channel = PickleChannel(channel)
    basetemp = None
    config = node.config
    config.hook.pytest_configure_node(node=node)
    if node.gateway.spec.popen:
        popenbase = config.ensuretemp("popen")
        basetemp = py.path.local.make_numbered_dir(prefix="slave-",
            keep=0, rootdir=popenbase)
        # Sent as a plain string; the slave turns it back into a
        # py.path.local object.
        basetemp = str(basetemp)
    channel.send((config, node.slaveinput, basetemp, node.gateway.id))
    return channel
class SlaveSession(Session):
    """ Session used on the slave side.

    Receives work over ``channel``, runs it and sends reports and
    events back over the same channel as ``(eventname, args, kwargs)``
    tuples.
    """

    def __init__(self, config, channel, nodeid):
        # channel and nodeid are stored before the base class is set up
        self.channel = channel
        self.nodeid = nodeid
        super(SlaveSession, self).__init__(config=config)

    def __repr__(self):
        clsname = self.__class__.__name__
        return "<%s channel=%s>" % (clsname, self.channel)

    def sendevent(self, eventname, *args, **kwargs):
        """ Send one event tuple to the controlling side. """
        message = (eventname, args, kwargs)
        self.channel.send(message)

    def pytest_runtest_logreport(self, report):
        """ Forward a test report to the controlling side. """
        self.sendevent("pytest_runtest_logreport", report=report)

    def pytest__teardown_final_logerror(self, report):
        """ Forward a final-teardown error report to the controlling side. """
        self.sendevent("pytest__teardown_final_logerror", report=report)

    def pytest_keyboard_interrupt(self, excinfo):
        """ Remember the interrupt; it is reported by ``dist_main``. """
        self._slaveerror = "SIGINT"

    def pytest_internalerror(self, excrepr):
        """ Remember the internal error and forward its representation. """
        self._slaveerror = "internal-error"
        self.sendevent("pytest_internalerror", excrepr=excrepr)

    def dist_main(self):
        """ Announce readiness, run the session, then report completion.

        The final "slavefinished" event carries the recorded error
        (``None`` if none was recorded) and ``config.slaveoutput``.
        """
        pluginmanager = self.config.pluginmanager
        self.runner = pluginmanager.getplugin("pytest_runner")
        self.sendevent("slaveready")
        # NOTE(review): main() comes from the base class and presumably
        # ends up calling _mainloop() - confirm against Session.
        self.main(None)
        self.sendevent("slavefinished",
            error=getattr(self, '_slaveerror', None),
            slaveoutput=self.config.slaveoutput)

    def _mainloop(self, colitems):
        """ Receive tasks from the channel until ``None`` arrives.

        A task is either a single item or a list of items; ``colitems``
        is not used.
        """
        while True:
            received = self.channel.receive()
            if received is None:
                return
            if not isinstance(received, list):
                self.run_single(item=received)
                continue
            for entry in received:
                self.run_single(item=entry)

    def run_single(self, item):
        """ Run one received item, or report why it cannot be run. """
        probe = self.runner.CallInfo(item._reraiseunpicklingproblem,
                                     when='setup')
        if not probe.excinfo:
            item.config.hook.pytest_runtest_protocol(item=item)
            return
        # likely it is not collectable here because of
        # platform/import-dependency induced skips
        # we fake a setup-error report with the obtained exception
        # and do not care about capturing or non-runner hooks
        report = self.runner.pytest_runtest_makereport(item=item, call=probe)
        self.pytest_runtest_logreport(report)
|