This file is indexed.

/usr/lib/python3/dist-packages/scapy/route6.py is in python3-scapy 0.23-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more informations
## Copyright (C) Philippe Biondi <phil@secdev.org>
## This program is published under a GPLv2 license

## Copyright (C) 2005  Guillaume Valadon <guedou@hongo.wide.ad.jp>
##                     Arnaud Ebalard <arnaud.ebalard@eads.net>

"""
Routing and network interface handling for IPv6.
"""

#############################################################################
#############################################################################
###                      Routing/Interfaces stuff                         ###
#############################################################################
#############################################################################

import socket
from .config import conf
from .utils6 import *
from .arch import *


class Route6:

    def __init__(self):
        self.invalidate_cache()
        self.resync()

    def invalidate_cache(self):
        self.cache = {}

    def flush(self):
        self.invalidate_cache()
        self.routes = []

    def resync(self):
        # TODO : At the moment, resync will drop existing Teredo routes
        #        if any. Change that ...
        self.invalidate_cache()
        self.routes = read_routes6()
        if self.routes == []:
             log_loading.info("No IPv6 support in kernel")
        
    def __repr__(self):
        rtlst = [('Destination', 'Next Hop', "iface", "src candidates")]

        for net,msk,gw,iface,cset in self.routes:
            rtlst.append(('%s/%i'% (net,msk), gw, iface, ", ".join(cset)))

        #colwidth = map(lambda x: max(map(lambda y: len(y), x)), apply(zip, rtlst))
        zipped_rtlst = list(zip(*rtlst))
        colwidth = [ max([len(y) for y in x]) for x in zipped_rtlst]
        fmt = "  ".join(map(lambda x: "%%-%ds"%x, colwidth))
        rt = "\n".join([ fmt % x for x in rtlst])

        return rt


    # Unlike Scapy's Route.make_route() function, we do not have 'host' and 'net'
    # parameters. We only have a 'dst' parameter that accepts 'prefix' and 
    # 'prefix/prefixlen' values.
    # WARNING: Providing a specific device will at the moment not work correctly.
    def make_route(self, dst, gw=None, dev=None):
        """Internal function : create a route for 'dst' via 'gw'.
        """
        prefix, plen = (dst.split("/")+["128"])[:2]
        plen = int(plen)

        if gw is None:
            gw = "::"
        if dev is None:
            dev, ifaddr, x = self.route(gw)
        else:
            # TODO: do better than that
            # replace that unique address by the list of all addresses
            lifaddr = in6_getifaddr()             
            #filter(lambda x: x[2] == dev, lifaddr)
            devaddrs = [ i for i in lifaddr if i[2] == dev] 
            ifaddr = construct_source_candidate_set(prefix, plen, devaddrs, LOOPBACK_NAME)

        return (prefix, plen, gw, dev, ifaddr)

    
    def add(self, *args, **kargs):
        """Ex:
        add(dst="2001:db8:cafe:f000::/56")
        add(dst="2001:db8:cafe:f000::/56", gw="2001:db8:cafe::1")
        add(dst="2001:db8:cafe:f000::/64", gw="2001:db8:cafe::1", dev="eth0")
        """
        self.invalidate_cache()
        self.routes.append(self.make_route(*args, **kargs))


    def delt(self, dst, gw=None):
        """ Ex: 
        delt(dst="::/0") 
        delt(dst="2001:db8:cafe:f000::/56") 
        delt(dst="2001:db8:cafe:f000::/56", gw="2001:db8:deca::1") 
        """
        tmp = dst+b"/128"
        dst, plen = tmp.split(b'/')[:2]
        dst = in6_ptop(dst)
        plen = int(plen)
        #l = filter(lambda x: in6_ptop(x[0]) == dst and x[1] == plen, self.routes)
        l = [ x for x in self.routes if in6_ptop(x[0]) == dst and x[1] == plen ]
        if gw:
            gw = in6_ptop(gw)
            #l = filter(lambda x: in6_ptop(x[0]) == gw, self.routes)
            l = [ x for x in self.routes if in6_ptop(x[0]) == gw ]
        if len(l) == 0:
            warning("No matching route found")
        elif len(l) > 1:
            warning("Found more than one match. Aborting.")
        else:
            i=self.routes.index(l[0])
            self.invalidate_cache()
            del(self.routes[i])
        
    def ifchange(self, iff, addr):
        the_addr, the_plen = (addr.split("/")+["128"])[:2]
        the_plen = int(the_plen)

        naddr = inet_pton(socket.AF_INET6, the_addr)
        nmask = in6_cidr2mask(the_plen)
        the_net = inet_ntop(socket.AF_INET6, in6_and(nmask,naddr))
        
        for i in range(len(self.routes)):
            net,plen,gw,iface,addr = self.routes[i]
            if iface != iff:
                continue
            if gw == '::':
                self.routes[i] = (the_net,the_plen,gw,iface,the_addr)
            else:
                self.routes[i] = (net,the_plen,gw,iface,the_addr)
        self.invalidate_cache()
        ip6_neigh_cache.flush()

    def ifdel(self, iff):
        """ removes all route entries that uses 'iff' interface. """
        new_routes=[]
        for rt in self.routes:
            if rt[3] != iff:
                new_routes.append(rt)
        self.invalidate_cache()
        self.routes = new_routes


    def ifadd(self, iff, addr):
        """
        Add an interface 'iff' with provided address into routing table.
        
        Ex: ifadd('eth0', '2001:bd8:cafe:1::1/64') will add following entry into 
            Scapy6 internal routing table:

            Destination           Next Hop  iface  Def src @
            2001:bd8:cafe:1::/64  ::        eth0   2001:bd8:cafe:1::1

            prefix length value can be omitted. In that case, a value of 128
            will be used.
        """
        addr, plen = (addr.split(b"/")+[b"128"])[:2]
        addr = in6_ptop(addr)
        plen = int(plen)
        naddr = inet_pton(socket.AF_INET6, addr)
        nmask = in6_cidr2mask(plen)
        prefix = inet_ntop(socket.AF_INET6, in6_and(nmask,naddr))
        self.invalidate_cache()
        self.routes.append((prefix,plen,'::',iff,[addr]))

    def route(self, dst, dev=None):
        """
        Provide best route to IPv6 destination address, based on Scapy6 
        internal routing table content.

        When a set of address is passed (e.g. 2001:db8:cafe:*::1-5) an address
        of the set is used. Be aware of that behavior when using wildcards in
        upper parts of addresses !

        If 'dst' parameter is a FQDN, name resolution is performed and result
        is used.

        if optional 'dev' parameter is provided a specific interface, filtering
        is performed to limit search to route associated to that interface.
        """
        # Transform "2001:db8:cafe:*::1-5:0/120" to one IPv6 address of the set
        dst = dst.split("/")[0]
        savedst = dst # In case following inet_pton() fails 
        dst = dst.replace("*","0")
        l = dst.find("-")
        while l >= 0:
            m = (dst[l:]+":").find(":")
            dst = dst[:l]+dst[l+m:]
            l = dst.find("-")
            
        try:
            inet_pton(socket.AF_INET6, dst)
        except socket.error:
            dst = socket.getaddrinfo(savedst, None, socket.AF_INET6)[0][-1][0]
            # TODO : Check if name resolution went well

        # Deal with dev-specific request for cache search
        k = dst
        if dev is not None:
            k = dst + "%%" + dev
        if k in self.cache:
            return self.cache[k]

        pathes = []

        # TODO : review all kinds of addresses (scope and *cast) to see
        #        if we are able to cope with everything possible. I'm convinced 
        #        it's not the case.
        # -- arnaud
        for p, plen, gw, iface, cset in self.routes:
            if dev is not None and iface != dev:
                continue
            if in6_isincluded(dst, p, plen):
                pathes.append((plen, (iface, cset, gw)))
            elif (in6_ismlladdr(dst) and in6_islladdr(p) and in6_islladdr(cset[0])):
                pathes.append((plen, (iface, cset, gw)))
                
        if not pathes:
            warning("No route found for IPv6 destination %s (no default route?). This affects only IPv6" % dst)
            return (LOOPBACK_NAME, "::", "::") # XXX Linux specific

        # Sort with longest prefix first
        pathes.sort(reverse=True)

        best_plen = pathes[0][0]
        pathes = filter(lambda x: x[0] == best_plen, pathes)

        res = []
        for p in pathes: # Here we select best source address for every route
            tmp = p[1]
            srcaddr = get_source_addr_from_candidate_set(dst, p[1][1])
            if srcaddr is not None:
                res.append((p[0], (tmp[0], srcaddr, tmp[2])))
 
        if res == []:
            warning("Found a route for IPv6 destination '%s', but no possible source address. This affects only IPv6" % dst)
            return (LOOPBACK_NAME, b"::", b"::") # XXX Linux specific

        # Symptom  : 2 routes with same weight (our weight is plen)
        # Solution : 
        #  - dst is unicast global. Check if it is 6to4 and we have a source 
        #    6to4 address in those available
        #  - dst is link local (unicast or multicast) and multiple output
        #    interfaces are available. Take main one (conf.iface6)
        #  - if none of the previous or ambiguity persists, be lazy and keep
        #    first one
        #  XXX TODO : in a _near_ future, include metric in the game

        if len(res) > 1:
            tmp = []
            if in6_isgladdr(dst) and in6_isaddr6to4(dst):
                # TODO : see if taking the longest match between dst and
                #        every source addresses would provide better results
                #tmp = filter(lambda x: in6_isaddr6to4(x[1][1]), res)
                tmp = [ x for x in res if in6_isaddr6to4(x[1][1]) ]
            elif in6_ismaddr(dst) or in6_islladdr(dst):
                # TODO : I'm sure we are not covering all addresses. Check that
                #tmp = filter(lambda x: x[1][0] == conf.iface6, res)
                tmp = [ x for x in res if x[1][0] == conf.iface6 ]

            if tmp:
                res = tmp
                
        # Fill the cache (including dev-specific request)
        k = dst
        if dev is not None:
            k = dst + "%%" + dev
        self.cache[k] = res[0][1]

        return res[0][1]

conf.route6 = Route6()

_res = conf.route6.route("::/0")
if _res:
    iff, gw, addr = _res
    conf.iface6 = iff
del(_res)