This file is indexed.

/usr/lib/python2.7/dist-packages/neutron_lib/api/validators/__init__.py is in python-neutron-lib 1.13.0-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import collections
import functools
import inspect
import re

import netaddr
from oslo_log import log as logging
from oslo_utils import netutils
from oslo_utils import strutils
from oslo_utils import uuidutils
import six

from neutron_lib._i18n import _
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import directory


LOG = logging.getLogger(__name__)

# Used by range check to indicate no limit for a bound.
UNLIMITED = None

# Note: In order to ensure that the MAC address is unicast the first byte
# must be even.
MAC_PATTERN = "^%s[aceACE02468](:%s{2}){5}$" % (constants.HEX_ELEM,
                                                constants.HEX_ELEM)


def _verify_dict_keys(expected_keys, target_dict, strict=True):
    """Verify expected keys in a dictionary.

    :param expected_keys: A list of keys expected to be present.
    :param target_dict: The dictionary which should be verified.
    :param strict: Specifies whether additional keys are allowed to be present.
    :returns: None if the expected keys are found. Otherwise a human readable
        message indicating why the validation failed.
    """
    if not isinstance(target_dict, dict):
        msg = (_("Invalid input. '%(target_dict)s' must be a dictionary "
                 "with keys: %(expected_keys)s") %
               {'target_dict': target_dict, 'expected_keys': expected_keys})
        LOG.debug(msg)
        return msg

    expected_keys = set(expected_keys)
    provided_keys = set(target_dict.keys())

    predicate = expected_keys.__eq__ if strict else expected_keys.issubset

    if not predicate(provided_keys):
        msg = (_("Validation of dictionary's keys failed. "
                 "Expected keys: %(expected_keys)s "
                 "Provided keys: %(provided_keys)s") %
               {'expected_keys': expected_keys,
                'provided_keys': provided_keys})
        LOG.debug(msg)
        return msg


def _collect_duplicates(data_list):
    """Collects duplicate items from a list and returns them.

    :param data_list: A list of items to check for duplicates. The list may
        include dict items.
    :returns: A set of items that are duplicates in data_list. If no
        duplicates are found, the returned set is empty.
    """
    seen = []
    dups = set()
    for datum in data_list:
        if datum in seen:
            dups.add(datum)
            continue
        seen.append(datum)
    return dups


def is_attr_set(attribute):
    """Determine if an attribute value is set.

    :param attribute: The attribute value to check.
    :returns: False if the attribute value is None or ATTR_NOT_SPECIFIED,
        otherwise True.
    """
    return not (attribute is None or
                attribute is constants.ATTR_NOT_SPECIFIED)


def _validate_list_of_items(item_validator, data, *args, **kwargs):
    if not isinstance(data, list):
        msg = _("'%s' is not a list") % data
        return msg

    dups = _collect_duplicates(data)

    if dups:
        msg = _("Duplicate items in the list: '%s'") % ', '.join(
            [str(d) for d in dups])
        return msg

    for item in data:
        msg = item_validator(item, *args, **kwargs)
        if msg:
            return msg


def validate_values(data, valid_values=None, valid_values_display=None):
    """Validate that the provided 'data' is within 'valid_values'.

    :param data: The data to check within valid_values.
    :param valid_values: A collection of values that 'data' must be in to be
        valid. The collection can be any type that supports the 'in' operation.
    :param valid_values_display: A string to display that describes the valid
        values. This string is only displayed when an invalid value is
        encountered.
        If no string is provided, the string "valid_values" will be used.
    :returns: The message to return if data not in valid_values.
    :raises: TypeError if the values for 'data' or 'valid_values' are not
        compatible for comparison or doesn't have __contains__.
        If TypeError is raised this is considered a programming error and the
        inputs (data) and (valid_values) must be checked so this is never
        raised on validation.
    """

    # If valid_values is not specified we don't check against it.
    if valid_values is None:
        return

    # Check if we can use 'in' to find membership of data in valid_values
    contains = getattr(valid_values, "__contains__", None)
    if callable(contains):
        try:
            if data not in valid_values:
                valid_values_display = valid_values_display or 'valid_values'
                msg = (_("%(data)s is not in %(valid_values)s") %
                       {'data': data, 'valid_values': valid_values_display})
                LOG.debug(msg)
                return msg
        except TypeError:
                # This is a programming error
                msg = (_("'data' of type '%(typedata)s' and 'valid_values'"
                         "of type '%(typevalues)s' are not "
                         "compatible for comparison") %
                       {'typedata': type(data),
                        'typevalues': type(valid_values)})
                raise TypeError(msg)
    else:
        # This is a programming error
        msg = (_("'valid_values' does not support membership operations"))
        raise TypeError(msg)


