This file is indexed.

/usr/lib/python3/dist-packages/vmware_nsxlib/v3/router.py is in python3-vmware-nsxlib 12.0.1-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# Copyright 2015 VMware, Inc.
# All Rights Reserved
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
NSX-V3 Plugin router module
"""
import copy

from oslo_log import log

from vmware_nsxlib._i18n import _
from vmware_nsxlib.v3 import exceptions
from vmware_nsxlib.v3 import nsx_constants
from vmware_nsxlib.v3 import utils

LOG = log.getLogger(__name__)

MIN_EDGE_NODE_NUM = 1

TIER0_ROUTER_LINK_PORT_NAME = "TIER0-RouterLinkPort"
TIER1_ROUTER_LINK_PORT_NAME = "TIER1-RouterLinkPort"
ROUTER_INTF_PORT_NAME = "Tier1-RouterDownLinkPort"

FIP_NAT_PRI = 900
GW_NAT_PRI = 1000


class RouterLib(object):

    def __init__(self, router_client, router_port_client, nsxlib):
        self._router_client = router_client
        self._router_port_client = router_port_client
        self.nsxlib = nsxlib

    def validate_tier0(self, tier0_groups_dict, tier0_uuid):
        err_msg = None
        try:
            lrouter = self._router_client.get(tier0_uuid)
        except exceptions.ResourceNotFound:
            err_msg = (_("Tier0 router %s not found at the backend. Either a "
                         "valid UUID must be specified or a default tier0 "
                         "router UUID must be configured in nsx.ini") %
                       tier0_uuid)
        else:
            edge_cluster_uuid = lrouter.get('edge_cluster_id')
            if not edge_cluster_uuid:
                err_msg = _("Failed to get edge cluster uuid from tier0 "
                            "router %s at the backend") % lrouter
            else:
                edge_cluster = self.nsxlib.edge_cluster.get(edge_cluster_uuid)
                member_index_list = [member['member_index']
                                     for member in edge_cluster['members']]
                if len(member_index_list) < MIN_EDGE_NODE_NUM:
                    err_msg = _("%(act_num)s edge members found in "
                                "edge_cluster %(cluster_id)s, however we "
                                "require at least %(exp_num)s edge nodes "
                                "in edge cluster for use.") % {
                        'act_num': len(member_index_list),
                        'exp_num': MIN_EDGE_NODE_NUM,
                        'cluster_id': edge_cluster_uuid}
        if err_msg:
            raise exceptions.NsxLibInvalidInput(error_message=err_msg)
        else:
            tier0_groups_dict[tier0_uuid] = {
                'edge_cluster_uuid': edge_cluster_uuid,
                'member_index_list': member_index_list}

    def add_router_link_port(self, tier1_uuid, tier0_uuid, tags):
        # Create Tier0 logical router link port
        t0_tags = copy.copy(tags)
        t0_tags = utils.add_v3_tag(t0_tags, 'os-tier0-uuid', tier0_uuid)
        tier0_link_port = self._router_port_client.create(
            tier0_uuid, display_name=TIER0_ROUTER_LINK_PORT_NAME, tags=t0_tags,
            resource_type=nsx_constants.LROUTERPORT_LINKONTIER0,
            logical_port_id=None,
            address_groups=None)
        linked_logical_port_id = tier0_link_port['id']
        # Create Tier1 logical router link port
        t1_tags = copy.copy(tags)
        t1_tags = utils.add_v3_tag(t1_tags, 'os-tier1-uuid', tier1_uuid)
        self._router_port_client.create(
            tier1_uuid, display_name=TIER1_ROUTER_LINK_PORT_NAME, tags=t1_tags,
            resource_type=nsx_constants.LROUTERPORT_LINKONTIER1,
            logical_port_id=linked_logical_port_id,
            address_groups=None)

    def remove_router_link_port(self, tier1_uuid, tier0_uuid):
        try:
            tier1_link_port = (
                self._router_port_client.get_tier1_link_port(tier1_uuid))
        except exceptions.ResourceNotFound:
            LOG.warning("Logical router link port for tier1 router: %s "
                        "not found at the backend", tier1_uuid)
            return
        tier1_link_port_id = tier1_link_port['id']
        tier0_link_port_id = (
            tier1_link_port['linked_logical_router_port_id'].get('target_id'))
        self._router_port_client.delete(tier1_link_port_id)
        self._router_port_client.delete(tier0_link_port_id)

    def update_advertisement(self, logical_router_id,
                             advertise_route_nat,
                             advertise_route_connected,
                             advertise_route_static=False,
                             enabled=True,
                             advertise_lb_vip=False,
                             advertise_lb_snat_ip=False):
        return self.nsxlib.logical_router.update_advertisement(
            logical_router_id,
            advertise_nat_routes=advertise_route_nat,
            advertise_nsx_connected_routes=advertise_route_connected,
            advertise_static_routes=advertise_route_static,
            enabled=enabled, advertise_lb_vip=advertise_lb_vip,
            advertise_lb_snat_ip=advertise_lb_snat_ip)

    def delete_gw_snat_rule(self, logical_router_id, gw_ip):
        """Delete router snat rule matching the gw ip

        assuming there is only one
        """
        return self.nsxlib.logical_router.delete_nat_rule_by_values(
            logical_router_id,
            translated_network=gw_ip)

    def delete_gw_snat_rule_by_source(self, logical_router_id, gw_ip,
                                      source_net, skip_not_found=False):
        """Delete router snat rule matching the gw ip & source"""
        return self.nsxlib.logical_router.delete_nat_rule_by_values(
            logical_router_id,
            translated_network=gw_ip,
            match_source_network=source_net,
            # Do not fail or warn if not found, unless asked for
            skip_not_found=skip_not_found, strict_mode=(not skip_not_found))

    def delete_gw_snat_rules(self, logical_router_id, gw_ip):
        """Delete all the snat rules on the router with a specific gw ip"""
        return self.nsxlib.logical_router.delete_nat_rule_by_values(
            logical_router_id,
            translated_network=gw_ip,
            # Do not fail or warn if not found
            skip_not_found=True, strict_mode=False)

    def add_gw_snat_rule(self, logical_router_id, gw_ip, source_net=None,
                         bypass_firewall=True):
        return self.nsxlib.logical_router.add_nat_rule(
            logical_router_id, action="SNAT",
            translated_network=gw_ip,
            source_net=source_net,
            rule_priority=GW_NAT_PRI,
            bypass_firewall=bypass_firewall)

    def update_router_edge_cluster(self, nsx_router_id, edge_cluster_uuid):
        return self._router_client.update(nsx_router_id,
                                          edge_cluster_id=edge_cluster_uuid)

    def create_logical_router_intf_port_by_ls_id(self, logical_router_id,
                                                 display_name,
                                                 tags,
                                                 ls_id,
                                                 logical_switch_port_id,
                                                 address_groups,
                                                 urpf_mode=None,
                                                 relay_service_uuid=None,
                                                 resource_type=None):
        try:
            port = self._router_port_client.get_by_lswitch_id(ls_id)
        except exceptions.ResourceNotFound:
            if resource_type is None:
                resource_type = nsx_constants.LROUTERPORT_DOWNLINK
            return self._router_port_client.create(
                logical_router_id,
                display_name,
                tags,
                resource_type,
                logical_switch_port_id,
                address_groups,
                urpf_mode=urpf_mode,
                relay_service_uuid=relay_service_uuid)
        else:
            return self._router_port_client.update(
                port['id'], subnets=address_groups,
                relay_service_uuid=relay_service_uuid)

    def add_fip_nat_rules(self, logical_router_id, ext_ip, int_ip,
                          match_ports=None, bypass_firewall=True):
        self.nsxlib.logical_router.add_nat_rule(
            logical_router_id, action="SNAT",
            translated_network=ext_ip,
            source_net=int_ip,
            rule_priority=FIP_NAT_PRI,
            bypass_firewall=bypass_firewall)
        self.nsxlib.logical_router.add_nat_rule(
            logical_router_id, action="DNAT",
            translated_network=int_ip,
            dest_net=ext_ip,
            rule_priority=FIP_NAT_PRI,
            match_ports=match_ports,
            bypass_firewall=bypass_firewall)

    def delete_fip_nat_rules_by_internal_ip(self, logical_router_id, int_ip):
        self.nsxlib.logical_router.delete_nat_rule_by_values(
            logical_router_id,
            action="SNAT",
            match_source_network=int_ip)
        self.nsxlib.logical_router.delete_nat_rule_by_values(
            logical_router_id,
            action="DNAT",
            translated_network=int_ip)

    def delete_fip_nat_rules(self, logical_router_id, ext_ip, int_ip):
        self.nsxlib.logical_router.delete_nat_rule_by_values(
            logical_router_id,
            action="SNAT",
            translated_network=ext_ip,
            match_source_network=int_ip)
        self.nsxlib.logical_router.delete_nat_rule_by_values(
            logical_router_id,
            action="DNAT",
            translated_network=int_ip,
            match_destination_network=ext_ip)

    def add_static_routes(self, nsx_router_id, route):
        return self.nsxlib.logical_router.add_static_route(
            nsx_router_id,
            route['destination'],
            route['nexthop'])

    def delete_static_routes(self, nsx_router_id, route):
        return self.nsxlib.logical_router.delete_static_route_by_values(
            nsx_router_id, dest_cidr=route['destination'],
            nexthop=route['nexthop'])