This file is indexed.

/usr/share/pyshared/provisioningserver/tftp.py is in python-maas-provisioningserver 1.2+bzr1373+dfsg-0ubuntu1~12.04.6.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# Copyright 2012 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Twisted Application Plugin for the MAAS TFTP server."""

from __future__ import (
    absolute_import,
    print_function,
    unicode_literals,
    )

__metaclass__ = type
__all__ = [
    "TFTPBackend",
    ]

import httplib
from io import BytesIO
from itertools import repeat
import json
import re
from urllib import urlencode
from urlparse import (
    parse_qsl,
    urlparse,
    )

from provisioningserver.cluster_config import get_cluster_uuid
from provisioningserver.enum import ARP_HTYPE
from provisioningserver.kernel_opts import KernelParameters
from provisioningserver.pxe.config import render_pxe_config
from provisioningserver.utils import deferred
from tftp.backend import (
    FilesystemSynchronousBackend,
    IReader,
    )
from tftp.errors import FileNotFound
from twisted.python.context import get
from twisted.web.client import getPage
import twisted.web.error
from zope.interface import implementer


@implementer(IReader)
class BytesReader:
    """An `IReader` implementation backed by an in-memory byte string."""

    def __init__(self, data):
        """Wrap `data` in a readable buffer.

        :param data: The complete payload to serve.  It is handed to
            `BytesIO`, and its length is recorded as `size`.
        """
        super(BytesReader, self).__init__()
        stream = BytesIO(data)
        self.buffer = stream
        length = len(data)
        self.size = length

    def read(self, size):
        """Read from the underlying buffer.

        :param size: Passed straight through to the buffer's `read`.
        :return: Whatever the buffer's `read` returns for `size`.
        """
        chunk = self.buffer.read(size)
        return chunk

    def finish(self):
        """Close the underlying buffer."""
        stream = self.buffer
        stream.close()


