/usr/share/pyshared/provisioningserver/dhcp/leases_parser.py is in python-maas-provisioningserver 1.2+bzr1373+dfsg-0ubuntu1~12.04.6.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 | # Copyright 2012 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Parser for ISC dhcpd leases file.
The parser is very minimal. All we really care about is which IP
addresses are currently associated with which respective MAC addresses.
The parser works out no other information than that, and does not
pretend to parse the full format of the leases file faithfully.
"""
from __future__ import (
absolute_import,
print_function,
unicode_literals,
)
__metaclass__ = type
__all__ = [
'parse_leases',
]
from datetime import datetime
from pyparsing import (
CaselessKeyword,
Dict,
Group,
oneOf,
QuotedString,
Regex,
restOfLine,
Suppress,
ZeroOrMore,
)
ip = Regex("[0-9]{1,3}(\.[0-9]{1,3}){3}")
mac = Regex("[0-9a-fA-F]{2}(:[0-9a-fA-F]{2}){5}")
hardware_type = Regex('[A-Za-z0-9_-]+')
args = Regex('[^"{;]+') | QuotedString('"')
expiry = Regex('[0-9]\s+[0-9/-]+\s+[0-9:]+') | 'never'
identifier = Regex("[A-Za-z_][0-9A-Za-z_-]*")
set_statement = (
CaselessKeyword('set') +
identifier +
Suppress('=') +
QuotedString('"'))
# For our purposes, leases and host declarations are similar enough that
# we can parse them as the same construct with different names.
lease_or_host = oneOf(['lease', 'host'], caseless=True)
hardware = CaselessKeyword("hardware") + hardware_type("type") + mac("mac")
ends = CaselessKeyword("ends") + expiry("expiry")
deleted = CaselessKeyword("deleted")
lone_statement_names = [
'abandoned',
'bootp',
'deleted',
'dynamic',
'reserved',
]
lone_statement = oneOf(lone_statement_names, caseless=True)
other_statement_names = [
'atsfp',
'binding',
'bootp',
'client-hostname',
'cltt',
'ddns-client-fqdn',
'ddns-fwd-name',
'ddns-rev-name',
'ddns-text',
'fixed-address',
'next',
'option',
'reserved',
'rewind',
'starts',
'tstp',
'tsfp',
'uid',
'vendor-class-identifier',
]
other_statement = oneOf(other_statement_names, caseless=True) + args
lease_statement = (
hardware | deleted | ends | set_statement | lone_statement |
other_statement
) + Suppress(';')
lease_parser = (
lease_or_host("lease_or_host") + ip("ip") +
Suppress('{') +
Dict(ZeroOrMore(Group(lease_statement))) +
Suppress('}')
)
lease_parser.ignore('#' + restOfLine)
def is_lease(entry):
"""Is `entry` a lease declaration?"""
entry_type = entry.lease_or_host.lower()
assert entry_type in {'host', 'lease'}, (
"Unknown entry type (not a host or lease): %s" % entry_type)
return entry_type == 'lease'
def is_host(entry):
"""Is `entry` a host declaration?"""
return not is_lease(entry)
def get_expiry_date(lease):
"""Get the expiry date for a lease, if any.
:param lease: A lease as returned by the parser.
:return: A UTC-based timestamp representing the lease's moment of
expiry, or None if the lease has no expiry date.
"""
assert is_lease(lease)
ends = getattr(lease, 'ends', None)
if ends is None or len(ends) == 0 or ends.lower() == 'never':
return None
else:
return datetime.strptime(ends, '%w %Y/%m/%d %H:%M:%S')
def has_expired(lease, now):
"""Has `lease` expired?
:param lease: A lease as returned by the parser.
:param now: The current UTC-based timestamp to check expiry against.
:return: Whether the lease has expired.
"""
assert is_lease(lease)
expiry_date = get_expiry_date(lease)
if expiry_date is None:
return False
else:
return expiry_date < now
def gather_leases(hosts_and_leases):
"""Find current leases among `hosts_and_leases`."""
now = datetime.utcnow()
# If multiple leases for the same address are valid at the same
# time, for whatever reason, the dict will contain the one that was
# last appended to the leases file.
return {
lease.ip: lease.hardware.mac
for lease in filter(is_lease, hosts_and_leases)
if not has_expired(lease, now)}
def get_host_mac(host):
"""Get the MAC address from a host declaration. A rubout has none."""
assert is_host(host)
if 'deleted' in host:
return None
hardware = getattr(host, 'hardware', None)
if hardware in (None, '', b''):
return None
else:
return hardware.mac
def gather_hosts(hosts_and_leases):
"""Find current host declarations among `hosts_and_leases`."""
# Get MAC address mappings for host entries. A newer entry
# overwrites an older one for the same IP address. A rubout entry
# will have no MAC address.
host_maps = {
host.ip: get_host_mac(host)
for host in filter(is_host, hosts_and_leases)}
# Now filter out mappings where the last entry was a rubout.
return {
ip: mac
for ip, mac in host_maps.items()
if mac is not None}
def combine_entries(entries):
"""Combine the hosts and leases declarations in a parsed leases file.
:param entries: Parsed host/leases entries from a leases file.
:return: A dict mapping leased IP addresses to the respective MAC
addresses that currently own them (regardless of whether they
were found in a lease or in a host declaration).
"""
leases = gather_leases(entries)
leases.update(gather_hosts(entries))
return leases
def parse_leases(leases_contents):
"""Parse contents of a leases file.
:param leases_contents: Contents (as unicode) of the leases file.
:return: A dict mapping each currently leased IP address to the MAC
address that it is associated with.
"""
entries = lease_parser.searchString(leases_contents)
return combine_entries(entries)
|