This file is indexed.

/usr/lib/python3/dist-packages/pyroute2/iwutil.py is in python3-pyroute2 0.4.21-0.1ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
# -*- coding: utf-8 -*-
'''
IW module
=========

Experimental wireless module — nl80211 support.

Disclaimer
----------

Unlike IPRoute, which is mostly usable, though is far from
complete yet, the IW module is in the very initial state.
Neither the module itself, nor the message class cover the
nl80211 functionality reasonably enough. So if you're
going to use it, brace yourself — debug is coming.

Messages
--------

nl80211 messages are defined here::

    pyroute2/netlink/nl80211/__init__.py

Pls notice NLAs of type `hex`. On the early development stage
`hex` allows to inspect incoming data as a hex dump and,
occasionally, even make requests with such NLAs. But it's
not a production way.

The type `hex` in the NLA definitions means that this
particular NLA is not handled yet properly. If you want to
use some NLA which is defined as `hex` yet, pls find out a
specific type, patch the message class and submit your pull
request on github.

If you're not familiar with NLA types, take a look at RTNL
definitions::

    pyroute2/netlink/rtnl/ndmsg.py

and so on.

Communication with the kernel
-----------------------------

There are several methods of the communication with the kernel.

    * `sendto()` — lowest possible, send a raw binary data
    * `put()` — send a netlink message
    * `nlm_request()` — send a message, return the response
    * `get()` — get a netlink message
    * `recv()` — get a raw binary data from the kernel

There are no errors on `put()` usually. Any `permission denied`,
any `invalid value` errors are returned from the kernel with
netlink also. So if you do `put()`, but don't do `get()`, be
prepared to miss errors.

The preferred method for the communication is `nlm_request()`.
It tracks the message ID, returns the corresponding response.
In the case of errors `nlm_request()` raises an exception.
To get the response on any operation with nl80211, use flag
`NLM_F_ACK`.

Reverse it
----------

If you're too lazy to read the kernel sources, but still need
something not implemented here, you can use reverse engineering
on a reference implementation. E.g.::

    # strace -e trace=network -f -x -s 4096 \\
            iw phy phy0 interface add test type monitor

Will dump all the netlink traffic between the program `iw` and
the kernel. Three first packets are the generic netlink protocol
discovery, you can ignore them. All that follows, is the
nl80211 traffic::

    sendmsg(3, {msg_name(12)={sa_family=AF_NETLINK, ... },
        msg_iov(1)=[{"\\x30\\x00\\x00\\x00\\x1b\\x00\\x05 ...", 48}],
        msg_controllen=0, msg_flags=0}, 0) = 48
    recvmsg(3, {msg_name(12)={sa_family=AF_NETLINK, ... },
        msg_iov(1)=[{"\\x58\\x00\\x00\\x00\\x1b\\x00\\x00 ...", 16384}],
        msg_controllen=0, msg_flags=0}, 0) = 88
    ...

With `-s 4096` you will get the full dump. Then copy the strings
from `msg_iov` to a file, let's say `data`, and run the decoder::

    $ pwd
    /home/user/Projects/pyroute2
    $ export PYTHONPATH=`pwd`
    $ python scripts/decoder.py pyroute2.netlink.nl80211.nl80211cmd data

You will get the session decoded::

    {'attrs': [['NL80211_ATTR_WIPHY', 0],
               ['NL80211_ATTR_IFNAME', 'test'],
               ['NL80211_ATTR_IFTYPE', 6]],
     'cmd': 7,
     'header': {'flags': 5,
                'length': 48,
                'pid': 3292542647,
                'sequence_number': 1430426434,
                'type': 27},
     'reserved': 0,
     'version': 0}
    {'attrs': [['NL80211_ATTR_IFINDEX', 23811],
               ['NL80211_ATTR_IFNAME', 'test'],
               ['NL80211_ATTR_WIPHY', 0],
               ['NL80211_ATTR_IFTYPE', 6],
               ['NL80211_ATTR_WDEV', 4],
               ['NL80211_ATTR_MAC', 'a4:4e:31:43:1c:7c'],
               ['NL80211_ATTR_GENERATION', '02:00:00:00']],
     'cmd': 7,
     'header': {'flags': 0,
                'length': 88,
                'pid': 3292542647,
                'sequence_number': 1430426434,
                'type': 27},
     'reserved': 0,
     'version': 1}

Now you know, how to do a request and what you will get as a
response. Sample collected data is in the `scripts` directory.

Submit changes
--------------

Please do not hesitate to submit the changes on github. Without
your patches this module will not evolve.
'''
from pyroute2.netlink import NLM_F_ACK
from pyroute2.netlink import NLM_F_REQUEST
from pyroute2.netlink import NLM_F_DUMP
from pyroute2.netlink.nl80211 import NL80211
from pyroute2.netlink.nl80211 import nl80211cmd
from pyroute2.netlink.nl80211 import NL80211_NAMES
from pyroute2.netlink.nl80211 import IFTYPE_NAMES
from pyroute2.netlink.nl80211 import CHAN_WIDTH
from pyroute2.netlink.nl80211 import BSS_STATUS_NAMES