def validate_not_empty_string_or_none(data, max_len=None):
    """Validate data is a non-empty string or None.

    :param data: The data to validate.
    :param max_len: An optional cap on the str length to validate.
    :returns: None if the data string is not None and is not an empty string,
        otherwise a human readable message as to why the string data is
        invalid.
    """
    if data is not None:
        return validate_not_empty_string(data, max_len=max_len)


def validate_not_empty_string(data, max_len=None):
    """Validate data is a non-empty/non-blank string.

    :param data: The data to validate.
    :param max_len: An optional cap on the length of the string data.
    :returns: None if the data is non-empty/non-blank, otherwise a human
        readable string message indicating why validation failed.
    """
    msg = validate_string(data, max_len=max_len)
    if msg:
        return msg
    if not data.strip():
        msg = _("'%s' Blank strings are not permitted") % data
        LOG.debug(msg)
        return msg


def validate_string_or_none(data, max_len=None):
    """Validate data is a string or None.

    :param data: The data to validate.
    :param max_len: An optional cap on the length of the string data.
    :returns: None if the data is None or a valid string, otherwise a human
        readable message indicating why validation failed.
    """
    if data is not None:
        return validate_string(data, max_len=max_len)


def validate_string(data, max_len=None):
    """Validate data is a string object optionally capping it length.

    :param data: The data to validate.
    :param max_len: An optional cap on the length of the string.
    :returns: None if the data is a valid string type and (optionally) within
        the given max_len. Otherwise a human readable message indicating why
        the data is invalid.
    """
    if not isinstance(data, six.string_types):
        msg = _("'%s' is not a valid string") % data
        LOG.debug(msg)
        return msg

    if max_len is not None and len(data) > max_len:
        msg = (_("'%(data)s' exceeds maximum length of %(max_len)s") %
               {'data': data, 'max_len': max_len})
        LOG.debug(msg)
        return msg


_validate_list_of_unique_strings = functools.partial(_validate_list_of_items,
                                                     validate_string)


# NOTE(boden): stubbed out for docstring comments.
def validate_list_of_unique_strings(data, max_len=None):
    """Validate data is a list of unique strings.

    :param data: The data to validate.
    :param max_len: An optional cap on the length of the string.
    :returns: None if the data is a list of non-empty/non-blank strings,
        otherwise a human readable message indicating why validation failed.
    """
    return _validate_list_of_unique_strings(data, max_len=max_len)


def validate_boolean(data, valid_values=None):
    """Validate data is a python bool compatible object.

    :param data: The data to validate.
    :param valid_values: Not used!
    :return: None if the value can be converted to a bool, otherwise a
        human readable message indicating why data is invalid.
    """
    try:
        strutils.bool_from_string(data, strict=True)
    except ValueError:
        msg = _("'%s' is not a valid boolean value") % data
        LOG.debug(msg)
        return msg


def validate_integer(data, valid_values=None):
    """This function validates if the data is an integer.

    It checks both number or string provided to validate it's an
    integer and returns a message with the error if it's not

    :param data: The string or number to validate as integer.
    :param valid_values: values to limit the 'data' to.
    :returns: None if data is an integer, otherwise a human readable message
        indicating why validation failed..
    """

    if valid_values is not None:
        msg = validate_values(data=data, valid_values=valid_values)
        if msg:
            return msg

    msg = _("'%s' is not an integer") % data
    try:
        fl_n = float(data)
        int_n = int(data)
    except (ValueError, TypeError, OverflowError):
        LOG.debug(msg)
        return msg

    # Fail test if non equal or boolean
    if fl_n != int_n:
        LOG.debug(msg)
        return msg
    elif isinstance(data, bool):
        msg = _("'%s' is not an integer:boolean") % data
        LOG.debug(msg)
        return msg


