/usr/lib/python3/dist-packages/provisioningserver/utils/arp.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 | # Copyright 2016 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Utilities for working with ARP packets."""
__all__ = [
"ARP",
"add_arguments",
"run"
]
from collections import namedtuple
from datetime import datetime
import json
import os
import stat
import struct
import subprocess
import sys
from textwrap import dedent
from netaddr import (
EUI,
IPAddress,
)
from provisioningserver.path import get_path
from provisioningserver.utils import sudo
from provisioningserver.utils.ethernet import (
Ethernet,
ETHERTYPE,
)
from provisioningserver.utils.network import (
bytes_to_int,
format_eui,
)
from provisioningserver.utils.pcap import (
PCAP,
PCAPError,
)
from provisioningserver.utils.script import ActionScriptError
# The SEEN_AGAIN_THRESHOLD is a time (in seconds) that determines how often
# to report (IP, MAC) bindings that have been seen again (or "REFRESHED").
# While it is important for MAAS to know about "NEW" and "MOVED" bindings
# immediately, "REFRESHED" bindings occur too often to be useful, and
# are thus throttled by this value.
SEEN_AGAIN_THRESHOLD = 600
# Definitions for ARP packet used with `struct`.
ARP_PACKET = '!hhBBh6sL6sL'
ARPPacket = namedtuple('ARPPacket', (
'hardware_type',
'protocol',
'hardware_length',
'protocol_length',
'operation',
'sender_mac',
'sender_ip',
'target_mac',
'target_ip',
))
SIZEOF_ARP_PACKET = 28
class ARP_OPERATION:
"""Enumeration to represent ARP operation types."""
REQUEST = 1
REPLY = 2
def __init__(self, operation):
super().__init__()
self.operation = operation
def __bytes__(self):
"""Returns the ARP operation in byte format.
The returned value will be padded to two bytes; suitable for placement
in an ARP packet.
"""
return bytes.fromhex("%04x" % self.operation)
def __str__(self):
if self.operation == 1:
extra = " (request)"
elif self.operation == 2:
extra = " (reply)"
else:
extra = ""
return "%d%s" % (self.operation, extra)
def __radd__(self, other):
"""Allows concatenating an ARP_OPERATION with `bytes`."""
if isinstance(other, bytes):
return other + bytes(self)
else:
raise NotImplementedError(
'ARP_OPERATION may only be added to `bytes`.')
class ARP:
"""Representation of an ARP packet."""
def __init__(
self, pkt_bytes, time=None, src_mac=None, dst_mac=None, vid=None):
"""
:param pkt_bytes: The input bytes of the ARP packet.
:type pkt_bytes: bytes
:param time: Timestamp packet was seen (seconds since epoch)
:type time: str
:param src_mac: Source MAC address from Ethernet header.
:type src_mac: bytes
:param dst_mac: Destination MAC address from Ethernet header.
:type dst_mac: bytes
:param vid: 802.1q VLAN ID (VID), or None if untagged.
:type vid: int
:return:
"""
packet = ARPPacket._make(
struct.unpack(ARP_PACKET, pkt_bytes[0:SIZEOF_ARP_PACKET]))
self.packet = packet
self.time = time
if src_mac is not None:
self.src_mac = EUI(bytes_to_int(src_mac))
else:
self.src_mac = None
if dst_mac is not None:
self.dst_mac = EUI(bytes_to_int(dst_mac))
else:
self.dst_mac = None
self.vid = vid
self.hardware_type = packet.hardware_type
self.protocol_type = packet.protocol
self.hardware_length = packet.hardware_length
self.protocol_length = packet.protocol_length
self.operation = packet.operation
self.sender_hardware_bytes = packet.sender_mac
self.sender_protocol_bytes = packet.sender_ip
self.target_hardware_bytes = packet.target_mac
self.target_protocol_bytes = packet.target_ip
@property
def source_eui(self):
"""Returns a netaddr.EUI representing the source MAC address."""
return EUI(bytes_to_int(self.sender_hardware_bytes))
@property
def target_eui(self):
"""Returns a netaddr.EUI representing the target MAC address."""
return EUI(bytes_to_int(self.target_hardware_bytes))
@property
def source_ip(self):
"""Returns a netaddr.IPAddress representing the source IP address."""
return IPAddress(self.sender_protocol_bytes)
@property
def target_ip(self):
"""Returns a netaddr.IPAddress representing the target IP address."""
return IPAddress(self.target_protocol_bytes)
def is_valid(self):
"""Only (Ethernet MAC, IPv4) bindings are currently supported. This
method ensures this ARP packet specifies those types.
"""
# http://www.iana.org/assignments/arp-parameters/arp-parameters.xhtml
# Hardware type 1 == Ethernet
if self.hardware_type != 1:
return False
# Protocol type 0x800 == IPv4 (this should match the Ethertype)
if self.protocol_type != 0x800:
return False
if self.hardware_length != 6:
return False
if self.protocol_length != 4:
return False
return True
def bindings(self):
"""Yields each (MAC, IP) binding found in this ARP packet."""
if not self.is_valid():
return
if self.operation == 1:
# This is an ARP request.
# We can find a binding in the (source_eui, source_ip)
source_ip = self.source_ip
source_eui = self.source_eui
if int(source_ip) != 0 and int(source_eui) != 0:
yield (source_ip, self.source_eui)
elif self.operation == 2:
# This is an ARP reply.
# We can find a binding in both the (source_eui, source_ip) and
# the (target_eui, target_ip).
source_ip = self.source_ip
source_eui = self.source_eui
target_ip = self.target_ip
target_eui = self.target_eui
if int(source_ip) != 0 and int(source_eui) != 0:
yield (source_ip, self.source_eui)
if int(target_ip) != 0 and int(target_eui) != 0:
yield (target_ip, self.target_eui)
def write(self, out=sys.stdout):
"""Output text-based details about this ARP packet to the specified
file or stream.
:param out: An object with a `write(str)` method.
"""
if self.time is not None:
out.write("ARP observed at %s:\n" % (
datetime.fromtimestamp(self.time)))
if self.vid is not None:
out.write(" 802.1q VLAN ID (VID): %s (0x%03x)\n" % (
self.vid, self.vid))
if self.src_mac is not None:
out.write(" Ethernet source: %s\n" % format_eui(
self.src_mac))
if self.dst_mac is not None:
out.write(" Ethernet destination: %s\n" % format_eui(
self.dst_mac))
out.write(" Hardware type: 0x%04x\n" % self.hardware_type)
out.write(" Protocol type: 0x%04x\n" % self.protocol_type)
out.write("Hardware address length: %d\n" % self.hardware_length)
out.write("Protocol address length: %d\n" % self.protocol_length)
out.write(" Operation: %s\n" % (
ARP_OPERATION(self.operation)))
out.write("Sender hardware address: %s\n" % (
format_eui(self.source_eui)))
out.write("Sender protocol address: %s\n" % self.source_ip)
out.write("Target hardware address: %s\n" % (
format_eui(self.target_eui)))
out.write("Target protocol address: %s\n" % self.target_ip)
out.write("\n")
def update_bindings_and_get_event(bindings, vid, ip, mac, time):
"""Update the specified bindings dictionary and returns a dictionary if the
information resulted in an update to the bindings. (otherwise, returns
None.)
If an event is returned, it will be a dictionary with the following fields:
ip - The IP address of the binding.
mac - The MAC address the IP was bound to.
previous_mac - (if the IP moved between MACs) The previous MAC that
was using the IP address.
time - The time (in seconds since the epoch) the binding was observed.
event - An event type; either "NEW", "MOVED", or "REFRESHED".
"""
if (vid, ip) in bindings:
binding = bindings[(vid, ip)]
if binding['mac'] != mac:
# Another MAC claimed ownership of this IP address. Update the
# MAC and emit a "MOVED" event.
previous_mac = binding['mac']
binding['mac'] = mac
binding['time'] = time
return (dict(
ip=str(ip), mac=format_eui(mac), time=time, event="MOVED",
previous_mac=format_eui(previous_mac), vid=vid))
elif time - binding['time'] >= SEEN_AGAIN_THRESHOLD:
binding['time'] = time
return dict(
ip=str(ip), mac=format_eui(mac), time=time,
event="REFRESHED", vid=vid)
else:
# The IP was found in the bindings dict, but within the
# SEEN_AGAIN_THRESHOLD. Don't update the record; the time field
# records the last time we emitted an event for this IP address.
return None
else:
# We haven't seen this IP before, so add a binding for it and
# emit a "NEW" event.
bindings[(vid, ip)] = {'mac': mac, 'time': time}
return dict(
ip=str(ip), mac=format_eui(mac), time=time,
event="NEW", vid=vid)
def update_and_print_bindings(bindings, arp, out=sys.stdout):
"""Update the specified bindings dictionary with the given ARP packet.
Output a JSON object on the specified stream (defaults to stdout) based on
the results of updating the binding.
"""
for ip, mac in arp.bindings():
event = update_bindings_and_get_event(
bindings, arp.vid, ip, mac, arp.time)
if event is not None:
out.write("%s\n" % json.dumps(event))
out.flush()
def observe_arp_packets(
verbose=False, bindings=False, input=sys.stdin.buffer,
output=sys.stdout):
"""Read stdin and look for tcpdump binary ARP output.
:param verbose: Output text-based ARP packet details.
:type verbose: bool
:param bindings: Track (MAC, IP) bindings, and print new/update bindings.
:type bindings: bool
:param input: Stream to read PCAP data from.
:type input: a file or stream supporting `read(int)`
:param output: Stream to write JSON data to.
:type input: a file or stream supporting `write(str)` and `flush()`.
"""
if bindings:
bindings = dict()
else:
bindings = None
try:
pcap = PCAP(input)
if pcap.global_header.data_link_type != 1:
# Not an Ethernet interface. Need to exit here, because our
# assumptions about the link layer header won't be correct.
return 4
for header, packet in pcap:
ethernet = Ethernet(packet, time=header.timestamp_seconds)
if not ethernet.is_valid():
# Ignore packets with a truncated Ethernet header.
continue
if len(ethernet.payload) < SIZEOF_ARP_PACKET:
# Ignore truncated ARP packets.
continue
if ethernet.ethertype != ETHERTYPE.ARP:
# Ignore non-ARP packets.
continue
arp = ARP(
ethernet.payload, src_mac=ethernet.src_mac,
dst_mac=ethernet.dst_mac, vid=ethernet.vid,
time=ethernet.time)
if bindings is not None:
update_and_print_bindings(bindings, arp, output)
if verbose:
arp.write()
except EOFError:
# Capture aborted before it could even begin. Note that this does not
# occur if the end-of-stream occurs normally. (In that case, the
# program will just exit.)
return 3
except PCAPError:
# Capture aborted due to an I/O error.
return 2
return None
def add_arguments(parser):
"""Add this command's options to the `ArgumentParser`.
Specified by the `ActionScript` interface.
"""
parser.description = dedent("""\
Observes the traffic on the specified interface, looking for ARP
traffic. Outputs JSON objects (one per line) for each NEW, REFRESHED,
or MOVED binding.
Reports on REFRESHED bindings at most once every ten minutes.
""")
parser.add_argument(
'-v', '--verbose', action='store_true', required=False,
help='Print verbose packet information.')
parser.add_argument(
'interface', type=str, nargs='?',
help="Ethernet interface from which to capture traffic. Optional if "
"an input file is specified.")
parser.add_argument(
'-i', '--input-file', type=str, required=False,
help="File to read PCAP output from. Use - for stdin. Default is to "
"call `sudo /usr/lib/maas/network-monitor` to get input.")
def run(args, output=sys.stdout, stdin=sys.stdin,
stdin_buffer=sys.stdin.buffer):
"""Observe an Ethernet interface and print ARP bindings."""
# First, become a progress group leader, so that signals can be directed
# to this process and its children; see p.u.twisted.terminateProcess.
os.setpgrp()
network_monitor = None
if args.input_file is None:
if args.interface is None:
raise ActionScriptError("Required argument: interface")
cmd = [get_path("/usr/lib/maas/network-monitor"), args.interface]
cmd = sudo(cmd)
network_monitor = subprocess.Popen(
cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE)
infile = network_monitor.stdout
else:
if args.input_file == '-':
mode = os.fstat(stdin.fileno()).st_mode
if not stat.S_ISFIFO(mode):
raise ActionScriptError("Expected stdin to be a pipe.")
infile = stdin_buffer
else:
infile = open(args.input_file, "rb")
return_code = observe_arp_packets(
bindings=True, verbose=args.verbose, input=infile, output=output)
if return_code is not None:
raise SystemExit(return_code)
if network_monitor is not None:
return_code = network_monitor.poll()
if return_code is not None:
raise SystemExit(return_code)
|