/usr/lib/python3/dist-packages/plainbox/impl/secure/launcher1.py is in python3-plainbox 0.25-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# This file is part of Checkbox.
#
# Copyright 2013 Canonical Ltd.
# Written by:
# Zygmunt Krynicki <zygmunt.krynicki@canonical.com>
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3,
# as published by the Free Software Foundation.
#
# Checkbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
"""
:mod:`plainbox.impl.secure.launcher1` -- plainbox-trusted-launcher-1
====================================================================
"""
import argparse
import copy
import logging
import os
import subprocess
from plainbox.i18n import gettext as _
from plainbox.impl.job import JobDefinition
from plainbox.impl.resource import Resource
from plainbox.impl.unit.template import TemplateUnit
from plainbox.impl.secure.origin import JobOutputTextSource
from plainbox.impl.secure.providers.v1 import all_providers
from plainbox.impl.secure.rfc822 import load_rfc822_records, RFC822SyntaxError
class TrustedLauncher:
    """
    Launcher that executes the shell commands of known v1 jobs.

    Jobs are registered with :meth:`add_job_list` and are later selected by
    their checksum.
    """

    def __init__(self):
        """
        Create a launcher that does not know about any job yet
        """
        self._job_list = []

    def add_job_list(self, job_list):
        """
        Register more jobs with the launcher

        :param job_list:
            Iterable of jobs appended to the jobs already known.
        """
        self._job_list += job_list

    def find_job(self, checksum):
        """
        Look up a registered job by its checksum

        :param checksum:
            The checksum to look for.
        :returns:
            The first registered job whose ``checksum`` attribute is equal
            to the argument.
        :raises LookupError:
            If none of the registered jobs has that checksum
        """
        for candidate in self._job_list:
            if candidate.checksum == checksum:
                return candidate
        raise LookupError(
            _("Cannot find job with checksum {}").format(checksum))

    def modify_execution_environment(self, target_env):
        """
        Compute the environment a job is going to be executed with.

        The result starts as a copy of the environment of this process and
        is then updated with ``target_env`` (when it is not empty). Starting
        from the current environment is required to keep the variables that
        pkexec(1) sets automatically when the
        org.freedesktop.policykit.exec.allow_gui annotation is used: with
        $DISPLAY and $XAUTHORITY retained the trusted launcher can run X11
        applications as another user.

        :param target_env:
            Mapping with variables to add or override, may be None.
        :returns:
            A new dictionary, os.environ itself is not modified.
        """
        merged_env = os.environ.copy()
        if target_env:
            merged_env.update(target_env)
        return merged_env

    def run_shell_from_job(self, checksum, env):
        """
        Execute the command of the job with the given checksum.

        :param checksum:
            The checksum of the job to execute.
        :param env:
            Variables to layer on top of the current environment.
        :returns:
            The return code of the command
        :raises LookupError:
            If the checksum does not match any known job
        """
        target = self.find_job(checksum)
        argv = [target.shell, '-c', target.command]
        return subprocess.call(
            argv, env=self.modify_execution_environment(env))

    def run_generator_job(self, checksum, env):
        """
        Execute a job and turn its stdout into new job definitions.

        The output is parsed as RFC822 records. For a job with the 'local'
        plugin each record becomes a job definition. For a job with the
        'resource' plugin the records become resources which are used to
        instantiate every template unit, from any provider, whose
        resource_id is the id of the executed job.

        :param checksum:
            The checksum of the job to execute
        :param env:
            Variables to layer on top of the current environment.
        :returns:
            A list of job definitions that were processed from the output.
            The list is empty when the output cannot be parsed (the syntax
            error is logged).
        :raises LookupError:
            If the checksum does not match any known job
        """
        generator = self.find_job(checksum)
        argv = [generator.shell, '-c', generator.command]
        stdout_text = subprocess.check_output(
            argv, universal_newlines=True,
            env=self.modify_execution_environment(env))
        generated = []
        source = JobOutputTextSource(generator)
        try:
            record_list = load_rfc822_records(stdout_text, source=source)
        except RFC822SyntaxError as exc:
            logging.error(
                _("Syntax error in record generated from %s: %s"),
                generator, exc)
            return generated
        if generator.plugin == 'local':
            generated.extend(
                JobDefinition.from_rfc822_record(record)
                for record in record_list)
        elif generator.plugin == 'resource':
            resource_list = [Resource(record.data) for record in record_list]
            for plugin in all_providers.get_all_plugins():
                for unit in plugin.plugin_object.unit_list:
                    if not isinstance(unit, TemplateUnit):
                        continue
                    if unit.resource_id == generator.id:
                        logging.info(_("Instantiating unit: %s"), unit)
                        generated.extend(
                            unit.instantiate_all(resource_list))
        return generated