class IW(NL80211):

    def __init__(self, *argv, **kwarg):
        # get specific groups kwarg
        if 'groups' in kwarg:
            groups = kwarg['groups']
            del kwarg['groups']
        else:
            groups = None

        # get specific async kwarg
        if 'async' in kwarg:
            async = kwarg['async']
            del kwarg['async']
        else:
            async = False

        # align groups with async
        if groups is None:
            groups = ~0 if async else 0

        # continue with init
        super(IW, self).__init__(*argv, **kwarg)

        # do automatic bind
        # FIXME: unfortunately we can not omit it here
        self.bind(groups, async)

    def del_interface(self, dev):
        '''
        Delete a virtual interface

            - dev — device index
        '''
        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_DEL_INTERFACE']
        msg['attrs'] = [['NL80211_ATTR_IFINDEX', dev]]
        self.nlm_request(msg,
                         msg_type=self.prid,
                         msg_flags=NLM_F_REQUEST | NLM_F_ACK)

    def add_interface(self, ifname, iftype, dev=None, phy=0):
        '''
        Create a virtual interface

            - ifname — name of the interface to create
            - iftype — interface type to create
            - dev — device index
            - phy — phy index

        One should specify `dev` (device index) or `phy`
        (phy index). If no one specified, phy == 0.

        `iftype` can be integer or string:

        1. adhoc
        2. station
        3. ap
        4. ap_vlan
        5. wds
        6. monitor
        7. mesh_point
        8. p2p_client
        9. p2p_go
        10. p2p_device
        11. ocb
        '''
        # lookup the interface type
        iftype = IFTYPE_NAMES.get(iftype, iftype)
        assert isinstance(iftype, int)

        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_NEW_INTERFACE']
        msg['attrs'] = [['NL80211_ATTR_IFNAME', ifname],
                        ['NL80211_ATTR_IFTYPE', iftype]]
        if dev is not None:
            msg['attrs'].append(['NL80211_ATTR_IFINDEX', dev])
        elif phy is not None:
            msg['attrs'].append(['NL80211_ATTR_WIPHY', phy])
        else:
            raise TypeError('no device specified')
        self.nlm_request(msg,
                         msg_type=self.prid,
                         msg_flags=NLM_F_REQUEST | NLM_F_ACK)

    def list_dev(self):
        '''
        Get list of all wifi network interfaces
        '''
        return self.get_interfaces_dump()

    def list_wiphy(self):
        '''
        Get list of all phy devices
        '''
        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_GET_WIPHY']
        return self.nlm_request(msg,
                                msg_type=self.prid,
                                msg_flags=NLM_F_REQUEST | NLM_F_DUMP)

    def _get_phy_name(self, attr):
        return 'phy%i' % attr.get_attr('NL80211_ATTR_WIPHY')

    def _get_frequency(self, attr):
        try:
            return attr.get_attr('NL80211_ATTR_WIPHY_FREQ')
        except:
            return 0

    def get_interfaces_dict(self):
        '''
        Get interfaces dictionary
        '''
        ret = {}
        for wif in self.get_interfaces_dump():
            chan_width = wif.get_attr('NL80211_ATTR_CHANNEL_WIDTH')
            freq = self._get_frequency(wif) if chan_width is not None else 0
            wifname = wif.get_attr('NL80211_ATTR_IFNAME')
            ret[wifname] = [wif.get_attr('NL80211_ATTR_IFINDEX'),
                            self._get_phy_name(wif),
                            wif.get_attr('NL80211_ATTR_MAC'),
                            freq, chan_width]
        return ret

    def get_interfaces_dump(self):
        '''
        Get interfaces dump
        '''
        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_GET_INTERFACE']
        return self.nlm_request(msg,
                                msg_type=self.prid,
                                msg_flags=NLM_F_REQUEST | NLM_F_DUMP)

    def get_interface_by_phy(self, attr):
        '''
        Get interface by phy ( use x.get_attr('NL80211_ATTR_WIPHY') )
        '''
        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_GET_INTERFACE']
        msg['attrs'] = [['NL80211_ATTR_WIPHY', attr]]
        return self.nlm_request(msg,
                                msg_type=self.prid,
                                msg_flags=NLM_F_REQUEST | NLM_F_DUMP)

    def get_interface_by_ifindex(self, ifindex):
        '''
        Get interface by ifindex ( use x.get_attr('NL80211_ATTR_IFINDEX')
        '''
        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_GET_INTERFACE']
        msg['attrs'] = [['NL80211_ATTR_IFINDEX', ifindex]]
        return self.nlm_request(msg,
                                msg_type=self.prid,
                                msg_flags=NLM_F_REQUEST)

    def get_stations(self, ifindex):
        '''
        Get stations by ifindex
        '''
        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_GET_STATION']
        msg['attrs'] = [['NL80211_ATTR_IFINDEX', ifindex]]
        return self.nlm_request(msg,
                                msg_type=self.prid,
                                msg_flags=NLM_F_REQUEST | NLM_F_DUMP)

    def join_ibss(self, ifindex, ssid, freq, bssid=None,
                  channel_fixed=False, width=None, center=None, center2=None):
        '''
        Connect to network by ssid
            - ifindex - IFINDEX of the interface to perform the connection
            - ssid - Service set identification
            - freq - Frequency in MHz
            - bssid - The MAC address of target interface
            - channel_fixed: Boolean flag
            - width - Channel width
            - center - Central frequency of the 40/80/160 MHz channel
            - center2 - Center frequency of second segment if 80P80

        If the flag of channel_fixed is True, one should specify both the width
        and center of the channel

        `width` can be integer of string:

        0. 20_noht
        1. 20
        2. 40
        3. 80
        4. 80p80
        5. 160
        6. 5
        7. 10
        '''

        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_JOIN_IBSS']
        msg['attrs'] = [['NL80211_ATTR_IFINDEX', ifindex],
                        ['NL80211_ATTR_SSID', ssid],
                        ['NL80211_ATTR_WIPHY_FREQ', freq]]

        if channel_fixed:
            msg['attrs'].append(['NL80211_ATTR_FREQ_FIXED', None])
            width = CHAN_WIDTH.get(width, width)
            assert isinstance(width, int)
            if width in [2, 3, 5] and center:
                msg['attrs'].append(['NL80211_ATTR_CHANNEL_WIDTH', width])
                msg['attrs'].append(['NL80211_ATTR_CENTER_FREQ1', center])
            elif width == 4 and center and center2:
                msg['attrs'].append(['NL80211_ATTR_CHANNEL_WIDTH', width])
                msg['attrs'].append(['NL80211_ATTR_CENTER_FREQ1', center])
                msg['attrs'].append(['NL80211_ATTR_CENTER_FREQ2', center2])
            elif width in [0, 1, 6, 7]:
                msg['attrs'].append(['NL80211_ATTR_CHANNEL_WIDTH', width])
            else:
                raise TypeError('No channel specified')

        if bssid is not None:
            msg['attrs'].append(['NL80211_ATTR_MAC', bssid])

        self.nlm_request(msg,
                         msg_type=self.prid,
                         msg_flags=NLM_F_REQUEST | NLM_F_ACK)

    def leave_ibss(self, ifindex):
        '''
        Leave the IBSS -- the IBSS is determined by the network interface
        '''
        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_LEAVE_IBSS']
        msg['attrs'] = [['NL80211_ATTR_IFINDEX', ifindex]]

        self.nlm_request(msg,
                         msg_type=self.prid,
                         msg_flags=NLM_F_REQUEST | NLM_F_ACK)

    def connect(self, ifindex, ssid, bssid=None):
        '''
        Connect to the ap with ssid and bssid
        '''
        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_CONNECT']
        msg['attrs'] = [['NL80211_ATTR_IFINDEX', ifindex],
                        ['NL80211_ATTR_SSID', ssid]]
        if bssid is not None:
            msg['attrs'].append(['NL80211_ATTR_MAC', bssid])

        self.nlm_request(msg,
                         msg_type=self.prid,
                         msg_flags=NLM_F_REQUEST | NLM_F_ACK)

    def disconnect(self, ifindex):
        '''
        Disconnect the device
        '''
        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_DISCONNECT']
        msg['attrs'] = [['NL80211_ATTR_IFINDEX', ifindex]]
        self.nlm_request(msg,
                         msg_type=self.prid,
                         msg_flags=NLM_F_REQUEST | NLM_F_ACK)

    def scan(self, ifindex):
        '''
        Trigger scan and get results.

        Triggering scan usually requires root, and can take a
        couple of seconds.
        '''
        # Prepare a second netlink socket to get the scan results.
        # The issue is that the kernel can send the results notification
        # before we get answer for the NL80211_CMD_TRIGGER_SCAN
        nsock = NL80211()
        nsock.bind()
        nsock.add_membership('scan')

        # send scan request
        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_TRIGGER_SCAN']
        msg['attrs'] = [['NL80211_ATTR_IFINDEX', ifindex]]
        self.nlm_request(msg,
                         msg_type=self.prid,
                         msg_flags=NLM_F_REQUEST | NLM_F_ACK)

        # monitor the results notification on the secondary socket
        scanResultNotFound = True
        while scanResultNotFound:
            listMsg = nsock.get()
            for msg in listMsg:
                if msg["event"] == "NL80211_CMD_NEW_SCAN_RESULTS":
                    scanResultNotFound = False
                    break
        # close the secondary socket
        nsock.close()

        # request the results
        msg2 = nl80211cmd()
        msg2['cmd'] = NL80211_NAMES['NL80211_CMD_GET_SCAN']
        msg2['attrs'] = [['NL80211_ATTR_IFINDEX', ifindex]]
        return self.nlm_request(msg2, msg_type=self.prid,
                                msg_flags=NLM_F_REQUEST | NLM_F_DUMP)

    def get_associated_bss(self, ifindex):
        '''
        Returns the same info like scan() does, but only about the
        currently associated BSS.

        Unlike scan(), it returns immediately and doesn't require root.
        '''
        # When getting scan results without triggering scan first,
        # you'll always get the information about currently associated BSS
        #
        # However, it may return other BSS, if last scan wasn't very
        # long time go

        msg = nl80211cmd()
        msg['cmd'] = NL80211_NAMES['NL80211_CMD_GET_SCAN']
        msg['attrs'] = [['NL80211_ATTR_IFINDEX', ifindex]]

        res = self.nlm_request(msg, msg_type=self.prid,
                               msg_flags=NLM_F_REQUEST | NLM_F_DUMP)

        for x in res:
            attr_bss = x.get_attr('NL80211_ATTR_BSS')
            if attr_bss is not None:
                status = attr_bss.get_attr('NL80211_BSS_STATUS')
                if status in (BSS_STATUS_NAMES['associated'],
                              BSS_STATUS_NAMES['ibss_joined']):

                    return x

        return None