/usr/lib/python2.7/dist-packages/firmwaretools/pycompat.py is in firmware-tools-common 2.1.14-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 | # vim:expandtab:autoindent:tabstop=4:shiftwidth=4:filetype=python:tw=0
#############################################################################
#
# Copyright (c) 2005 Dell Computer Corporation
# Dual Licenced under GNU GPL and OSL
#
#############################################################################
"""module
some docs here eventually.
"""
from __future__ import generators
# import arranged alphabetically
import commands
import getopt
import glob
import os
import sys
import ConfigParser
import math
import zipfile
import re
import shutil
import signal
import time
import threading
from firmwaretools.trace_decorator import decorate, traceLog, getLog
def clearLine():
return "\033[2K\033[0G"
def spinner(cycle=['/', '-', '\\', '|']):
step = cycle[0]
del cycle[0]
cycle.append(step)
# ESC codes for clear line and position cursor at horizontal pos 0
return step
def pad(strn, pad_width=67):
# truncate strn to pad_width so spinPrint does not scroll
if len(strn) > pad_width:
return strn[:pad_width] + ' ...'
else:
return strn
def spinPrint(strn, outFd=sys.stderr):
outFd.write(clearLine())
outFd.write("%s\t%s" % (spinner(), pad(strn)))
outFd.flush()
def timedSpinPrint( strn, start ):
now = time.time()
# ESC codes for position cursor at horizontal pos 65
spinPrint( strn + "\033[65G time: %2.2f" % (now - start) )
# helper class & functions for executeCommand()
# User should handle this if they specify a timeout
class commandTimeoutExpired(Exception): pass
# the problem with os.system() is that the command that is run gets any
# keyboard input and/or signals. This means that <CTRL>-C interrupts the
# sub-program instead of the python program. This helper function fixes that.
# It also allows us to set up a maximum timeout before all children are killed
decorate(traceLog())
def executeCommand(cmd, timeout=0):
class alarmExc(Exception): pass
def alarmhandler(signum,stackframe):
raise alarmExc("timeout expired")
pid = os.fork()
if pid:
#parent
rpid = ret = 0
oldhandler=signal.signal(signal.SIGALRM,alarmhandler)
starttime = time.time()
prevTimeout = signal.alarm(timeout)
try:
(rpid, ret) = os.waitpid(pid, 0)
signal.alarm(0)
signal.signal(signal.SIGALRM,oldhandler)
if prevTimeout:
passed = time.time() - starttime
signal.alarm(int(math.ceil(prevTimeout - passed)))
except alarmExc:
try:
os.kill(-pid, signal.SIGTERM)
time.sleep(1)
os.kill(-pid, signal.SIGKILL)
except OSError: # errno=3 == no such process
pass
(rpid, ret) = os.waitpid(pid, 0)
signal.signal(signal.SIGALRM,oldhandler)
if prevTimeout:
passed = time.time() - starttime
signal.alarm(int(max(math.ceil(prevTimeout - passed), 1)))
raise commandTimeoutExpired( "Specified timeout of %s seconds expired before command finished. Command was: %s"
% (timeout, cmd)
)
except KeyboardInterrupt:
signal.signal(signal.SIGALRM,oldhandler)
try:
os.kill(-pid, signal.SIGTERM)
time.sleep(1)
os.kill(-pid, signal.SIGKILL)
except OSError: # errno=3 == no such process
pass
(rpid, ret) = os.waitpid(pid, 0)
raise
# mask and return just return value
return (ret & 0xFF00) >> 8
else:
#child
os.setpgrp() # become process group leader so that we can kill all our children
signal.signal(signal.SIGINT, signal.SIG_IGN) #ignore <CTRL>-C so parent gets it
ret = os.system(cmd)
os._exit( (ret & 0xFF00) >> 8 )
decorate(traceLog())
def copyFile( source, dest, ignoreException=0 ):
try:
shutil.copyfile(source, dest)
except IOError:
if not ignoreException:
raise
# python 2.3 has a better version, but we have to run on python 2.2. :-(
decorate(traceLog())
def mktempdir( prefix="/tmp" ):
status, output = commands.getstatusoutput("mktemp -d %s/tempdir-$$-$RANDOM-XXXXXX" % prefix)
if status != 0:
raise Exception("could not create secure temporary directory: %s" % output)
return output
# generator function -- emulates the os.walk() generator in python 2.3 (mostly)
# ret = (path, dirs, files) foreach dir
decorate(traceLog())
def walkPath(topdir, direction=0):
rawFiles = os.listdir(topdir)
files=[f for f in rawFiles if os.path.isfile(os.path.join(topdir,f))]
dirs =[f for f in rawFiles if os.path.isdir (os.path.join(topdir,f))]
if direction == 0:
yield (topdir, dirs, files)
for d in dirs:
if not os.path.islink(os.path.join(topdir,d)):
for (newtopdir, newdirs, newfiles) in walkPath(os.path.join(topdir,d)):
yield (newtopdir, newdirs, newfiles)
if direction == 1:
yield (topdir, dirs, files)
decorate(traceLog())
def runLongProcess(function, args=None, kargs=None, waitLoopFunction=None):
# runs a function in a separate thread. Runs waitLoopFunction() while it
# waits for the function to finish. Good for updating GUI, or other stuff
thread = BackgroundWorker(function, args, kargs)
while thread.running:
if waitLoopFunction is not None:
waitLoopFunction()
# run waitLoopFunction one last time before exit.
# gives status opportunity to update to 100%
if waitLoopFunction is not None:
waitLoopFunction()
if thread.exception:
getLog(prefix="verbose.").exception(thread.exception)
raise thread.exception
return thread.returnCode
class BackgroundWorker(threading.Thread):
def __init__ (self, function, args=None, kargs=None):
threading.Thread.__init__(self)
self.function = function
self.args = args
self.kargs = kargs
self.exception = None
self.returnCode = None
self.running=1
if self.args is None: self.args = []
if self.kargs is None: self.kargs = {}
self.start()
decorate(traceLog())
def run(self):
try:
self.returnCode = self.function(*self.args, **self.kargs)
except (Exception,), e:
self.exception = e
self.running=0
|