/usr/lib/python3/dist-packages/checkbox/job.py is in python3-checkbox 0.17.6-0ubuntu6.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 | #
# This file is part of Checkbox.
#
# Copyright 2008 Canonical Ltd.
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3,
# as published by the Free Software Foundation.
#
# Checkbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
#
import os
import logging
from gettext import gettext as _
from string import Template
from checkbox.lib.process import Process
from checkbox.lib.signal import signal_to_name, signal_to_description
from checkbox.message import MessageStore
FAIL = "fail"
PASS = "pass"
UNINITIATED = "uninitiated"
UNRESOLVED = "unresolved"
UNSUPPORTED = "unsupported"
UNTESTED = "untested"
ALL_STATUS = [FAIL, PASS, UNINITIATED, UNRESOLVED, UNSUPPORTED, UNTESTED]
DEFAULT_JOB_TIMEOUT = 30 # used in case a job specifies invalid timeout
class Job:
def __init__(self, command, environ=None, timeout=None):
if environ is None:
environ = []
self.command = command
self.environ = environ
self.timeout = timeout
if self.timeout is not None:
try:
self.timeout = float(self.timeout)
except:
self.timeout = DEFAULT_JOB_TIMEOUT
finally:
if self.timeout < 0:
self.timeout = DEFAULT_JOB_TIMEOUT
def execute(self):
# Sanitize environment
process_environ = dict(os.environ)
for environ in self.environ:
key, value = environ.split("=", 1)
value = Template(value).safe_substitute(process_environ)
process_environ[key] = value
logging.info("Running command: %s", self.command)
process = Process(self.command, process_environ)
if not process.read(self.timeout):
logging.info("Command timed out, killing process.")
process.kill()
process_status = process.cleanup()
if os.WIFEXITED(process_status):
exit_status = os.WEXITSTATUS(process_status)
if exit_status == 0:
status = PASS
data = process.outdata
if not data:
data = process.errdata
elif exit_status == 127:
status = UNRESOLVED
data = _("Command not found.").encode("utf-8")
else:
status = FAIL
data = (process.errdata
or process.outdata)
elif os.WIFSIGNALED(process_status):
status = UNRESOLVED
term_signal = os.WTERMSIG(process_status)
data = (_("Command received signal %(signal_name)s: "
"%(signal_description)s") % \
{'signal_name': signal_to_name(term_signal),
'signal_description': signal_to_description(term_signal)}
).encode("utf-8")
else:
raise Exception("Command not terminated: %s" \
% self.command)
duration = process.endtime - process.starttime
return (status, data, duration,)
class JobStore(MessageStore):
"""A job store which stores its jobs in a file system hierarchy."""
def add(self, job):
# Return if the same job is already in the store
if list(self._find_matching_messages(name=job["name"])):
return
return super(JobStore, self).add(job)
def _find_matching_messages(self, **kwargs):
for filename in self._walk_messages():
message = self._read_message(filename, cache=True)
for key, value in kwargs.items():
if message.get(key) != value:
break
else:
yield filename
|