class UpdateAction(argparse.Action):
    """
    Argparse action that builds up a dictionary.

    This action is similar to the built-in append action but it constructs
    a dictionary instead of a list. Every value given to the option must
    have the form ``NAME=VALUE``; the text before the first ``=`` becomes
    the key and everything after it becomes the value. Using the option
    several times accumulates entries, a repeated NAME overrides the
    earlier value.
    """

    def __init__(self, option_strings, dest, nargs=None, const=None,
                 default=None, type=None, choices=None, required=False,
                 help=None, metavar=None):
        """
        Initialize the action, see :class:`argparse.Action` for arguments

        :raises ValueError:
            If nargs is 0 or if const is supplied while nargs is not
            argparse.OPTIONAL
        """
        if nargs == 0:
            raise ValueError('nargs for append actions must be > 0; if arg '
                             'strings are not supplying the value to append, '
                             'the append const action may be more appropriate')
        if const is not None and nargs != argparse.OPTIONAL:
            raise ValueError(
                'nargs must be {!r} to supply const'.format(argparse.OPTIONAL))
        super().__init__(
            option_strings=option_strings, dest=dest, nargs=nargs, const=const,
            default=default, type=type, choices=choices, required=required,
            help=help, metavar=metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Internal method of argparse.Action

        This method is invoked to "apply" the action after seeing all the
        values for a given argument. Please refer to argparse source code for
        information on how it is used.

        :param parser:
            The parser that invoked the action (unused).
        :param namespace:
            The namespace that receives the dictionary under ``self.dest``.
        :param values:
            The NAME=VALUE strings, either a list of them or a single string
            (argparse passes a bare string when nargs is None).
        :param option_string:
            The option string that triggered the action (unused).
        :raises argparse.ArgumentError:
            If any of the values does not contain ``=``
        """
        # NOTE: this used to call the private helper argparse._ensure_value
        # which does not exist in current CPython releases, only public
        # API is used here.
        current = getattr(namespace, self.dest, None)
        # Work on a copy so that a dictionary passed as default= (or one
        # shared between namespaces) is never modified in place.
        items = {} if current is None else copy.copy(current)
        if isinstance(values, str):
            # A bare string must be treated as one value, iterating over it
            # would yield single characters.
            values = [values]
        for value in values:
            name, separator, text = value.partition('=')
            if not separator:
                raise argparse.ArgumentError(self, "expected NAME=VALUE")
            items[name] = text
        setattr(namespace, self.dest, items)
def get_parser_for_sphinx():
    """
    Build the command line parser of plainbox-trusted-launcher-1

    :returns:
        An :class:`argparse.ArgumentParser` that requires exactly one of
        --warmup and --target and optionally accepts the environment of the
        target job, a generator job and the environment of the generator.
    """
    parser = argparse.ArgumentParser(
        description=_("Security elevation mechanism for plainbox"),
        prog="plainbox-trusted-launcher-1")
    # Mode of operation: either warm up or run a target job, never both
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument(
        '-w', '--warmup',
        # TRANSLATORS: don't translate pkexec(1)
        help=_('return immediately, only useful when used with pkexec(1)'),
        action='store_true')
    mode_group.add_argument(
        '-t', '--target',
        help=_('run a job with this checksum'),
        metavar=_('CHECKSUM'))
    # Settings that apply to the target job
    target_group = parser.add_argument_group(
        title=_("target job specification"))
    target_group.add_argument(
        '-T', '--target-environment',
        action=UpdateAction,
        nargs='+',
        dest='target_env',
        metavar=_('NAME=VALUE'),
        help=_('environment passed to the target job'))
    # Settings that apply to the optional generator job
    generator_group = parser.add_argument_group(
        title=_("generator job specification"))
    generator_group.add_argument(
        '-g', '--generator',
        # TRANSLATORS: don't translate 'local' in the sentence below. It
        # denotes a special type of job, not its location.
        help=_('also run a job with this checksum (assuming it is a local'
               ' job)'),
        metavar=_('CHECKSUM'))
    generator_group.add_argument(
        '-G', '--generator-environment',
        action=UpdateAction,
        nargs='+',
        dest='generator_env',
        metavar=_('NAME=VALUE'),
        help=_('environment passed to the generator job'))
    return parser
def main(argv=None):
    """
    Entry point for the plainbox-trusted-launcher-1

    :param argv:
        Command line arguments to parse. If None (default) then sys.argv is
        used instead.
    :returns:
        The return code of the job that was selected with the --target
        argument or zero if the --warmup argument was specified.
    :raises SystemExit:
        If --target or --generator point to unknown jobs. The exception
        carries the text of the lookup error.

    The trusted launcher is a sudo-like program, that can grant unprivileged
    users permission to run something as root, that is restricted to
    executing shell snippets embedded inside job definitions offered by v1
    plainbox providers.

    As a security measure the trusted launcher only considers job providers
    listed in the system-wide directory since one needs to be root to add
    additional definitions there anyway.

    Unlike the rest of plainbox, the trusted launcher does not produce job
    results, instead it just literally executes the shell snippet and
    returns stdout/stderr unaffected to the invoking process. The exception
    to this rule is the way the --generator argument is handled, where the
    trusted launcher needs to capture stdout to interpret that as job
    definitions.

    Unlike sudo, the trusted launcher is not a setuid program and cannot
    grant root access in itself. Instead it relies on policykit and
    specifically on pkexec(1) alongside with an appropriate policy file, to
    grant users a way to run trusted-launcher as root (or another user).
    """
    args = get_parser_for_sphinx().parse_args(argv)
    if args.warmup:
        # Nothing to do when warming up
        return 0
    launcher = TrustedLauncher()
    # Collect the jobs of every secure provider
    all_providers.load()
    for provider_plugin in all_providers.get_all_plugins():
        launcher.add_job_list(provider_plugin.plugin_object.job_list)
    try:
        if args.generator:
            # Jobs produced by the generator become known to the launcher
            # so that --target may refer to one of them
            launcher.add_job_list(
                launcher.run_generator_job(
                    args.generator, args.generator_env))
        # The result code of the target job is the result of main()
        return launcher.run_shell_from_job(args.target, args.target_env)
    except LookupError as exc:
        raise SystemExit(str(exc))
if __name__ == "__main__":
    # Use the value returned by main() (the return code of the target job,
    # or zero for --warmup) as the exit status of the process instead of
    # discarding it.
    raise SystemExit(main())
|