def validate_range(data, valid_values=None):
    """Check that integer value is within a range provided.

    Test is inclusive. Allows either limit to be ignored, to allow
    checking ranges where only the lower or upper limit matter.
    It is expected that the limits provided are valid integers or
    the value None.

    :param data: The data to validate.
    :param valid_values: A list of 2 elements where element 0 is the min
        value the int data can have and element 1 is the max.
    :returns: None if the data is a valid int in the given range, otherwise
        a human readable message as to why validation failed.
    """

    min_value = valid_values[0]
    max_value = valid_values[1]
    try:
        data = int(data)
    except (ValueError, TypeError):
        msg = _("'%s' is not an integer") % data
        LOG.debug(msg)
        return msg
    if min_value is not UNLIMITED and data < min_value:
        msg = _("'%(data)s' is too small - must be at least "
                "'%(limit)d'") % {'data': data, 'limit': min_value}
        LOG.debug(msg)
        return msg
    if max_value is not UNLIMITED and data > max_value:
        msg = _("'%(data)s' is too large - must be no larger than "
                "'%(limit)d'") % {'data': data, 'limit': max_value}
        LOG.debug(msg)
        return msg


def validate_range_or_none(data, valid_values=None):
    """Check that the provided value is none or a ranged integer"""

    if data is not None:
        return validate_range(data, valid_values)


def validate_no_whitespace(data):
    """Validates that input has no whitespace.

    :param data: The data to validate. Must be a python string type suitable
        for searching via regex.
    :returns: The data itself.
    :raises InvalidInput: If the data contains whitespace.
    """
    if re.search(r'\s', data):
        msg = _("'%s' contains whitespace") % data
        LOG.debug(msg)
        raise n_exc.InvalidInput(error_message=msg)
    return data


def validate_mac_address(data, valid_values=None):
    """Validate data is a MAC address.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if the data is a valid MAC address, otherwise a human
        readable message as to why validation failed.
    """
    try:
        valid_mac = netaddr.valid_mac(validate_no_whitespace(data))
    except Exception:
        valid_mac = False

    if valid_mac:
        valid_mac = (not netaddr.EUI(data) in
                     map(netaddr.EUI, constants.INVALID_MAC_ADDRESSES))
    # TODO(arosen): The code in this file should be refactored
    # so it catches the correct exceptions. validate_no_whitespace
    # raises AttributeError if data is None.
    if not valid_mac:
        msg = _("'%s' is not a valid MAC address") % data
        LOG.debug(msg)
        return msg


def validate_mac_address_or_none(data, valid_values=None):
    """Validate data is a MAC address if the data isn't None.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if the data is None or a valid MAC address, otherwise
        a human readable message indicating why validation failed.
    """
    if data is not None:
        return validate_mac_address(data, valid_values)


def validate_ip_address(data, valid_values=None):
    """Validate data is an IP address.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is an IP address, otherwise a human readable
        message indicating why data isn't an IP address.
    """
    msg = None
    try:
        # netaddr.core.ZEROFILL is only applicable to IPv4.
        # it will remove leading zeros from IPv4 address octets.
        ip = netaddr.IPAddress(validate_no_whitespace(data),
                               flags=netaddr.core.ZEROFILL)
        # The followings are quick checks for IPv6 (has ':') and
        # IPv4.  (has 3 periods like 'xx.xx.xx.xx')
        # NOTE(yamamoto): netaddr uses libraries provided by the underlying
        # platform to convert addresses.  For example, inet_aton(3).
        # Some platforms, including NetBSD and OS X, have inet_aton
        # implementation which accepts more varying forms of addresses than
        # we want to accept here.  The following check is to reject such
        # addresses.  For Example:
        #   >>> netaddr.IPAddress('1' * 59)
        #   IPAddress('199.28.113.199')
        #   >>> netaddr.IPAddress(str(int('1' * 59) & 0xffffffff))
        #   IPAddress('199.28.113.199')
        #   >>>
        if ':' not in data and data.count('.') != 3:
            msg = _("'%s' is not a valid IP address") % data
        # A leading '0' in IPv4 address may be interpreted as an octal number,
        # e.g. 011 octal is 9 decimal. Since there is no standard saying
        # whether IP address with leading '0's should be interpreted as octal
        # or decimal, hence we reject leading '0's to avoid ambiguity.
        elif ip.version == 4 and str(ip) != data:
            msg = _("'%(data)s' is not an accepted IP address, "
                    "'%(ip)s' is recommended") % {"data": data, "ip": ip}
    except Exception:
        msg = _("'%s' is not a valid IP address") % data
    if msg:
        LOG.debug(msg)
    return msg


