This file is indexed.

/usr/lib/python2.7/dist-packages/maasserver/models/network.py is in python-django-maas 1.5.4+bzr2294-0ubuntu1.2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
# Copyright 2014 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Model for networks."""

from __future__ import (
    absolute_import,
    print_function,
    unicode_literals,
    )

str = None

__metaclass__ = type
__all__ = [
    'Network',
    'parse_network_spec',
    ]


from abc import (
    ABCMeta,
    abstractmethod,
    )

from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db.models import (
    CharField,
    GenericIPAddressField,
    Manager,
    Model,
    PositiveSmallIntegerField,
    TextField,
    )
from maasserver import DefaultMeta
from maasserver.models.cleansave import CleanSave
from maasserver.utils.network import make_network
from netaddr import IPAddress
from netaddr.core import AddrFormatError

# Network name validator.  Must consist of alphanumerical characters and/or
# dashes.
NETWORK_NAME_VALIDATOR = RegexValidator('^[\w-]+$')


def strip_type_tag(type_tag, specifier):
    """Return a network specifier minus its type tag."""
    prefix = type_tag + ':'
    assert specifier.startswith(prefix)
    return specifier[len(prefix):]


class NetworkSpecifier:
    """A :class:`NetworkSpecifier` identifies a :class:`Network`.

    For example, in placement constraints, a user may specify that a node
    must be attached to a certain network.  They identify the network through
    a network specifier, which may be its name (`dmz`), an IP address
    (`ip:10.12.0.0`), or a VLAN tag (`vlan:15` or `vlan:0xf`).

    Each type of network specifier has its own `NetworkSpecifier`
    implementation class.  The class constructor validates and parses a
    network specifier of its type, and the object knows how to retrieve
    whatever network it identifies from the database.
    """
    __metaclass__ = ABCMeta

    # Most network specifiers start with a type tag followed by a colon, e.g.
    # "ip:10.1.0.0".
    type_tag = None

    @abstractmethod
    def find_network(self):
        """Load the identified :class:`Network` from the database.

        :raise Network.DoesNotExist: If no network matched the specifier.
        :return: The :class:`Network`.
        """


class NameSpecifier(NetworkSpecifier):
    """Identify a network by its name.

    This type of network specifier has no type tag; it's just the name.  A
    network name cannot contain colon (:) characters.
    """

    def __init__(self, spec):
        NETWORK_NAME_VALIDATOR(spec)
        self.name = spec

    def find_network(self):
        return Network.objects.get(name=self.name)


class IPSpecifier(NetworkSpecifier):
    """Identify a network by any IP address it contains.

    The IP address is prefixed with a type tag `ip:`, e.g. `ip:10.1.1.0`.
    It can name any IP address within the network, including its base address,
    its broadcast address, or any host address that falls in its IP range.
    """
    type_tag = 'ip'

    def __init__(self, spec):
        ip_string = strip_type_tag(self.type_tag, spec)
        try:
            self.ip = IPAddress(ip_string)
        except AddrFormatError as e:
            raise ValidationError("Invalid IP address: %s." % e)

    def find_network(self):
        # Use all().  We could narrow down the database query, but not by a
        # lot -- and this will cache better.  The number of networks should
        # not be so large that querying them from the database becomes a
        # problem for the region controller.
        for network in Network.objects.all():
            if self.ip in network.get_network():
                # Networks don't overlap, so there can be only one.
                return network
        raise Network.DoesNotExist()


class VLANSpecifier(NetworkSpecifier):
    """Identify a network by its (nonzero) VLAN tag.

    This only applies to VLANs.  The VLAN tag is a numeric value prefixed with
    a type tag of `vlan:`, e.g. `vlan:12`.  Tags may also be given in
    hexadecimal form: `vlan:0x1a`.  This is case-insensitive.
    """
    type_tag = 'vlan'

    def __init__(self, spec):
        vlan_string = strip_type_tag(self.type_tag, spec)
        if vlan_string.lower().startswith('0x'):
            # Hexadecimal.
            base = 16
        else:
            # Decimal.
            base = 10
        try:
            self.vlan_tag = int(vlan_string, base)
        except ValueError:
            raise ValidationError("Invalid VLAN tag: '%s'." % vlan_string)
        if self.vlan_tag <= 0 or self.vlan_tag >= 0xfff:
            raise ValidationError("VLAN tag out of range (1-4094).")

    def find_network(self):
        return Network.objects.get(vlan_tag=self.vlan_tag)


SPECIFIER_CLASSES = [NameSpecifier, IPSpecifier, VLANSpecifier]

SPECIFIER_TAGS = {
    spec_class.type_tag: spec_class
    for spec_class in SPECIFIER_CLASSES
}


def get_specifier_type(specifier):
    """Obtain the specifier class that knows how to parse `specifier`.

    :raise ValidationError: If `specifier` does not match any accepted type of
        network specifier.
    :return: A concrete `NetworkSpecifier` subclass that knows how to parse
        `specifier`.
    """
    if ':' in specifier:
        type_tag, _ = specifier.split(':', 1)
    else:
        type_tag = None
    specifier_class = SPECIFIER_TAGS.get(type_tag)
    if specifier_class is None:
        raise ValidationError(
            "Invalid network specifier type: '%s'." % type_tag)
    return specifier_class


def parse_network_spec(spec):
    """Parse a network specifier; return it as a `NetworkSpecifier` object.

    :raise ValidationError: If `spec` is malformed.
    """
    specifier_class = get_specifier_type(spec)
    return specifier_class(spec)


