/usr/lib/python3/dist-packages/schroot/chroot.py is in python3-schroot 0.4-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 | from schroot.utils import run_command
from schroot.core import log
from schroot.errors import SchrootError
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
import shutil
import os
import subprocess
import pipes
try:
import configparser
except ImportError:
import ConfigParser as configparser # meh, Python 2
class SchrootCommandError(SchrootError):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
SCHROOT_BASE = "/var/lib/schroot"
class SchrootChroot(object):
__slots__ = ('session', 'active', 'location')
def __init__(self):
self.session = None
self.active = False
self.location = None
def _command_prefix(self, user, preserve_environment):
command = ['schroot', '-r', '-c', self.session]
if user:
command += ['-u', user]
if preserve_environment:
command += ['-p']
command += ['--']
return command
@contextmanager
def _command(self, cmd, kwargs):
user = kwargs.pop("user", None)
preserve_environment = kwargs.pop("preserve_environment", False)
env = kwargs.pop("env", None)
command = self._command_prefix(user, preserve_environment)
# short cut if env not set
if env is None:
command += cmd
log.debug(" ".join((str(x) for x in command)))
yield command
return
tmp_dir = os.path.join(self.location, "tmp")
with NamedTemporaryFile(dir=tmp_dir) as tmp_file:
for key, value in env.iteritems():
tmp_cmd = "export %s=%s" % (
pipes.quote(key), pipes.quote(value))
tmp_file.write(tmp_cmd)
tmp_file.write("\n")
tmp_file.write('"$@"\n')
tmp_file.flush()
chroot_tmp_file = os.path.basename(tmp_file.name)
chroot_tmp_file = os.path.join("/tmp", chroot_tmp_file)
command += ['sh', '-e', chroot_tmp_file] + cmd
log.debug(" ".join((str(x) for x in command)))
yield command
def _safe_run(self, cmd):
out, err, ret = run_command(cmd)
if ret != 0:
raise SchrootCommandError(err)
return out, err, ret
def copy(self, what, whence, user=None):
with self.create_file(whence, user) as f:
log.debug("copying %s to %s" % (what, f.name))
with open(what) as src:
shutil.copyfileobj(src, f)
def get_session_config(self):
cfg = configparser.ConfigParser()
fil = os.path.join(SCHROOT_BASE, 'session', self.session)
if cfg.read(fil) == []:
raise SchrootError("SANITY FAILURE")
return cfg[self.session]
def start(self, chroot_name):
out, err, ret = self._safe_run(['schroot', '-b', '-c', chroot_name])
self.session = out.strip()
self.active = True
log.debug("new session: %s" % (self.session))
out, err, ret = self._safe_run([
'schroot', '--location', '-c', "session:%s" % self.session
])
self.location = out.strip()
def end(self):
if self.session is not None:
out, err, ret = self._safe_run(['schroot', '-e', '-c', self.session])
def __lt__(self, other):
return self.run(other, return_codes=0)
def __floordiv__(self, other):
return UserProxy(other, self)
@contextmanager
def create_file(self, whence, user=None):
tmp_dir = os.path.join(self.location, "tmp")
with NamedTemporaryFile(dir=tmp_dir) as tmp_file:
chroot_tmp_file = os.path.basename(tmp_file.name)
chroot_tmp_file = os.path.join("/tmp", chroot_tmp_file)
log.debug("creating %s" % (tmp_file.name))
yield tmp_file
tmp_file.flush()
self.check_call(['cp', chroot_tmp_file, whence], user=user)
def run(self, cmd, **kwargs):
with self._command(cmd, kwargs) as command:
return run_command(command, **kwargs)
def call(self, cmd, **kwargs):
with self._command(cmd, kwargs) as command:
return subprocess.call(command, **kwargs)
def check_call(self, cmd, **kwargs):
with self._command(cmd, kwargs) as command:
return subprocess.check_call(command, **kwargs)
def check_output(self, cmd, **kwargs):
with self._command(cmd, kwargs) as command:
return subprocess.check_output(command, **kwargs)
def Popen(self, cmd, **kwargs):
with self._command(cmd, kwargs) as command:
return subprocess.Popen(command, **kwargs)
class UserProxy(SchrootChroot):
__slots__ = ('user')
def __init__(self, user, other):
super(UserProxy, self).__init__()
self.user = user
for entry in other.__slots__:
setattr(self, entry, getattr(other, entry))
def run(self, cmd, return_codes=None):
return super(UserProxy, self).run(cmd, user=self.user,
return_codes=return_codes)
@contextmanager
def schroot(name):
ch = SchrootChroot()
try:
ch.start(name)
yield ch
finally:
ch.end()
|