/usr/share/pyshared/twisted/runner/procmon.py is in python-twisted-runner 11.1.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- test-case-name: twisted.runner.test.test_procmon -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Support for starting, monitoring, and restarting child process.
"""
import warnings
from twisted.python import log
from twisted.internet import error, protocol, reactor as _reactor
from twisted.application import service
from twisted.protocols import basic
class DummyTransport:
    """
    Minimal stand-in transport handed to L{LineLogger.makeConnection}.

    It carries no behaviour of its own; it only exposes the single
    attribute below.
    """

    # Always false.  Presumably consulted by the line-parsing protocol to
    # check whether its transport is shutting down -- confirm against
    # L{basic.LineReceiver}.
    disconnecting = 0


# One shared instance is enough: the class holds no per-connection state.
transport = DummyTransport()
class LineLogger(basic.LineReceiver):
    """
    Line-oriented receiver that writes every complete line to the Twisted
    log, prefixed with a tag.

    @ivar tag: The label placed in square brackets in front of each logged
        line.  C{None} until the owner assigns one.
    @ivar delimiter: Lines are split on a bare newline.
    """

    tag = None
    delimiter = '\n'

    def lineReceived(self, line):
        """
        Log one complete line of output.

        @param line: The line that was received, without its delimiter.
        """
        prefix = self.tag
        message = '[%s] %s' % (prefix, line)
        log.msg(message)
class LoggingProtocol(protocol.ProcessProtocol):
    """
    Process protocol that forwards a child's stdout and stderr to the log
    through a L{LineLogger}, and tells its owning service when the child
    has exited.

    @ivar service: The object whose C{connectionLost(name)} is called when
        the process ends.  Must be set before the process ends.
    @ivar name: The name of the monitored process; used as the log tag and
        passed back to C{service.connectionLost}.
    @ivar empty: True while the most recently received non-empty chunk of
        output ended with a newline (or no output has been seen yet), i.e.
        there is no partial line waiting to be flushed.
    """

    service = None
    name = None
    empty = 1

    def connectionMade(self):
        """
        Create the L{LineLogger} for this process and connect it to the
        module-level dummy transport.
        """
        self.output = LineLogger()
        self.output.tag = self.name
        self.output.makeConnection(transport)

    def outReceived(self, data):
        """
        Feed a chunk of child output to the line logger and remember
        whether it ended on a line boundary.

        @param data: The chunk of output received from the child.
        """
        self.output.dataReceived(data)
        # Guard against an empty chunk: indexing data[-1] would raise
        # IndexError, and an empty chunk does not change whether a partial
        # line is still pending.
        if data:
            self.empty = data[-1] == '\n'

    # stderr is logged exactly like stdout.
    errReceived = outReceived

    def processEnded(self, reason):
        """
        Flush any unterminated final line and notify the owning service.

        @param reason: Why the process ended; not inspected here.
        """
        if not self.empty:
            # Terminate the trailing partial line so that it gets logged.
            self.output.dataReceived('\n')
        self.service.connectionLost(self.name)
class ProcessMonitor(service.Service):
    """
    ProcessMonitor runs processes, monitors their progress, and restarts
    them when they die.

    The ProcessMonitor will not attempt to restart a process that appears to
    die instantly -- with each "instant" death (less than 1 second, by
    default), it will delay approximately twice as long before restarting
    it.  A successful run will reset the counter.

    The primary interface is L{addProcess} and L{removeProcess}.  When the
    service is running (that is, when the application it is attached to is
    running), adding a process automatically starts it.

    Each process has a name.  This name string must uniquely identify the
    process.  In particular, attempting to add two processes with the same
    name will result in a C{KeyError}.

    @type threshold: C{float}
    @ivar threshold: How long a process has to live before the death is
        considered instant, in seconds.  The default value is 1 second.

    @type killTime: C{float}
    @ivar killTime: How long a process being killed has to get its affairs
        in order before it gets killed with an unmaskable signal.  The
        default value is 5 seconds.

    @type minRestartDelay: C{float}
    @ivar minRestartDelay: The minimum time (in seconds) to wait before
        attempting to restart a process.  Default 1s.

    @type maxRestartDelay: C{float}
    @ivar maxRestartDelay: The maximum time (in seconds) to wait before
        attempting to restart a process.  Default 3600s (1h).

    @type _reactor: L{IReactorProcess} provider
    @ivar _reactor: A provider of L{IReactorProcess} and L{IReactorTime}
        which will be used to spawn processes and register delayed calls.

    @ivar processes: Maps process name to its C{(args, uid, gid, env)}
        tuple.
    @ivar protocols: Maps process name to the L{LoggingProtocol} of the
        currently spawned process, if any.
    @ivar delay: Maps process name to the delay (in seconds) to use for the
        next restart after an "instant" death.
    @ivar timeStarted: Maps process name to the reactor time at which it
        was last started.
    @ivar murder: Maps process name to the delayed call that will send
        C{KILL} if the process does not exit after C{TERM}.
    @ivar restart: Maps process name to the delayed call scheduled to
        restart it.
    """

    threshold = 1
    killTime = 5
    minRestartDelay = 1
    maxRestartDelay = 3600

    def __init__(self, reactor=_reactor):
        """
        @param reactor: The reactor used to spawn processes and schedule
            delayed calls.  Defaults to the global reactor.
        """
        self._reactor = reactor

        self.processes = {}
        self.protocols = {}
        self.delay = {}
        self.timeStarted = {}
        self.murder = {}
        self.restart = {}

    def _getActive(self):
        """
        Deprecated accessor: warn, then return C{self.running}.
        """
        warnings.warn("active is deprecated since Twisted 10.1.0.  "
                      "Use running instead.", category=DeprecationWarning,
                      stacklevel=2)
        return self.running

    active = property(_getActive, None)

    def _getConsistency(self):
        """
        Deprecated accessor: warn, then return C{None}.
        """
        warnings.warn("consistency is deprecated since Twisted 10.1.0.",
                      category=DeprecationWarning, stacklevel=2)
        return None

    consistency = property(_getConsistency, None)

    def _getConsistencyDelay(self):
        """
        Deprecated accessor: warn, then return the constant C{60}.
        """
        warnings.warn("consistencyDelay is deprecated since Twisted 10.1.0.",
                      category=DeprecationWarning, stacklevel=2)
        return 60

    consistencyDelay = property(_getConsistencyDelay, None)

    def __getstate__(self):
        """
        Return the state to serialize: the base service state without the
        reactor and with all per-run bookkeeping emptied.  The configured
        C{processes} are kept.
        """
        # NOTE(review): no __setstate__ is visible in this class, so a
        # restored instance appears to lack '_reactor' and the per-process
        # 'delay' entries -- confirm before relying on persistence.
        dct = service.Service.__getstate__(self)
        del dct['_reactor']
        dct['protocols'] = {}
        dct['delay'] = {}
        dct['timeStarted'] = {}
        dct['murder'] = {}
        dct['restart'] = {}
        return dct

    def addProcess(self, name, args, uid=None, gid=None, env={}):
        """
        Add a new monitored process and start it immediately if the
        L{ProcessMonitor} service is running.

        Note that args are passed to the system call, not to the shell.  If
        running the shell is desired, the common idiom is to use
        C{ProcessMonitor.addProcess("name", ['/bin/sh', '-c', shell_script])}

        @param name: A name for this process.  This value must be
            unique across all processes added to this monitor.
        @type name: C{str}
        @param args: The argv sequence for the process to launch.
        @param uid: The user ID to use to run the process.  If C{None},
            the current UID is used.
        @type uid: C{int}
        @param gid: The group ID to use to run the process.  If C{None},
            the current GID is used.
        @type gid: C{int}
        @param env: The environment to give to the launched process.  See
            L{IReactorProcess.spawnProcess}'s C{env} parameter.
        @type env: C{dict}
        @raises: C{KeyError} if a process with the given name already
            exists
        """
        if name in self.processes:
            raise KeyError("remove %s first" % (name,))
        # The shared default dict for 'env' is kept for interface
        # compatibility.  This class only stores and forwards it and never
        # mutates it.
        self.processes[name] = args, uid, gid, env
        self.delay[name] = self.minRestartDelay
        if self.running:
            self.startProcess(name)

    def removeProcess(self, name):
        """
        Stop the named process and remove it from the list of monitored
        processes.

        Any restart that is still scheduled for the process is cancelled,
        so it cannot fire after the process has been forgotten.

        @type name: C{str}
        @param name: A string that uniquely identifies the process.
        @raises: C{KeyError} if no process with the given name is known.
        """
        self.stopProcess(name)
        # Without this, a pending restart would later call startProcess()
        # for a name that is no longer in self.processes and fail with a
        # KeyError inside the reactor.
        pendingRestart = self.restart.pop(name, None)
        if pendingRestart is not None and pendingRestart.active():
            pendingRestart.cancel()
        del self.processes[name]

    def startService(self):
        """
        Start all monitored processes.
        """
        service.Service.startService(self)
        for name in self.processes:
            self.startProcess(name)

    def stopService(self):
        """
        Stop all monitored processes and cancel all scheduled process
        restarts.
        """
        service.Service.stopService(self)

        # Cancel any outstanding restarts
        for delayedCall in self.restart.values():
            if delayedCall.active():
                delayedCall.cancel()

        for name in self.processes:
            self.stopProcess(name)

    def connectionLost(self, name):
        """
        Called when a monitored process exits.  If
        L{ProcessMonitor.running} is C{True} (ie the service is started), the
        process will be restarted.

        If the process had been running for more than
        L{ProcessMonitor.threshold} seconds it will be restarted immediately.

        If the process had been running for less than
        L{ProcessMonitor.threshold} seconds, the restart will be delayed and
        each time the process dies before the configured threshold, the
        restart delay will be doubled - up to a maximum delay of
        maxRestartDelay sec.

        @type name: C{str}
        @param name: A string that uniquely identifies the process
            which exited.
        """
        # Cancel the scheduled _forceStopProcess function if the process
        # dies naturally
        if name in self.murder:
            if self.murder[name].active():
                self.murder[name].cancel()
            del self.murder[name]

        del self.protocols[name]

        if self._reactor.seconds() - self.timeStarted[name] < self.threshold:
            # The process died too fast - backoff
            nextDelay = self.delay[name]
            self.delay[name] = min(self.delay[name] * 2, self.maxRestartDelay)
        else:
            # Process had been running for a significant amount of time
            # restart immediately
            nextDelay = 0
            self.delay[name] = self.minRestartDelay

        # Schedule a process restart if the service is running
        if self.running and name in self.processes:
            self.restart[name] = self._reactor.callLater(nextDelay,
                                                         self.startProcess,
                                                         name)

    def startProcess(self, name):
        """
        Spawn the named process unless it is already running.

        @param name: The name of the process to be started
        @raises: C{KeyError} if no process with the given name is known.
        """
        # If a protocol instance already exists, it means the process is
        # already running
        if name in self.protocols:
            return

        args, uid, gid, env = self.processes[name]

        proto = LoggingProtocol()
        proto.service = self
        proto.name = name
        self.protocols[name] = proto
        self.timeStarted[name] = self._reactor.seconds()
        self._reactor.spawnProcess(proto, args[0], args, uid=uid,
                                   gid=gid, env=env)

    def _forceStopProcess(self, proc):
        """
        Send C{KILL} to a process, ignoring the case where it has already
        exited.

        @param proc: An L{IProcessTransport} provider
        """
        try:
            proc.signalProcess('KILL')
        except error.ProcessExitedAlready:
            pass

    def stopProcess(self, name):
        """
        Send C{TERM} to the named process, if it is running, and schedule a
        C{KILL} after L{ProcessMonitor.killTime} seconds.

        @param name: The name of the process to be stopped
        @raises: C{KeyError} if no process with the given name is known.
        """
        if name not in self.processes:
            raise KeyError('Unrecognized process name: %s' % (name,))

        proto = self.protocols.get(name, None)
        if proto is not None:
            proc = proto.transport
            try:
                proc.signalProcess('TERM')
            except error.ProcessExitedAlready:
                pass
            else:
                # If a kill is already pending for this process, keep it
                # (and its earlier deadline) instead of overwriting it;
                # overwriting would orphan the first delayed call, which
                # connectionLost() could then no longer cancel.
                pendingKill = self.murder.get(name)
                if pendingKill is None or not pendingKill.active():
                    self.murder[name] = self._reactor.callLater(
                        self.killTime,
                        self._forceStopProcess, proc)

    def restartAll(self):
        """
        Restart all processes.  This is useful for third party management
        services to allow a user to restart servers because of an outside
        change in circumstances -- for example, a new version of a library
        is installed.
        """
        # Stopping is sufficient: connectionLost() schedules the restart
        # once each process has exited, as long as the service is running.
        for name in self.processes:
            self.stopProcess(name)

    def __repr__(self):
        """
        Return C{<ClassName 'name'(uid:gid): args ...>}, listing every
        configured process; the C{(uid:gid)} part only appears when a uid
        and/or gid was given.
        """
        descriptions = []
        for name, proc in self.processes.items():
            uidgid = ''
            if proc[1] is not None:
                uidgid = str(proc[1])
            if proc[2] is not None:
                uidgid += ':' + str(proc[2])

            if uidgid:
                uidgid = '(' + uidgid + ')'
            descriptions.append('%r%s: %r' % (name, uidgid, proc[0]))
        return ('<' + self.__class__.__name__ + ' '
                + ' '.join(descriptions)
                + '>')
|