def _validate_any_key_spec(data, key_specs=None):
    """Validate a dict matches at least 1 key spec.

    :param data: The dict to validate.
    :param key_specs: An iterable collection of key spec dicts used to validate
        data.
    :returns: None if at least 1 of the key_specs matches data, otherwise
        a message is returned indicating data could not be matched with any
        of the key_specs.
    """
    for spec in key_specs:
        if validate_dict(data, spec) is None:
            return None
    msg = _("No valid key specs matched for: %s") % data
    LOG.debug(msg)
    return msg


def validate_any_key_specs_or_none(data, key_specs=None):
    """Validate each dict in a list matches at least 1 key_spec.

    :param data: The list of dicts to validate.
    :param key_specs: An iterable collection of key spec dicts that is used
        to check each dict in data.
    :returns: None.
    :raises InvalidInput: If any of the dicts in data do not match at least
        1 of the key_specs given.
    """
    if data is None:
        return

    def dict_validator(data_dict):
        msg = _validate_any_key_spec(data_dict, key_specs=key_specs)
        if msg:
            raise n_exc.InvalidInput(error_message=msg)

    msg = _validate_list_of_items(dict_validator, data)
    if msg:
        raise n_exc.InvalidInput(error_message=msg)


def validate_ip_pools(data, valid_values=None):
    """Validate that start and end IP addresses are present.

    In addition to this the IP addresses will also be validated.

    :param data: The data to validate. Must be a list-like structure of
        IP pool dicts that each have a 'start' and 'end' key value.
    :param valid_values: Not used!
    :returns: None if data is a valid list of IP pools, otherwise a message
        indicating why the data is invalid.
    """
    if not isinstance(data, list):
        msg = _("Invalid data format for IP pool: '%s'") % data
        LOG.debug(msg)
        return msg

    expected_keys = ['start', 'end']
    for ip_pool in data:
        msg = _verify_dict_keys(expected_keys, ip_pool)
        if msg:
            return msg
        for k in expected_keys:
            msg = validate_ip_address(ip_pool[k])
            if msg:
                return msg


def validate_fixed_ips(data, valid_values=None):
    """Validate data is a list of fixed IP dicts.

    In addition this function validates the ip_address and subnet_id
    if present in each fixed IP dict.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is a valid list of fixed IP dicts. Otherwise a
        human readable message is returned indicating why validation failed.
    """
    if not isinstance(data, list):
        msg = _("Invalid data format for fixed IP: '%s'") % data
        LOG.debug(msg)
        return msg

    ips = []
    for fixed_ip in data:
        if not isinstance(fixed_ip, dict):
            msg = _("Invalid data format for fixed IP: '%s'") % fixed_ip
            LOG.debug(msg)
            return msg
        if 'ip_address' in fixed_ip:
            # Ensure that duplicate entries are not set - just checking IP
            # suffices. Duplicate subnet_id's are legitimate.
            fixed_ip_address = fixed_ip['ip_address']
            if fixed_ip_address in ips:
                msg = _("Duplicate IP address '%s'") % fixed_ip_address
                LOG.debug(msg)
            else:
                msg = validate_ip_address(fixed_ip_address)
            if msg:
                return msg
            ips.append(fixed_ip_address)
        if 'subnet_id' in fixed_ip:
            msg = validate_uuid(fixed_ip['subnet_id'])
            if msg:
                return msg


def validate_nameservers(data, valid_values=None):
    """Validate a list of unique IP addresses.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is a list of valid IP addresses, otherwise
        a human readable message is returned indicating why validation failed.
    """
    if not hasattr(data, '__iter__'):
        msg = _("Invalid data format for nameserver: '%s'") % data
        LOG.debug(msg)
        return msg

    hosts = []
    for host in data:
        # This must be an IP address only
        msg = validate_ip_address(host)
        if msg:
            msg = _("'%(host)s' is not a valid nameserver. %(msg)s") % {
                'host': host, 'msg': msg}
            LOG.debug(msg)
            return msg
        if host in hosts:
            msg = _("Duplicate nameserver '%s'") % host
            LOG.debug(msg)
            return msg
        hosts.append(host)


