/usr/lib/python3/dist-packages/pyroute2/ipset.py is in python3-pyroute2 0.3.16-1.
This file is owned by root:root, with mode 0o644.
'''
IPSet module
============
The very basic ipset support.
Right now it is tested only for hash:ip and doesn't support
many useful options. But it can be easily extended, so you
are welcome to help with that.
'''
import socket
from pyroute2.netlink import NLMSG_ERROR
from pyroute2.netlink import NLM_F_REQUEST
from pyroute2.netlink import NLM_F_DUMP
from pyroute2.netlink import NLM_F_ACK
from pyroute2.netlink import NLM_F_EXCL
from pyroute2.netlink import NETLINK_NETFILTER
from pyroute2.netlink.nlsocket import NetlinkSocket
from pyroute2.netlink.nfnetlink import NFNL_SUBSYS_IPSET
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_PROTOCOL
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_CREATE
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_DESTROY
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_SWAP
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_LIST
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_ADD
from pyroute2.netlink.nfnetlink.ipset import IPSET_CMD_DEL
from pyroute2.netlink.nfnetlink.ipset import ipset_msg
def _nlmsg_error(msg):
    '''
    Return True when the header of `msg` carries the NLMSG_ERROR
    type. The IPSet methods that request an ACK pass this function
    as the `terminate` callback of their request.
    '''
    header = msg['header']
    return header['type'] == NLMSG_ERROR