class NetworkManager(Manager):
    """Manager for :class:`Network` model class.

    Don't import or instantiate this directly; access as `<Class>.objects` on
    the model class it manages.
    """

    def get_from_spec(self, spec):
        """Find a single `Network` from a given network specifier.

        :raise ValidationError: If `spec` is malformed.
        :raise Network.DoesNotExist: If the network specifier does not match
            any known network.
        :return: The one `Network` matching `spec`.
        """
        specifier = parse_network_spec(spec)
        try:
            return specifier.find_network()
        except Network.DoesNotExist:
            raise Network.DoesNotExist("No network matching '%s'." % spec)


class Network(CleanSave, Model):

    class Meta(DefaultMeta):
        """Needed for South to recognize this model."""

    objects = NetworkManager()

    name = CharField(
        unique=True, blank=False, editable=True, max_length=255,
        validators=[NETWORK_NAME_VALIDATOR],
        help_text="Identifying name for this network.")

    ip = GenericIPAddressField(
        blank=False, editable=True, unique=True, null=False,
        help_text="Network address (e.g. 192.168.1.0).")

    netmask = GenericIPAddressField(
        blank=False, editable=True, null=False,
        help_text="Network mask (e.g. 255.255.255.0).")

    vlan_tag = PositiveSmallIntegerField(
        editable=True, null=True, blank=True, unique=True,
        help_text="A 12-bit field specifying the VLAN to which the frame "
                  "belongs. The hexadecimal values of 0x000 and 0xFFF "
                  "are reserved. All other values may be used as VLAN "
                  "identifiers, allowing up to 4,094 VLANs. The reserved "
                  "value 0x000 indicates that the frame does not belong "
                  "to any VLAN; in this case, the 802.1Q tag specifies "
                  "only a priority and is referred to as a priority tag. "
                  "On bridges, VLAN 1 (the default VLAN ID) is often "
                  "reserved for a management VLAN; this is vendor-"
                  "specific.")

    description = TextField(
        blank=True, editable=True,
        help_text="Any short description to help users identify the network")

    def get_network(self):
        """Return self as :class:`IPNetwork`.

        :raise AddrFormatError: If the combination of `self.ip` and
            `self.netmask` is a malformed network address.
        """
        return make_network(self.ip, self.netmask)

    def get_connected_nodes(self):
        """Return the `QuerySet` of the nodes connected to this network.

        :rtype: `django.db.models.query.QuerySet`
        """
        # Circular imports.
        from maasserver.models import Node
        return Node.objects.filter(
            macaddress__in=self.macaddress_set.all()).distinct()

    def __unicode__(self):
        net = unicode(self.get_network().cidr)
        # A vlan_tag of zero normalises to None.  But __unicode__ may be
        # called while we're not in a clean state, so handle zero as well.
        no_tag = [0, None]
        if self.vlan_tag in no_tag:
            tag = ''
        else:
            tag = '(tag:%x)' % self.vlan_tag
        return '%s:%s%s' % (self.name, net, tag)

    def clean_vlan_tag(self):
        """Validator for `vlan_tag`."""
        if self.vlan_tag is None:
            # Always OK.
            return
        if self.vlan_tag == 0xFFF:
            raise ValidationError(
                {'vlan_tag': ["Cannot use reserved value 0xFFF."]})
        if self.vlan_tag < 0 or self.vlan_tag > 0xFFF:
            raise ValidationError(
                {'vlan_tag':
                    ["Value must be between 0x000 and 0xFFF (12 bits)"]})

    def clean_netmask(self):
        """Validator for `vlan_tag`."""
        # To see whether the netmask is well-formed, combine it with an
        # arbitrary valid IP address and see if IPNetwork's constructor
        # complains.
        try:
            make_network('10.1.1.1', self.netmask)
        except AddrFormatError as e:
            raise ValidationError({'netmask': [e.message]})

        if self.netmask == '0.0.0.0':
            raise ValidationError(
                {'netmask': ["This netmask would span the entire Internet."]})
        if self.netmask == '255.255.255.255':
            raise ValidationError(
                {'netmask': ["This netmask leaves no room for IP addresses."]})

    def clean_fields(self, *args, **kwargs):
        super(Network, self).clean_fields(*args, **kwargs)
        self.clean_vlan_tag()
        self.clean_netmask()

    def clean(self):
        super(Network, self).clean()
        try:
            net = self.get_network()
        except AddrFormatError as e:
            # This probably means that the netmask was invalid, in which case
            # it will have its own error, but in case it isn't, we can't let
            # this slide.
            raise ValidationError("Invalid network address: %s" % e)
        # Normalise self.ip.  This strips off any host bits from the address.
        self.ip = unicode(net.cidr.ip)
        # Normalise self.vlan_tag.  A zero value ("not a VLAN") becomes None.
        if self.vlan_tag == 0:
            self.vlan_tag = None

    def validate_unique(self, exclude=None):
        super(Network, self).validate_unique(exclude=exclude)
        if exclude is None:
            exclude = []

        if 'ip' not in exclude and 'netmask' not in exclude:
            # The ip and netmask have passed validation.  Now see if they don't
            # clash with any other networks.
            my_net = self.get_network()
            for other in Network.objects.all().exclude(id=self.id):
                other_net = other.get_network()
                if my_net in other_net or other_net in my_net:
                    # This has to get an error dict, not a simple error string,
                    # or Django throws a fit (bug 1299114).
                    message = (
                        "IP range clashes with network '%s'." % other.name)
                    raise ValidationError({
                        'ip': [message],
                        'netmask': [message],
                        })