def validate_hostroutes(data, valid_values=None):
    """Validate a list of unique host route dicts.

    :param data: The data to validate. To be valid it must be a list like
        structure of host route dicts, each containing 'destination' and
        'nexthop' key values.
    :param valid_values: Not used!
    :returns: None if data is a valid list of unique host route dicts,
        otherwise a human readable message indicating why validation failed.
    """
    if not isinstance(data, list):
        msg = _("Invalid data format for hostroute: '%s'") % data
        LOG.debug(msg)
        return msg

    expected_keys = ['destination', 'nexthop']
    hostroutes = []
    for hostroute in data:
        msg = _verify_dict_keys(expected_keys, hostroute)
        if msg:
            return msg
        msg = validate_subnet(hostroute['destination'])
        if msg:
            return msg
        msg = validate_ip_address(hostroute['nexthop'])
        if msg:
            return msg
        if hostroute in hostroutes:
            msg = _("Duplicate hostroute '%s'") % hostroute
            LOG.debug(msg)
            return msg
        hostroutes.append(hostroute)


def validate_ip_address_or_none(data, valid_values=None):
    """Validate data is an IP address or None.

    :param data: The data to validate.
    :param valid_values: An optional list of values data may take on.
    :return: None if data is None or a valid IP address, otherwise a
        human readable message indicating why the data is invalid.
    """
    if data is not None:
        return validate_ip_address(data, valid_values)


def validate_ip_or_subnet_or_none(data, valid_values=None):
    """Validate data is an IP address, a valid IP subnet string, or None.

    :param data: The data to validate.
    :param valid_values: Not used!
    :return: None if data is None or a valid IP address or a valid IP subnet,
        otherwise a human readable message indicating why the data is neither
        an IP address nor IP subnet.
    """
    msg_ip = validate_ip_address_or_none(data)
    msg_subnet = validate_subnet_or_none(data)
    if msg_ip is not None and msg_subnet is not None:
        return _("'%(data)s' is neither a valid IP address, nor "
                 "is it a valid IP subnet") % {'data': data}


def validate_subnet(data, valid_values=None):
    """Validate data is an IP network subnet string.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is valid IP network address. Otherwise a human
        readable message as to why data is invalid.
    """
    msg = None
    try:
        net = netaddr.IPNetwork(validate_no_whitespace(data))
        if '/' not in data or (net.version == 4 and str(net) != data):
            msg = _("'%(data)s' isn't a recognized IP subnet cidr,"
                    " '%(cidr)s' is recommended") % {"data": data,
                                                     "cidr": net.cidr}
        else:
            return
    except Exception:
        msg = _("'%s' is not a valid IP subnet") % data
    if msg:
        LOG.debug(msg)
    return msg


def validate_subnet_or_none(data, valid_values=None):
    """Validate data is a valid subnet address string or None.

    :param data: The data to validate.
    :param valid_values: The optional list of values data may take on.
    :returns: None if data is None or a valid subnet, otherwise a human
        readable message as to why data is invalid.
    """
    if data is not None:
        return validate_subnet(data, valid_values)


_validate_subnet_list = functools.partial(_validate_list_of_items,
                                          validate_subnet)


# NOTE(boden): subbed out for docstring comments.
def validate_subnet_list(data, valid_values=None):
    """Validate data is a list of subnet dicts.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is a valid list of subnet dicts, otherwise a human
        readable message as to why the data is invalid.
    """
    return _validate_subnet_list(data, valid_values)


def validate_regex(data, valid_values=None):
    """Validate data is matched against a regex.

    :param data: The data to validate.
    :param valid_values: The regular expression to use with re.match on
        the data.
    :returns: None if data contains matches for valid_values, otherwise a
        human readable message as to why data is invalid.
    """
    try:
        if re.match(valid_values, data):
            return
    except TypeError:
        pass

    msg = _("'%s' is not a valid input") % data
    LOG.debug(msg)
    return msg


