/usr/lib/python3/dist-packages/provisioningserver/ntp/config.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2016 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""NTP service configuration."""
# Names exported by ``from <this module> import *``.
__all__ = [
"configure_rack",
"configure_region",
"normalise_address",
]
from functools import partial
from itertools import (
dropwhile,
groupby,
)
import re
from netaddr import (
AddrFormatError,
IPAddress,
)
from provisioningserver.path import get_tentative_data_path
from provisioningserver.utils.fs import sudo_write_file
# Names, relative to the "etc" data directory, of the main chrony
# configuration file and of the MAAS-managed file that the main file is made
# to include.
_ntp_conf_name = "chrony/chrony.conf"
_ntp_maas_conf_name = "chrony/maas.conf"
def configure(servers, peers, offset):
    """Write the MAAS and main chrony configuration files.

    The MAAS-managed file (``chrony/maas.conf``) is rendered from the given
    time references and written first. The main file
    (``chrony/chrony.conf``) is then re-rendered from its current contents so
    that it includes the MAAS-managed file, and written back. Both files are
    written with `sudo_write_file` using mode 0644.

    :param servers: An iterable of server addresses -- IPv4, IPv6, hostnames
        -- to use as time references.
    :param peers: An iterable of peer addresses -- IPv4, IPv6, hostnames -- to
        use as time references.
    :param offset: A relative stratum within MAAS's world. A region controller
        would be 0 and a rack controller would be 1.
    """
    maas_text = _render_ntp_maas_conf(servers, peers, offset)
    maas_path = get_tentative_data_path("etc", _ntp_maas_conf_name)
    sudo_write_file(maas_path, maas_text.encode("utf-8"), mode=0o644)

    # The main configuration is rendered only once the MAAS file is in place;
    # it refers to that file by the path just written.
    main_text = _render_ntp_conf(maas_path)
    main_path = get_tentative_data_path("etc", _ntp_conf_name)
    sudo_write_file(main_path, main_text.encode("utf-8"), mode=0o644)
# Convenience entry points: `configure` with the relative stratum offset
# pre-bound (0 for a region controller, 1 for a rack controller).
configure_region = partial(configure, offset=0)
configure_rack = partial(configure, offset=1)
def normalise_address(address):
    """Normalise a time-reference address for use in the NTP configuration.

    Values that parse as IP addresses are returned as `IPAddress` objects,
    with IPv4-mapped IPv6 addresses converted to their plain IPv4 form.
    Anything that does not parse as an IP address is assumed to be a hostname
    and is returned unchanged.

    :param address: An IPv4 address, IPv6 address, or hostname.
    :return: An `IPAddress`, or `address` itself when it is not an IP address.
    """
    try:
        parsed = IPAddress(address)
    except AddrFormatError:
        # Not an IP address, so treat it as a hostname and pass it through.
        return address
    return parsed.ipv4() if parsed.is_ipv4_mapped() else parsed
def _render_ntp_conf(includefile):
    """Render the main chrony configuration from the copy currently on disk.

    The existing ``chrony/chrony.conf`` is read and passed through
    `_render_ntp_conf_from_source`, so the result includes the file named by
    `includefile`.

    :param includefile: Path of the file that the rendered configuration
        should include.
    :return: The new configuration as a single string.
    """
    current_path = get_tentative_data_path("etc", _ntp_conf_name)
    with open(current_path, "r", encoding="utf-8") as source:
        # The renderer is a lazy generator, so it must be consumed while the
        # file is still open.
        return "".join(_render_ntp_conf_from_source(source, includefile))
def _render_ntp_conf_from_source(lines, includefile):
    """Generate the lines of a new main configuration from existing lines.

    Existing ``pool`` and ``server`` lines are commented out, any previous
    include of the MAAS-managed file is dropped, blank lines are tidied, and
    finally a fresh ``include`` line for `includefile` is appended.

    :param lines: An iterable of lines from the existing configuration.
    :param includefile: Path to reference in the trailing ``include`` line.
    :return: An iterable of lines.
    """
    stages = (
        _disable_existing_pools_and_servers,
        _remove_maas_includefile_option,
        _clean_whitespace,
    )
    filtered = lines
    for stage in stages:
        filtered = stage(filtered)
    # The cleaned lines end with a blank line, which separates the existing
    # content from the include directive.
    yield from filtered
    yield "include %s\n" % includefile
def _render_ntp_maas_conf(servers, peers, offset):
    """Render the contents of the MAAS-managed chrony configuration file.

    Each server that normalises to an IP address becomes a ``server`` line;
    any other server (i.e. a hostname) becomes a ``pool`` line. Each peer
    becomes a ``peer`` line.

    :param servers: An iterable of server addresses -- IPv4, IPv6, hostnames
        -- to use as time references.
    :param peers: An iterable of peer addresses -- IPv4, IPv6, hostnames -- to
        use as time references.
    :param offset: A relative stratum used when calculating the stratum for
        orphan mode (https://chrony.tuxfamily.org/doc/3.2/chrony.conf.html).
    :return: The configuration text, ending with a newline.
    """
    rendered = ["# MAAS NTP configuration."]

    for server in servers:
        server = normalise_address(server)
        directive = "server" if isinstance(server, IPAddress) else "pool"
        rendered.append("%s %s iburst" % (directive, server))

    for peer in peers:
        rendered.append("peer %s" % normalise_address(peer))

    # Chrony's 'orphan' mode is compatible with ntpd's 'tos orphan' mode; see
    # https://chrony.tuxfamily.org/doc/devel/chrony.conf.html
    rendered.append("local stratum {:d} orphan".format(offset + 8))

    # Chrony only serves time to clients matched by an 'allow' option. For
    # now every client is allowed, for compatibility with 'ntpd'. In the
    # future it would be nice to limit this similarly to how the proxy is
    # limited; see https://chrony.tuxfamily.org/doc/3.2/chrony.conf.html
    rendered.append("allow")

    # An empty final element makes the joined text end with a newline.
    rendered.append("")
    return "\n".join(rendered)
_re_pool_or_server = re.compile(
r" ^ \s* (?: pool | server ) \b ",
re.VERBOSE)
def _is_pool_or_server_option(line):
"""Predicate: does the given line represent a pool or server option?"""
return _re_pool_or_server.match(line) is not None
def _disable_existing_pools_and_servers(lines):
    """Comment out every active ``pool`` and ``server`` line.

    Lines recognised by `_is_pool_or_server_option` are stripped of
    surrounding whitespace, prefixed with ``#``, and annotated to say that
    MAAS disabled them. All other lines pass through untouched.

    :param lines: An iterable of lines.
    :return: An iterable of lines.
    """
    for line in lines:
        if not _is_pool_or_server_option(line):
            yield line
            continue
        yield "# %s # Disabled by MAAS.\n" % line.strip()
# Matches an ``include`` directive whose argument ends in the name of the
# MAAS-managed configuration file.
_re_maas_includefile = re.compile(
    r" ^ \s* include \s+ .* \b %s \b " % re.escape(_ntp_maas_conf_name),
    flags=re.VERBOSE)


def _is_maas_includefile_option(line):
    """Predicate: does the given line include the MAAS-managed file?

    :param line: A single configuration line.
    :return: `True` if the line is an ``include`` of the MAAS-managed file.
    """
    # Match objects are always truthy; no match yields `None`.
    return bool(_re_maas_includefile.match(line))
def _remove_maas_includefile_option(lines):
    """Drop ``include`` lines that reference the MAAS-managed file.

    :param lines: An iterable of lines.
    :return: An iterable of lines, without any include of the MAAS file.
    """
    yield from (
        line for line in lines
        if not _is_maas_includefile_option(line)
    )
def _is_line_blank(line):
"""Predicate: is the given line either empty or all whitespace?"""
return len(line) == 0 or line.isspace()
def _clean_whitespace(lines):
    """Strip leading blank lines and collapse runs of blank lines.

    Every run of non-blank lines is emitted followed by exactly one blank
    line, so the output (when non-empty) always ends with a blank line.

    :param lines: An iterable of lines.
    :return: An iterable of lines.
    """
    without_leading_blanks = dropwhile(_is_line_blank, lines)
    for is_blank, group in groupby(without_leading_blanks, _is_line_blank):
        if is_blank:
            # Blank runs are dropped; the separator is emitted after the
            # preceding non-blank run instead.
            continue
        yield from group
        yield "\n"
|