This file is indexed.

/usr/lib/python3/dist-packages/provisioningserver/rackdservices/dhcp_probe_service.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# Copyright 2014-2016 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

""" DHCP probing service."""

__all__ = [
    "DHCPProbeService",
    ]

from datetime import timedelta
import socket

from provisioningserver.config import is_dev_environment
from provisioningserver.dhcp.detect import probe_interface
from provisioningserver.logger import (
    get_maas_logger,
    LegacyLogger,
)
from provisioningserver.rpc.exceptions import NoConnectionsAvailable
from provisioningserver.rpc.region import ReportForeignDHCPServer
from provisioningserver.utils.network import (
    get_all_interfaces_definition,
    has_ipv4_address,
)
from provisioningserver.utils.twisted import (
    pause,
    retries,
)
from twisted.application.internet import TimerService
from twisted.internet.defer import (
    inlineCallbacks,
    maybeDeferred,
)
from twisted.internet.threads import deferToThread
from twisted.protocols.amp import UnhandledCommand


maaslog = get_maas_logger("dhcp.probe")
log = LegacyLogger()


class DHCPProbeService(TimerService):
    """Service to probe for DHCP servers on the rack controller interface's.

    Built on top of Twisted's `TimerService`.

    :param reactor: An `IReactor` instance.
    """

    check_interval = timedelta(minutes=10).total_seconds()

    def __init__(self, client_service, reactor):
        # Call self.try_probe_dhcp() every self.check_interval.
        super(DHCPProbeService, self).__init__(
            self.check_interval, self.try_probe_dhcp)
        self.clock = reactor
        self.client_service = client_service

    def log(self, *args, **kwargs):
        log.msg(*args, **kwargs, system=type(self).__name__)

    def err(self, *args, **kwargs):
        log.err(*args, **kwargs, system=type(self).__name__)

    def _get_interfaces(self):
        """Return the interfaces for this rack controller."""
        d = deferToThread(get_all_interfaces_definition)
        d.addCallback(lambda interfaces: [
            name
            for name, info in interfaces.items()
            # No IPv4 address (unfortunately) means that MAAS cannot probe
            # this interface. This is because ultimately the DHCP probe
            # mechanism must send out a unicast UDP packet from the
            # interface in order to receive a response, and the TCP/IP
            # stack will drop packets coming back from the server if an
            # unexpected address is used.
            if info["enabled"] and has_ipv4_address(interfaces, name)
        ])
        return d

    def _inform_region_of_dhcp(self, client, name, dhcp_ip):
        """Tell the region about the DHCP server.

        :param client: The RPC client to use.
        :param name: The name of the network interface where the rogue
            DHCP server was found.
        :param dhcp_ip: The IP address of the DHCP server.
        """

        def eb_unhandled(failure):
            failure.trap(UnhandledCommand)
            # Not a lot we can do here... The region doesn't support
            # this method yet.
            maaslog.error(
                "Unable to inform region of DHCP server: the region "
                "does not yet support the ReportForeignDHCPServer RPC "
                "method.")

        d = client(
            ReportForeignDHCPServer, system_id=client.localIdent,
            interface_name=name, dhcp_ip=dhcp_ip)
        d.addErrback(eb_unhandled)
        return d

    @inlineCallbacks
    def _tryGetClient(self):
        client = None
        for elapsed, remaining, wait in retries(15, 5, self.clock):
            try:
                client = yield self.client_service.getClientNow()
                break
            except NoConnectionsAvailable:
                yield pause(wait, self.clock)
        return client

    @inlineCallbacks
    def probe_dhcp(self):
        """Find all the interfaces on this rack controller and probe for
        DHCP servers.
        """
        client = yield self._tryGetClient()
        if client is None:
            maaslog.error(
                "Can't initiate DHCP probe; no RPC connection to region.")
            return

        # Iterate over interfaces and probe each one.
        interfaces = yield self._get_interfaces()
        self.log(
            "Probe for external DHCP servers started on interfaces: %s." % (
                ', '.join(interfaces)))
        for interface in interfaces:
            try:
                servers = yield maybeDeferred(probe_interface, interface)
            except socket.error as e:
                error = (
                    "Failed to probe for external DHCP servers on interface "
                    "'%s'." % interface)
                if is_dev_environment():
                    error += " (Did you configure authbind per HACKING.rst?)"
                self.err(e, error)
                continue
            else:
                if len(servers) > 0:
                    # XXX For now, only send the region one server, since
                    # it can only track one per VLAN (this could be considered
                    # a bug).
                    yield self._inform_region_of_dhcp(
                        client, interface, servers.pop())
                else:
                    yield self._inform_region_of_dhcp(
                        client, interface, None)
        self.log("External DHCP probe complete.")

    @inlineCallbacks
    def try_probe_dhcp(self):
        maaslog.debug("Running periodic DHCP probe.")
        try:
            yield self.probe_dhcp()
        except Exception as error:
            maaslog.error(
                "Unable to probe for DHCP servers: %s",
                str(error))
            self.err(error, "Unable to probe for DHCP servers.")
        else:
            maaslog.debug("Finished periodic DHCP probe.")