/usr/share/pyshared/vamos/tools.py is in undertaker 1.3b-1.1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | #
# vamos - common auxiliary functionality
#
# Copyright (C) 2011 Christian Dietrich <christian.dietrich@informatik.uni-erlangen.de>
# Copyright (C) 2011 Reinhard Tartler <tartler@informatik.uni-erlangen.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
import os.path
from subprocess import *
class CommandFailed(RuntimeError):
""" Indicates that some command failed
Attributes:
command -- the command that failed
returncode -- the exitcode of the failed command
"""
def __init__(self, command, returncode):
assert(returncode != 0)
self.command = command
self.returncode = returncode
self.repr = "Command %s failed to execute (returncode: %d)" % \
(command, returncode)
RuntimeError.__init__(self, self.repr)
def __str__(self):
return self.repr
def setup_logging(log_level):
""" setup the logging module with the given log_level """
l = logging.WARNING # default
if log_level == 1:
l = logging.INFO
elif log_level >= 2:
l = logging.DEBUG
logging.basicConfig(level=l)
def execute(command, echo=True, failok=True):
"""
executes 'command' in a shell.
optional parameter echo can be used to suppress emitting the command
to logging.debug.
if failok is set to false, an RuntimeError will be raised with the
full commandname and exitcode.
returns a tuple with
1. the command's standard output as list of lines
2. the exitcode
"""
os.environ["LC_ALL"] = "C"
os.environ["LC_MESSAGES"] = "C"
if echo:
logging.debug("executing: " + command)
p = Popen(command, stdout=PIPE, stderr=STDOUT, shell=True)
(stdout, _) = p.communicate() # stderr is merged into STDOUT
if not failok and p.returncode != 0:
raise CommandFailed(command, p.returncode)
if len(stdout) > 0 and stdout[-1] == '\n':
stdout = stdout[:-1]
return (stdout.__str__().rsplit('\n'), p.returncode)
def calculate_worklist(args, batch_mode=False):
"""
Calculates a sanitizes worklist from a list of given arguments
This method is intended to be passed lists from the command line. It
is passed a list of filenames, which are tested individually for
existance. Non-existing filenames are skipped with a warning.
Optionally, the given files can be interpreted as worklist. In this
case, each line in the given files is tested for existance and
considered for the worklist.
In any case, the caller can rely on a set of filenames which are
good for processing.
"""
worklist = set()
if batch_mode:
for worklist_file in args:
oldlen = len(worklist)
try:
with open(worklist_file) as fd:
for line in fd:
f = os.path.normpath(line.rstrip())
if os.path.exists(f):
worklist.add(f)
else:
logging.warning("Skipping non-existent file %s", f)
except IOError:
logging.warning("failed to process worklist %s", worklist_file)
logging.info("Added %d files from worklist %s",
len(worklist) - oldlen, worklist_file)
else:
for filename in args:
f = os.path.normpath(filename)
if os.path.exists(f):
worklist.add(f)
else:
logging.warning("Skipping non-existent file %s", f)
logging.info("Processing %d files in total", len(worklist))
return worklist
def check_tool(tool):
"""
checks if the specified tool is available and executable
NB: the specified command will be executed and should therefore not
have any side effects. For gcc or make pass an option such as '--version'.
"""
try:
execute(tool, failok=False)
except CommandFailed:
logging.error("Failed to locate %s, check your installation", tool)
return False
return True
def get_online_processors():
"""
tries to determine how many processors are currently online
"""
threads = 1
try:
output, _= execute("getconf _NPROCESSORS_ONLN", failok=False)
threads = int(output[0])
except StandardError:
logging.error("Unable to determine number of online processors, " +\
"assuming 1")
return threads
|