This file is indexed.

/usr/lib/python3/dist-packages/pyroute2/ipset.py is in python3-pyroute2 0.3.16-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
'''
IPSet module
============

The very basic ipset support.

Right now it is tested only for hash:ip and doesn't support
many useful options. But it can be easily extended, so you
are welcome to help with that.
'''
import socket
from pyroute2.netlink import NLMSG_ERROR
from pyroute2.netlink import NLM_F_REQUEST
from pyroute2.netlink import NLM_F_DUMP
from pyroute2.netlink import NLM_F_ACK
from pyroute2.netlink import NLM_F_EXCL
from pyroute2.netlink import NETLINK_NETFILTER
from pyroute2.netlink.nlsocket import NetlinkSocket
from pyroute2.netlink.nfnetlink import NFNL_SUBSYS_IPSET
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_PROTOCOL
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_CREATE
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_DESTROY
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_SWAP
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_LIST
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_ADD
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_DEL
from pyroute2.netlink.nfnetlink.ipset import ipset_msg


def _nlmsg_error(msg):
    return msg['header']['type'] == NLMSG_ERROR


class IPSet(NetlinkSocket):
    '''
    NFNetlink socket (family=NETLINK_NETFILTER).

    Implements API to the ipset functionality.
    '''

    policy = {IPSET_CMD_PROTOCOL: ipset_msg,
              IPSET_CMD_LIST: ipset_msg}

    def __init__(self, version=6, attr_revision=0, nfgen_family=2):
        super(IPSet, self).__init__(family=NETLINK_NETFILTER)
        policy = dict([(x | (NFNL_SUBSYS_IPSET << 8), y)
                       for (x, y) in self.policy.items()])
        self.register_policy(policy)
        self._proto_version = version
        self._attr_revision = attr_revision
        self._nfgen_family = nfgen_family

    def request(self, msg, msg_type,
                msg_flags=NLM_F_REQUEST | NLM_F_DUMP,
                terminate=None):
        msg['nfgen_family'] = self._nfgen_family
        return self.nlm_request(msg,
                                msg_type | (NFNL_SUBSYS_IPSET << 8),
                                msg_flags, terminate=terminate)

    def list(self, name=None):
        '''
        List installed ipsets. If `name` is provided, list
        the named ipset or return an empty list.

        It looks like nfnetlink doesn't return an error,
        when requested ipset doesn't exist.
        '''
        msg = ipset_msg()
        msg['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version]]
        if name is not None:
            msg['attrs'].append(['IPSET_ATTR_SETNAME', name])
        return self.request(msg, IPSET_CMD_LIST)

    def destroy(self, name):
        '''
        Destroy an ipset
        '''
        msg = ipset_msg()
        msg['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version],
                        ['IPSET_ATTR_SETNAME', name]]
        return self.request(msg, IPSET_CMD_DESTROY,
                            msg_flags=NLM_F_REQUEST | NLM_F_ACK | NLM_F_EXCL,
                            terminate=_nlmsg_error)

    def create(self, name, stype='hash:ip', family=socket.AF_INET,
               exclusive=True):
        '''
        Create an ipset `name` of type `stype`, by default
        `hash:ip`.

        Very simple and stupid method, should be extended
        to support ipset options.
        '''
        excl_flag = NLM_F_EXCL if exclusive else 0
        msg = ipset_msg()
        msg['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version],
                        ['IPSET_ATTR_SETNAME', name],
                        ['IPSET_ATTR_TYPENAME', stype],
                        ['IPSET_ATTR_FAMILY', family],
                        ['IPSET_ATTR_REVISION', self._attr_revision]]

        return self.request(msg, IPSET_CMD_CREATE,
                            msg_flags=NLM_F_REQUEST | NLM_F_ACK | excl_flag,
                            terminate=_nlmsg_error)

    def _add_delete(self, name, entry, family, cmd, exclusive):
        if family == socket.AF_INET:
            entry_type = 'IPSET_ATTR_IPADDR_IPV4'
        elif family == socket.AF_INET6:
            entry_type = 'IPSET_ATTR_IPADDR_IPV6'
        else:
            raise TypeError('unknown family')
        excl_flag = NLM_F_EXCL if exclusive else 0

        msg = ipset_msg()
        msg['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version],
                        ['IPSET_ATTR_SETNAME', name],
                        ['IPSET_ATTR_DATA',
                         {'attrs': [['IPSET_ATTR_IP',
                                     {'attrs': [[entry_type, entry]]}]]}]]
        return self.request(msg, cmd,
                            msg_flags=NLM_F_REQUEST | NLM_F_ACK | excl_flag,
                            terminate=_nlmsg_error)

    def add(self, name, entry, family=socket.AF_INET, exclusive=True):
        '''
        Add a member to the ipset
        '''
        return self._add_delete(name, entry, family, IPSET_CMD_ADD, exclusive)

    def delete(self, name, entry, family=socket.AF_INET, exclusive=True):
        '''
        Delete a member from the ipset
        '''
        return self._add_delete(name, entry, family, IPSET_CMD_DEL, exclusive)

    def swap(self, set_a, set_b):
        '''
        Swap two ipsets
        '''
        msg = ipset_msg()
        msg['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version],
                        ['IPSET_ATTR_SETNAME', set_a],
                        ['IPSET_ATTR_TYPENAME', set_b]]
        return self.request(msg, IPSET_CMD_SWAP,
                            msg_flags=NLM_F_REQUEST | NLM_F_ACK,
                            terminate=_nlmsg_error)