class IPSet(NetlinkSocket):
    '''
    NFNetlink socket (family=NETLINK_NETFILTER).

    Implements API to the ipset functionality. Every public method
    builds an `ipset_msg`, puts the protocol version given to the
    constructor into it and sends it through `request()`.

    NOTE(review): ipsets normally back packet-filter rules and
    modifying them is expected to need CAP_NET_ADMIN. Set names and
    entries are placed into the message exactly as passed in; this
    class does not validate them, so callers handling untrusted
    input have to do that themselves.
    '''

    # Reply parsers, keyed by the bare ipset command number; the
    # constructor registers them under the full nfnetlink type.
    policy = {IPSET_CMD_PROTOCOL: ipset_msg,
              IPSET_CMD_LIST: ipset_msg}

    def __init__(self, version=6, attr_revision=0, nfgen_family=2):
        '''
        Open the netfilter netlink socket and register the parsers.

        `version` is sent as IPSET_ATTR_PROTOCOL in every request,
        `attr_revision` as IPSET_ATTR_REVISION by `create()`, and
        `nfgen_family` is written into each message by `request()`.
        '''
        super(IPSet, self).__init__(family=NETLINK_NETFILTER)
        # The registered keys are the command number OR-ed with the
        # ipset subsystem id shifted left by 8 bits, the same value
        # `request()` uses as the message type.
        subsys = NFNL_SUBSYS_IPSET << 8
        shifted = {cmd | subsys: parser
                   for (cmd, parser) in self.policy.items()}
        self.register_policy(shifted)
        self._proto_version = version
        self._attr_revision = attr_revision
        self._nfgen_family = nfgen_family

    def request(self, msg, msg_type,
                msg_flags=NLM_F_REQUEST | NLM_F_DUMP,
                terminate=None):
        '''
        Send `msg` as the ipset command `msg_type` and return the
        result of `nlm_request()`.

        The message gets the `nfgen_family` chosen at construction
        time; `msg_flags` and `terminate` are passed on unchanged.
        '''
        msg['nfgen_family'] = self._nfgen_family
        full_type = msg_type | (NFNL_SUBSYS_IPSET << 8)
        return self.nlm_request(msg, full_type, msg_flags,
                                terminate=terminate)

    def list(self, name=None):
        '''
        List installed ipsets. If `name` is provided, list
        the named ipset or return an empty list.

        It looks like nfnetlink doesn't return an error,
        when requested ipset doesn't exist, so an empty result
        is the only sign that `name` is unknown.
        '''
        attrs = [['IPSET_ATTR_PROTOCOL', self._proto_version]]
        if name is not None:
            attrs.append(['IPSET_ATTR_SETNAME', name])
        query = ipset_msg()
        query['attrs'] = attrs
        # the default flags of request() make this a dump request
        return self.request(query, IPSET_CMD_LIST)

    def destroy(self, name):
        '''
        Destroy the ipset `name`.

        The request asks for an ACK and stops reading at the first
        NLMSG_ERROR message.
        '''
        flags = NLM_F_REQUEST | NLM_F_ACK | NLM_F_EXCL
        req = ipset_msg()
        req['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version],
                        ['IPSET_ATTR_SETNAME', name]]
        return self.request(req, IPSET_CMD_DESTROY,
                            msg_flags=flags,
                            terminate=_nlmsg_error)

    def create(self, name, stype='hash:ip', family=socket.AF_INET,
               exclusive=True):
        '''
        Create an ipset `name` of type `stype`, by default
        `hash:ip`, for the address family `family`.

        Very simple method, should be extended to support ipset
        options. With `exclusive` set the request carries
        NLM_F_EXCL.

        NOTE(review): NLM_F_EXCL presumably makes the kernel refuse
        to create a set whose name is already taken -- confirm.
        '''
        flags = NLM_F_REQUEST | NLM_F_ACK
        if exclusive:
            flags |= NLM_F_EXCL
        req = ipset_msg()
        req['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version],
                        ['IPSET_ATTR_SETNAME', name],
                        ['IPSET_ATTR_TYPENAME', stype],
                        ['IPSET_ATTR_FAMILY', family],
                        ['IPSET_ATTR_REVISION', self._attr_revision]]
        return self.request(req, IPSET_CMD_CREATE,
                            msg_flags=flags,
                            terminate=_nlmsg_error)

    def _add_delete(self, name, entry, family, cmd, exclusive):
        '''
        Shared implementation of `add()` and `delete()`: send `cmd`
        for the member `entry` of the ipset `name`.

        `family` selects the address attribute and must be
        socket.AF_INET or socket.AF_INET6, anything else raises
        TypeError. With `exclusive` set the request carries
        NLM_F_EXCL.
        '''
        if family not in (socket.AF_INET, socket.AF_INET6):
            raise TypeError('unknown family')
        if family == socket.AF_INET:
            addr_attr = 'IPSET_ATTR_IPADDR_IPV4'
        else:
            addr_attr = 'IPSET_ATTR_IPADDR_IPV6'

        flags = NLM_F_REQUEST | NLM_F_ACK
        if exclusive:
            flags |= NLM_F_EXCL

        # the address is nested: DATA -> IP -> IPADDR_IPV4/IPV6
        ip_attr = ['IPSET_ATTR_IP', {'attrs': [[addr_attr, entry]]}]
        data_attr = ['IPSET_ATTR_DATA', {'attrs': [ip_attr]}]
        req = ipset_msg()
        req['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version],
                        ['IPSET_ATTR_SETNAME', name],
                        data_attr]
        return self.request(req, cmd,
                            msg_flags=flags,
                            terminate=_nlmsg_error)

    def add(self, name, entry, family=socket.AF_INET, exclusive=True):
        '''
        Add the member `entry` to the ipset `name`.

        NOTE(review): with `exclusive=True` adding an entry that is
        already in the set is presumably reported as an error, and
        accepted silently otherwise -- confirm.
        '''
        return self._add_delete(name, entry, family,
                                cmd=IPSET_CMD_ADD,
                                exclusive=exclusive)

    def delete(self, name, entry, family=socket.AF_INET, exclusive=True):
        '''
        Delete the member `entry` from the ipset `name`.

        NOTE(review): with `exclusive=True` deleting an entry that
        is not in the set is presumably reported as an error, and
        accepted silently otherwise -- confirm.
        '''
        return self._add_delete(name, entry, family,
                                cmd=IPSET_CMD_DEL,
                                exclusive=exclusive)

    def swap(self, set_a, set_b):
        '''
        Swap two ipsets, `set_a` and `set_b`.
        '''
        # The second set name is sent in the attribute labelled
        # IPSET_ATTR_TYPENAME.
        # NOTE(review): presumably the ipset protocol reuses that
        # attribute number for the second name of a swap -- confirm
        # against the attribute map of ipset_msg.
        req = ipset_msg()
        req['attrs'] = [['IPSET_ATTR_PROTOCOL', self._proto_version],
                        ['IPSET_ATTR_SETNAME', set_a],
                        ['IPSET_ATTR_TYPENAME', set_b]]
        return self.request(req, IPSET_CMD_SWAP,
                            msg_flags=NLM_F_REQUEST | NLM_F_ACK,
                            terminate=_nlmsg_error)