/usr/lib/python2.7/dist-packages/provisioningserver/dhcp/leases_parser_fast.py is in python-maas-provisioningserver 1.5.4+bzr2294-0ubuntu1.2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2013 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""A speedier version of `leases_parser`.
This extracts the relevant stanzas from a leases file, keeping only the
most recent "host" and "lease" entries, then uses the existing and
properly defined but slow parser to parse them. This massively speeds up
parsing a leases file that contains a modest number of unique host and
lease entries, but has become very large because of churn.
"""
from __future__ import (
absolute_import,
print_function,
unicode_literals,
)
# Rebind the builtin name ``str`` to None for this module, so any call to
# ``str(...)`` here fails immediately.
# NOTE(review): presumably a guard against accidental byte-string use
# alongside ``unicode_literals`` -- confirm against project conventions.
str = None
# On Python 2, a module-level ``__metaclass__ = type`` makes classes that
# are defined in this module without explicit bases new-style classes.
__metaclass__ = type
# Public names exported by ``from <module> import *``.
__all__ = [
'parse_leases',
]
from collections import defaultdict
from datetime import datetime
from itertools import chain
import re
from provisioningserver.dhcp.leases_parser import (
get_host_mac,
has_expired,
is_host,
is_lease,
lease_parser,
)
# Compiled pattern that matches the *opening* of a "host" or "lease"
# stanza: optional whitespace at the start of a line, the keyword, some
# whitespace, an address token made of hex digits, dots and colons, and
# then an opening brace.
#
# Capture groups: group 1 is the keyword ("host" or "lease"); group 2 is
# the address token that follows it.
#
# Flags: VERBOSE permits the inline comments and ignores the layout
# whitespace inside the pattern; MULTILINE lets ``^`` match at the start of
# every line.  DOTALL has no effect on this pattern because the only "."
# appears inside a character class.
#
# Because ``\s`` also matches newlines, a match may begin on blank lines
# that precede the keyword, so ``match.start()`` can point at that leading
# whitespace rather than at the keyword itself.
re_entry = re.compile(
r'''
^\s* # Ignore leading whitespace on each line.
(host|lease) # Look only for host or lease stanzas.
\s+ # Mandatory whitespace.
([0-9a-fA-F.:]+) # Capture the IP/MAC address for this stanza.
\s*{ # Optional whitespace then an opening brace.
''',
re.MULTILINE | re.DOTALL | re.VERBOSE)
def find_lease_starts(leases_contents):
    """Locate the last occurrence of every distinct stanza header.

    Scans `leases_contents` with `re_entry` and, for each distinct
    (keyword, address) pair, remembers only the offset of the final match;
    earlier matches for the same pair are overwritten.

    :param leases_contents: Text of a leases file.
    :return: A lazy iterable of integer offsets into `leases_contents`,
        one per distinct (keyword, address) pair, in no particular order.
    """
    # Outer key: stanza keyword; inner key: address; value: match offset.
    latest = defaultdict(dict)
    for found in re_entry.finditer(leases_contents):
        keyword = found.group(1)
        address = found.group(2)
        # A later match for the same keyword/address replaces the earlier.
        latest[keyword][address] = found.start()
    offsets_by_keyword = (
        by_address.itervalues() for by_address in latest.itervalues())
    return chain.from_iterable(offsets_by_keyword)
def extract_leases(leases_contents):
    """Yield one parsed entry per retained stanza, in file order.

    The offsets reported by `find_lease_starts` are visited in ascending
    order.  For each offset, `lease_parser` scans the text from that offset
    onwards and the first result it produces is yielded.  Offsets for which
    the scan produces nothing are silently skipped.

    :param leases_contents: Text of a leases file.
    :return: Generator of the parsed tokens produced by `lease_parser`.
    """
    for offset in sorted(find_lease_starts(leases_contents)):
        scan = lease_parser.scanString(leases_contents[offset:])
        # Only the first result of the scan matters; None means the scan
        # found nothing at all from this offset.
        first_hit = next(scan, None)
        if first_hit is None:
            continue
        parsed, _begin, _end = first_hit
        yield parsed
def parse_leases(leases_contents):
    """Build an IP-to-MAC mapping from the text of a leases file.

    Entries come from `extract_leases` and are applied in order:

    * a lease entry that `has_expired` does not report as expired maps its
      `ip` to its `hardware.mac`; expired leases are ignored;
    * a host entry maps its `ip` to the value of `get_host_mac`, unless
      that value is None, in which case any mapping for that `ip` is
      removed;
    * anything else is ignored.

    :param leases_contents: Text of a leases file.
    :return: Dict mapping each entry's `ip` to a MAC address.
    """
    # One timestamp for the whole run, so every lease is judged against
    # the same instant.
    now = datetime.utcnow()
    ip_to_mac = {}
    for stanza in extract_leases(leases_contents):
        if is_lease(stanza):
            if has_expired(stanza, now):
                continue
            ip_to_mac[stanza.ip] = stanza.hardware.mac
            continue
        if not is_host(stanza):
            continue
        host_mac = get_host_mac(stanza)
        if host_mac is not None:
            ip_to_mac[stanza.ip] = host_mac
        else:
            # TODO: Test this.
            # A host entry without a MAC drops any mapping for its IP.
            ip_to_mac.pop(stanza.ip, None)
    return ip_to_mac
|