def validate_regex_or_none(data, valid_values=None):
    """Validate data is None or matched against a regex.

    :param data: The data to validate.
    :param valid_values: The regular expression to use with re.match on
        the data.
    :returns: None if data is None or contains matches for valid_values,
        otherwise a human readable message as to why data is invalid.
    """
    if data is not None:
        return validate_regex(data, valid_values)


def validate_list_of_regex_or_none(data, valid_values=None):
    """Validate data is None or a list of items matching regex.

    :param data: A list of data to validate.
    :param valid_values: The regular expression to use with re.match on
        each element of the data.
    :returns: None if data is None or contains matches for valid_values,
        otherwise a human readable message as to why data is invalid.
    """
    if data is not None:
        return _validate_list_of_items(validate_regex, data, valid_values)


def validate_subnetpool_id(data, valid_values=None):
    """Validate data is valid subnet pool ID.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is a valid subnet pool ID, otherwise a
        human readable message as to why it's invalid.
    """
    if data != constants.IPV6_PD_POOL_ID:
        return validate_uuid_or_none(data, valid_values)


def validate_subnetpool_id_or_none(data, valid_values=None):
    """Validate data is valid subnet pool ID or None.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is a valid subnet pool ID or None, otherwise a
        human readable message as to why it's invalid.
    """
    if data is not None:
        return validate_subnetpool_id(data, valid_values)


def validate_uuid(data, valid_values=None):
    """Validate data is UUID like.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is UUID like in form, otherwise a human readable
        message indicating why data is invalid.
    """
    if not uuidutils.is_uuid_like(data):
        msg = _("'%s' is not a valid UUID") % data
        LOG.debug(msg)
        return msg


def validate_uuid_or_none(data, valid_values=None):
    """Validate data is UUID like or None.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is UUID like in form or None, otherwise a human
        readable message indicating why data is invalid.
    """
    if data is not None:
        return validate_uuid(data)


_validate_uuid_list = functools.partial(_validate_list_of_items,
                                        validate_uuid)


# NOTE(boden): subbed out for docstring comments.
def validate_uuid_list(data, valid_values=None):
    """Validate data is a list of UUID like values.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is an iterable that contains valid UUID values,
        otherwise a message is returned indicating why validation failed.
    """
    return _validate_uuid_list(data, valid_values)


def _validate_dict_item(key, key_validator, data):
    # Find conversion function, if any, and apply it
    conv_func = key_validator.get('convert_to')
    if conv_func:
        data[key] = conv_func(data.get(key))
    # Find validator function
    # TODO(salv-orlando): Structure of dict attributes should be improved
    # to avoid iterating over items
    val_func = val_params = None
    for (k, v) in key_validator.items():
        if k.startswith('type:'):
            # ask forgiveness, not permission
            try:
                val_func = validators[k]
            except KeyError:
                msg = _("Validator '%s' does not exist") % k
                LOG.debug(msg)
                return msg
            val_params = v
            break
    # Process validation
    if val_func:
        return val_func(data.get(key), val_params)


def validate_dict(data, key_specs=None):
    """Validate data is a dict optionally containing a specific set of keys.

    :param data: The data to validate.
    :param key_specs: The optional list of keys that must be contained in
        data.
    :returns: None if data is a dict and (optionally) contains only key_specs.
        Otherwise a human readable message is returned indicating why data is
        not valid.
    """
    if not isinstance(data, dict):
        msg = _("'%s' is not a dictionary") % data
        LOG.debug(msg)
        return msg
    # Do not perform any further validation, if no constraints are supplied
    if not key_specs:
        return

    # Check whether all required keys are present
    required_keys = [key for key, spec in key_specs.items()
                     if spec.get('required')]

    if required_keys:
        msg = _verify_dict_keys(required_keys, data, False)
        if msg:
            return msg

    # Check whether unexpected keys are supplied in data
    unexpected_keys = [key for key in data if key not in key_specs]
    if unexpected_keys:
        msg = _("Unexpected keys supplied: %s") % ', '.join(unexpected_keys)
        LOG.debug(msg)
        return msg

    # Perform validation and conversion of all values
    # according to the specifications.
    for key, key_validator in [(k, v) for k, v in key_specs.items()
                               if k in data]:
        msg = _validate_dict_item(key, key_validator, data)
        if msg:
            return msg


