This file is indexed.

/usr/lib/python3/dist-packages/provisioningserver/utils/pcap.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Copyright 2016 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Utilities for working with PCAP format streams."""

__all__ = [
    "PCAP",
    "PCAPError",
    "PCAPHeader",
    "PCAPPacketHeader"
]

from collections import namedtuple
from pprint import pprint
import struct
import sys

# See documentation here for explanation of magic numbers, formats, etc:
#     https://wiki.wireshark.org/Development/LibpcapFileFormat
PCAP_NATIVE_BYTE_ORDER_MAGIC_NUMBER = 0xa1b2c3d4
PCAP_HEADER_SIZE = 24
PCAP_PACKET_HEADER_SIZE = 16

PCAPHeader = namedtuple('PCAPHeader', (
    'magic_number',
    'pcap_version_major',
    'pcap_version_minor',
    'time_zone_gmt_offset',
    'timestamp_accuracy_sigfigs',
    'max_capture_bytes_per_packet',
    'data_link_type',
))

PCAPPacketHeader = namedtuple('PCAPPacketHeader', (
    'timestamp_seconds',
    'timestamp_microseconds',
    'bytes_captured',
    'original_packet_length',
))


class PCAPError(IOError):
    """Exception class for problems originating in the PCAP I/O stream."""
    pass


class PCAP:
    """Class to encapsulate reading from a stream of PCAP capture output.

    The capture output is assumed to have been obtained on the same machine
    as this class is running, so no byte-swapping is needed for compatibility
    with architectures of differing endianness.
    """

    def __init__(self, stream):
        """Initializes the PCAP stream reader with the specified stream.

        Stores the global PCAP header in self.global_header, as a namedtuple
        of: (magic_number, pcap_version_major, pcap_version_minor,
        time_zone_gmt_offset, timestamp_accuracy_sigfigs,
        max_capture_bytes_per_packet, data_link_type).
        :raise EOFError: If an empty stream was supplied.
        :raise PCAPError: If the PCAP stream was invalid.
       """
        super().__init__()
        self.stream = stream
        global_header_bytes = stream.read(PCAP_HEADER_SIZE)
        if len(global_header_bytes) == 0:
            raise EOFError("No PCAP output found.")
        if len(global_header_bytes) != PCAP_HEADER_SIZE:
            raise PCAPError("Unexpected end of PCAP stream: invalid header.")
        # typedef struct pcap_hdr_s {
        #     guint32 magic_number;   /* magic number */
        #     guint16 version_major;  /* major version number */
        #     guint16 version_minor;  /* minor version number */
        #     gint32  thiszone; /* GMT to local correction */
        #     guint32 sigfigs;  /* accuracy of timestamps */
        #     guint32 snaplen;  /* max length of captured packets, in octets */
        #     guint32 network;  /* data link type */
        # } pcap_hdr_t;
        self.global_header = PCAPHeader._make(
            struct.unpack('IHHiIII', global_header_bytes))
        if self.global_header[0] != PCAP_NATIVE_BYTE_ORDER_MAGIC_NUMBER:
            raise PCAPError("Stream is not in native PCAP format.")

    def read(self):
        """Reads a packet from the PCAP stream.

        :returns: a tuple of the format (pcap_packet_header, packet),
            where the pcap_packet_header is a namedtuple in the format:
            (timestamp_seconds, timestamp_microseconds, bytes_captured,
            original_packet_length), and the packet is of type `bytes` and
            matches the bytes_captured field in the tuple.
        :raise EOFError: If this is an attempt to read beyond the last packet.
        :raise PCAPError: If the PCAP stream was invalid.
        """
        pcap_packet_header_bytes = self.stream.read(PCAP_PACKET_HEADER_SIZE)
        if len(pcap_packet_header_bytes) == 0:
            raise EOFError("End of PCAP stream.")
        if len(pcap_packet_header_bytes) != PCAP_PACKET_HEADER_SIZE:
            raise PCAPError(
                "Unexpected end of PCAP stream: invalid packet header.")
        # typedef struct pcaprec_hdr_s {
        #     guint32 ts_sec;   /* timestamp seconds */
        #     guint32 ts_usec;  /* timestamp microseconds */
        #     guint32 incl_len; /* number of octets of packet saved in file */
        #     guint32 orig_len; /* actual length of packet */
        # } pcaprec_hdr_t;
        pcap_packet_header = PCAPPacketHeader._make(
            struct.unpack('IIII', pcap_packet_header_bytes))
        packet = self.stream.read(pcap_packet_header.bytes_captured)
        if len(packet) != pcap_packet_header.bytes_captured:
            raise PCAPError("Unexpected end of PCAP stream: invalid packet.")
        return pcap_packet_header, packet

    def __iter__(self):
        """Iterate this PCAP stream.

         Stops when EOF is encountered."""
        while True:
            try:
                yield self.read()
            except EOFError:
                break


def main():
    """Debug function for printing packets output by tcpdump on stdin.

    Example usage:
        cd src
        sudo tcpdump -i eth0 -U --immediate-mode -s 64 -n -w - arp \
        | python3 -m provisioningserver.utils.pcap
    """
    packet_stream = PCAP(sys.stdin.buffer)
    pprint(packet_stream.global_header)
    for packet in packet_stream:
        pprint(packet)


if __name__ == '__main__':
    main()