/usr/lib/python3/dist-packages/provisioningserver/utils/ps.py is in python3-maas-provisioningserver 2.0.0~beta3+bzr4941-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | # Copyright 2016 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Helpers for inspecting processes."""
__all__ = [
'running_in_container',
]
from functools import lru_cache
import os
from provisioningserver.utils.fs import read_text_file
from provisioningserver.utils.shell import (
call_and_check,
ExternalProcessError,
)
def is_pid_in_container(pid, proc_path="/proc"):
"""Return True if the `pid` is running in a container.
This should only be called when not running in a container itself, because
if this process is running in a container than the `pid` process is
sure to be running in the container as well.
"""
cgroup_path = os.path.join(proc_path, str(pid), "cgroup")
cgroup_info = read_text_file(cgroup_path)
for line in cgroup_info.splitlines():
id_num, subsytem, hierarchy = line.split(":", 2)
if hierarchy.startswith("/lxc") or "docker" in hierarchy:
return True
return False
@lru_cache(maxsize=1)
def running_in_container():
"""Return True if running in an LXC or Docker container."""
try:
call_and_check(["systemd-detect-virt", "-c"])
except ExternalProcessError:
# Exited non-zero so not in a container.
return False
else:
# Exited zero so inside a container.
return True
def get_running_pids_with_command(
command, exclude_container_processes=True, proc_path="/proc"):
"""Return list of pids that are running the following command.
:param command: The command to search for. This is only the command as
`cat` not the full command line.
:param exclude_container_processes: Excludes processes that are running
in an LXC container on the host.
"""
running_pids = [pid for pid in os.listdir(proc_path) if pid.isdigit()]
pids = []
for pid in running_pids:
try:
pid_command = read_text_file(
os.path.join(proc_path, pid, "comm")).strip()
except (FileNotFoundError, ProcessLookupError):
# Process was closed while running.
pass
else:
if pid_command == command:
pids.append(int(pid))
if (exclude_container_processes and
not running_in_container()):
return [
pid
for pid in pids
if not is_pid_in_container(pid, proc_path=proc_path)
]
else:
return pids
|