def validate_dict_or_none(data, key_specs=None):
    """Validate data is None or a dict containing a specific set of keys.

    :param data: The data to validate.
    :param key_specs: The optional list of keys that must be contained in
        data.
    :returns: None if data is None or a dict  that (optionally) contains
        all key_specs. Otherwise a human readable message is returned
        indicating why data is not valid.
    """
    if data is not None:
        return validate_dict(data, key_specs)


def validate_dict_or_empty(data, key_specs=None):
    """Validate data is {} or a dict containing a specific set of keys.

    :param data: The data to validate.
    :param key_specs: The optional list of keys that must be contained in
        data.
    :returns: None if data is {} or a dict (optionally) containing
        only key_specs. Otherwise a human readable message is returned
        indicating why data is not valid.
    """
    if data != {}:
        return validate_dict(data, key_specs)


def validate_dict_or_nodata(data, key_specs=None):
    """Validate no data or a dict containing a specific set of keys.

    :param data: The data to validate. May be None.
    :param key_specs: The optional list of keys that must be contained in
        data.
    :returns: None if no data/empty dict or a dict and (optionally) contains
        all key_specs. Otherwise a human readable message is returned
        indicating why data is not valid.
    """
    if data:
        return validate_dict(data, key_specs)


def validate_non_negative(data, valid_values=None):
    """Validate data is a positive int.

    :param data: The data to validate
    :param valid_values: Not used!
    :returns: None if data is an int and is positive, otherwise a human
        readable message as to why data is invalid.
    """
    try:
        data = int(data)
    except (ValueError, TypeError):
        msg = _("'%s' is not an integer") % data
        LOG.debug(msg)
        return msg

    if data < 0:
        msg = _("'%s' should be non-negative") % data
        LOG.debug(msg)
        return msg


def validate_port_range_or_none(data, valid_values=None):
    """Validate data is a range of TCP/UDP port numbers

    :param data: The data to validate
    :param valid_values: Not used!
    :returns: None if data is an int between 0 and 65535, or two ints between 0
        and 65535 with a colon between them, otherwise a human readable message
        as to why data is invalid.
    """
    if data is None:
        return
    data = str(data)
    ports = data.split(':')
    if len(ports) > 2:
        msg = _("Port range must be two integers separated by a colon")
        LOG.debug(msg)
        return msg
    for p in ports:
        if len(p) == 0:
            msg = _("Port range must be two integers separated by a colon")
            LOG.debug(msg)
            return msg
        if not netutils.is_valid_port(p):
            msg = _("Invalid port: %s") % p
            LOG.debug(msg)
            return msg
    if len(ports) > 1 and int(ports[0]) > int(ports[1]):
        msg = _("First port in a port range must be lower than the second "
                "port")
        LOG.debug(msg)
        return msg


def validate_subports(data, valid_values=None):
    """Validate data is a list of subnet port dicts.

    :param data: The data to validate.
    :param valid_values: Not used!
    :returns: None if data is a list of subport dicts each with a unique valid
        port_id, segmentation_id and segmentation_type. Otherwise a human
        readable message is returned indicating why the data is invalid.
    """
    if not isinstance(data, list):
        msg = _("Invalid data format for subports: '%s' is not a list") % data
        LOG.debug(msg)
        return msg

    subport_ids = set()
    segmentations = collections.defaultdict(set)
    for subport in data:
        if not isinstance(subport, dict):
            msg = _("Invalid data format for subport: "
                    "'%s' is not a dict") % subport
            LOG.debug(msg)
            return msg

        # Expect a non duplicated and valid port_id for the subport
        if 'port_id' not in subport:
            msg = _("A valid port UUID must be specified")
            LOG.debug(msg)
            return msg
        elif validate_uuid(subport["port_id"]):
            msg = _("Invalid UUID for subport: '%s'") % subport["port_id"]
            return msg
        elif subport["port_id"] in subport_ids:
            msg = _("Non unique UUID for subport: '%s'") % subport["port_id"]
            return msg
        subport_ids.add(subport["port_id"])

        # Validate that both segmentation ID and segmentation type are
        # specified, and that the client does not duplicate segmentation
        # IDs (unless it is explicitly asked to inherit segmentation
        # details from the underlying subport's network).
        segmentation_type = subport.get("segmentation_type")
        if segmentation_type == 'inherit':
            return
        segmentation_id = subport.get("segmentation_id")
        if ((not segmentation_type or segmentation_id is None)
                and len(subport) > 1):
            msg = _("Invalid subport details '%s': missing segmentation "
                    "information. Must specify both segmentation_id and "
                    "segmentation_type") % subport
            LOG.debug(msg)
            return msg
        if segmentation_id in segmentations.get(segmentation_type, []):
            msg = _("Segmentation ID '%(seg_id)s' for '%(subport)s' is not "
                    "unique") % {"seg_id": segmentation_id,
                                 "subport": subport["port_id"]}
            LOG.debug(msg)
            return msg
        if segmentation_id is not None:
            segmentations[segmentation_type].add(segmentation_id)