class TFTPBackend(FilesystemSynchronousBackend):
    """A partially dynamic read-only TFTP server.

    Static files such as kernels and initrds, as well as any non-MAAS files
    that the system may already be set up to serve, are served up normally.
    But PXE configurations are generated on the fly.

    When a PXE configuration file is requested, the server asynchronously
    requests the appropriate parameters from the API (at a configurable
    "generator URL") and generates a config file based on those.

    The regular expressions `re_config_file` and `re_mac_address` specify
    which files the server generates on the fly.  Any other requests are
    passed on to the filesystem.

    Passing requests on to the API must be done very selectively, because
    failures cause the boot process to halt. This is why the expression for
    matching the MAC address is so narrowly defined: PXELINUX attempts to
    fetch files at many similar paths which must not be passed on.
    """

    # Wrapped in `staticmethod` so that looking these up via `self` yields
    # the plain functions rather than bound methods.
    # NOTE(review): presumably this also lets tests substitute them on the
    # class or an instance -- confirm against the test suite.
    get_page = staticmethod(getPage)
    render_pxe_config = staticmethod(render_pxe_config)

    # PXELINUX represents a MAC address in IEEE 802 hyphen-separated
    # format.  See http://www.syslinux.org/wiki/index.php/PXELINUX.
    re_mac_address_octet = r'[0-9a-f]{2}'
    re_mac_address = re.compile(
        "-".join(repeat(re_mac_address_octet, 6)))

    # We assume that the ARP HTYPE (hardware type) that PXELINUX sends is
    # always Ethernet.
    #
    # The named groups (`mac`, `arch`, `subarch`) become request parameters
    # in `get_reader`; groups that did not participate in the match are
    # dropped there.
    re_config_file = re.compile(
        r'''
        # Optional leading slash(es).
        ^/*
        pxelinux[.]cfg    # PXELINUX expects this.
        /
        (?: # either a MAC
            {htype:02x}    # ARP HTYPE.
            -
            (?P<mac>{re_mac_address.pattern})    # Capture MAC.
        | # or "default"
            default
              (?: # perhaps with specified arch, with a separator of either '-'
                # or '.', since the spec was changed and both are unambiguous
                [.-](?P<arch>\w+) # arch
                (?:-(?P<subarch>\w+))? # optional subarch
              )?
        )
        $
        '''.format(
            htype=ARP_HTYPE.ETHERNET,
            re_mac_address=re_mac_address),
        re.VERBOSE)

    def __init__(self, base_path, generator_url):
        """
        :param base_path: The root directory for this TFTP server.
        :param generator_url: The URL which can be queried for the PXE
            config. See `get_generator_url` for the types of queries it is
            expected to accept.
        """
        # This backend only ever serves files; writes are refused.
        super(TFTPBackend, self).__init__(
            base_path, can_read=True, can_write=False)
        # Keep the URL in parsed form so that its query string can be
        # extended for each request in `get_generator_url`.
        self.generator_url = urlparse(generator_url)

    def get_generator_url(self, params):
        """Calculate the URL, including query, from which we can fetch
        additional configuration parameters.

        :param params: A dict, or iterable suitable for updating a dict, of
            additional query parameters.
        :return: The generator URL with the merged query string, encoded
            as ASCII.
        """
        query = {}
        # Merge parameters from the generator URL.
        query.update(parse_qsl(self.generator_url.query))
        # Merge parameters obtained from the request.  These are applied
        # last, so they win over same-named parameters in the generator URL.
        query.update(params)
        # Merge updated query into the generator URL.
        url = self.generator_url._replace(query=urlencode(query))
        # TODO: do something more intelligent with unicode URLs here; see
        # apiclient.utils.ascii_url() for inspiration.
        return url.geturl().encode("ascii")

    @deferred
    def get_kernel_params(self, params):
        """Return kernel parameters obtained from the API.

        The generator URL is fetched with `get_page`, the response body is
        decoded as JSON, and the resulting mapping is passed as keyword
        arguments to `KernelParameters`.

        :param params: Parameters so far obtained, typically from the file
            path requested.
        :return: A `KernelParameters` instance.
        """
        url = self.get_generator_url(params)

        def reassemble(data):
            # `data` is the decoded JSON document; its keys become the
            # keyword arguments of `KernelParameters`.
            return KernelParameters(**data)

        d = self.get_page(url)
        d.addCallback(json.loads)
        d.addCallback(reassemble)
        return d

    @deferred
    def get_config_reader(self, params):
        """Return an `IReader` for a PXE config.

        The kernel parameters are fetched via `get_kernel_params`, rendered
        with `render_pxe_config`, encoded as UTF-8 and wrapped in a
        `BytesReader`.

        :param params: Parameters so far obtained, typically from the file
            path requested.  They are used for the API query and are also
            passed as keyword arguments to `render_pxe_config`.
        """
        def generate_config(kernel_params):
            config = self.render_pxe_config(
                kernel_params=kernel_params, **params)
            return config.encode("utf-8")

        d = self.get_kernel_params(params)
        d.addCallback(generate_config)
        d.addCallback(BytesReader)
        return d

    @staticmethod
    def get_page_errback(failure, file_name):
        """Translate an HTTP "No Content" error into a TFTP `FileNotFound`.

        Only `twisted.web.error.Error` failures are handled here; any other
        failure is re-raised by `failure.trap`.

        :param failure: The failure passed to this errback.
        :param file_name: The name of the requested file, used to construct
            the `FileNotFound` error.
        :return: The original `failure`, when it does not represent an HTTP
            "No Content" response.
        :raises FileNotFound: When the HTTP status is "No Content".
        """
        failure.trap(twisted.web.error.Error)
        # This twisted.web.error.Error.status object ends up being a
        # string for some reason, but the constants we can compare against
        # (both in httplib and twisted.web.http) are ints.
        try:
            status_int = int(failure.value.status)
        except (TypeError, ValueError):
            # The status is either not numeric (ValueError) or not something
            # int() accepts at all, e.g. None (TypeError).  Assume that it's
            # some other error and propagate it, rather than letting the
            # conversion error replace the original failure.
            return failure

        if status_int == httplib.NO_CONTENT:
            # Convert HTTP No Content to a TFTP file not found
            raise FileNotFound(file_name)
        else:
            # Otherwise propagate the unknown error
            return failure

    @deferred
    def get_reader(self, file_name):
        """See `IBackend.get_reader()`.

        If `file_name` matches `re_config_file` then the response is obtained
        from a server. Otherwise the filesystem is used to service the
        response.

        :param file_name: The path requested by the TFTP client.
        :return: For non-config files, whatever the superclass's
            `get_reader` returns; for config files, the result of
            `get_config_reader` with `get_page_errback` attached.
        """
        config_file_match = self.re_config_file.match(file_name)
        if config_file_match is None:
            return super(TFTPBackend, self).get_reader(file_name)
        else:
            # Do not include any element that has not matched (ie. is None)
            params = {
                key: value
                for key, value in config_file_match.groupdict().items()
                if value is not None
                }
            # Send the local and remote endpoint addresses.  These are read
            # from the Twisted call context; when the context has no such
            # entry the host is sent as None.  Only the host part is used,
            # the port is discarded.
            local_host, local_port = get("local", (None, None))
            params["local"] = local_host
            remote_host, remote_port = get("remote", (None, None))
            params["remote"] = remote_host
            params["cluster_uuid"] = get_cluster_uuid()
            d = self.get_config_reader(params)
            d.addErrback(self.get_page_errback, file_name)
            return d