/usr/share/pyshared/epsilon/process.py is in python-epsilon 0.6.0+r2713-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | # -*- test-case-name: epsilon.test.test_process -*-
# Copyright (c) 2008 Divmod. See LICENSE for details.
"""
Process and stdio related functionality.
"""
import os, sys, imp
from zope.interface import implements
from twisted.internet import reactor
from twisted.application.service import IService, Service
from twisted.internet.stdio import StandardIO
from epsilon.structlike import record
def spawnProcess(processProtocol, executable, args=(), env={},
path=None, uid=None, gid=None, usePTY=0,
packages=()):
"""Launch a process with a particular Python environment.
All arguments as to reactor.spawnProcess(), except for the
addition of an optional packages iterable. This should be
of strings naming packages the subprocess is to be able to
import.
"""
env = env.copy()
pythonpath = []
for pkg in packages:
p = os.path.split(imp.find_module(pkg)[1])[0]
if p.startswith(os.path.join(sys.prefix, 'lib')):
continue
pythonpath.append(p)
pythonpath = list(set(pythonpath))
pythonpath.extend(env.get('PYTHONPATH', '').split(os.pathsep))
env['PYTHONPATH'] = os.pathsep.join(pythonpath)
return reactor.spawnProcess(processProtocol, executable, args,
env, path, uid, gid, usePTY)
def spawnPythonProcess(processProtocol, args=(), env={},
path=None, uid=None, gid=None, usePTY=0,
packages=()):
"""Launch a Python process
All arguments as to spawnProcess(), except the executable
argument is omitted.
"""
return spawnProcess(processProtocol, sys.executable,
args, env, path, uid, gid, usePTY,
packages)
class StandardIOService(record('protocol'), Service):
"""
Service for connecting a protocol to stdio.
"""
def startService(self):
"""
Connect C{self.protocol} to standard io.
"""
StandardIO(self.protocol)
|