def validate_service_plugin_type(data, valid_values=None):
    """Validates data is a valid service plugin.

    :param data: The service plugin type to validate.
    :param valid_values: Not used.
    :returns: None if data is a valid service plugin known to the plugin
        directory.
    :raises: InvalidServiceType if data is not a service known by the
        plugin directory.
    """
    if not directory.get_plugin(data):
        raise n_exc.InvalidServiceType(service_type=data)


# Dictionary that maintains a list of validation functions
validators = {'type:dict': validate_dict,
              'type:dict_or_none': validate_dict_or_none,
              'type:dict_or_empty': validate_dict_or_empty,
              'type:dict_or_nodata': validate_dict_or_nodata,
              'type:fixed_ips': validate_fixed_ips,
              'type:hostroutes': validate_hostroutes,
              'type:ip_address': validate_ip_address,
              'type:ip_address_or_none': validate_ip_address_or_none,
              'type:ip_or_subnet_or_none': validate_ip_or_subnet_or_none,
              'type:ip_pools': validate_ip_pools,
              'type:list_of_regex_or_none': validate_list_of_regex_or_none,
              'type:mac_address': validate_mac_address,
              'type:mac_address_or_none': validate_mac_address_or_none,
              'type:nameservers': validate_nameservers,
              'type:non_negative': validate_non_negative,
              'type:port_range': validate_port_range_or_none,
              'type:range': validate_range,
              'type:range_or_none': validate_range_or_none,
              'type:regex': validate_regex,
              'type:regex_or_none': validate_regex_or_none,
              'type:string': validate_string,
              'type:string_or_none': validate_string_or_none,
              'type:not_empty_string': validate_not_empty_string,
              'type:not_empty_string_or_none':
              validate_not_empty_string_or_none,
              'type:subnet': validate_subnet,
              'type:subnet_list': validate_subnet_list,
              'type:subnet_or_none': validate_subnet_or_none,
              'type:subnetpool_id': validate_subnetpool_id,
              'type:subnetpool_id_or_none': validate_subnetpool_id_or_none,
              'type:subports': validate_subports,
              'type:uuid': validate_uuid,
              'type:uuid_or_none': validate_uuid_or_none,
              'type:uuid_list': validate_uuid_list,
              'type:values': validate_values,
              'type:boolean': validate_boolean,
              'type:integer': validate_integer,
              'type:list_of_unique_strings': validate_list_of_unique_strings,
              'type:list_of_any_key_specs_or_none':
                  validate_any_key_specs_or_none,
              'type:service_plugin_type': validate_service_plugin_type}


def _to_validation_type(validation_type):
    return (validation_type
            if validation_type.startswith('type:')
            else 'type:' + validation_type)


def get_validator(validation_type, default=None):
    """Get a registered validator by type.

    :param validation_type: The type to retrieve the validator for.
    :param default: A default value to return if the validator is
        not registered.
    :return: The validator if registered, otherwise the default value.
    """
    return validators.get(_to_validation_type(validation_type), default)


def add_validator(validation_type, validator):
    """Dynamically add a validator.

    This can be used by clients to add their own, private validators, rather
    than directly modifying the data structure. The clients can NOT modify
    existing validators.
    """
    key = _to_validation_type(validation_type)
    if key in validators:
        # NOTE(boden): imp.load_source() forces module reinitialization that
        # can lead to validator redefinition from the same call site
        if inspect.getsource(validator) != inspect.getsource(validators[key]):
            msg = _("Validator type %s is already defined") % validation_type
            raise KeyError(msg)
        return
    validators[key] = validator