This file is indexed.

/usr/lib/python3/dist-packages/pyghmi/ipmi/private/serversession.py is in python3-pyghmi 1.0.32-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2015 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This represents the server side of a session object
# Split into a separate file to avoid overly manipulating the as-yet
# client-centered session object
import collections
import hashlib
import hmac
import os
import pyghmi.ipmi.private.constants as constants
import pyghmi.ipmi.private.session as ipmisession
import socket
import struct
import uuid


class ServerSession(ipmisession.Session):
    """Server side of a single RMCP+ session with one IPMI client.

    An instance is built from an RMCP+ open session request, answers that
    request immediately, and then handles the RAKP exchange (RAKP1 is
    answered with RAKP2, RAKP3 with RAKP4).  Once RAKP3 validates,
    ``handle_client_request`` is installed as ``ipmicallback``.
    """

    def __new__(cls, authdata, kg, clientaddr, netsocket, request, uuid,
                bmc):
        # Need to do default new type behavior.  The normal session
        # takes measures to assure the caller shares even when they
        # didn't try.  We don't have that operational mode to contend
        # with in the server case (one file descriptor per bmc)
        return object.__new__(cls)

    def create_open_session_response(self, request):
        """Build the payload of an RMCP+ open session response.

        Records the client's session id (bytes 4-8 of the request) and
        generates a random 4 byte managed session id for this side.

        :param request: The open session request payload as an indexable
                        byte sequence (bytes or bytearray)
        :returns: A bytearray holding the response payload
        """
        clienttag = request[0]
        # role = request[1]
        self.clientsessionid = request[4:8]
        # TODO(jbjohnso): intelligently handle integrity/auth/conf
        # for now, forcibly do cipher suite 3
        self.managedsessionid = os.urandom(4)
        # table 13-17, 1 for now (hmac-sha1), 3 should also be supported
        # table 13-18, integrity, 1 for now is hmac-sha1-96, 4 is sha256
        # confidentiality: 1 is aes-cbc-128, the only one
        self.privlevel = 4
        response = (bytearray([clienttag, 0, self.privlevel, 0]) +
                    self.clientsessionid + self.managedsessionid +
                    bytearray([
                        0, 0, 0, 8, 1, 0, 0, 0,  # auth
                        1, 0, 0, 8, 1, 0, 0, 0,  # integrity
                        2, 0, 0, 8, 1, 0, 0, 0,  # privacy
                    ]))
        return response

    def __init__(self, authdata, kg, clientaddr, netsocket, request, uuid,
                 bmc):
        """Start a server session and answer the open session request.

        :param authdata: Lookup of username (str) to password (str); used
                         with ``in`` and item access during RAKP1
        :param kg: The Kg key as bytes, or None to fall back to the
                   user's key once RAKP1 identifies the user
        :param clientaddr: Socket address of the client; also the key
                           under which this session is registered
        :param netsocket: The socket used to talk to the client
        :param request: Payload of the RMCP+ open session request
        :param uuid: UUID object of the BMC (its ``bytes`` are sent in
                     RAKP2)
        :param bmc: The owning BMC object; must provide ``port`` and
                    ``handle_raw_request``
        """
        # begin conversation per RMCP+ open session request
        self.uuid = uuid
        self.rqaddr = constants.IPMI_BMC_ADDRESS
        self.authdata = authdata
        self.servermode = True
        self.ipmiversion = 2.0
        self.sequencenumber = 0
        self.sessionid = 0
        self.bmc = bmc
        self.lastpayload = None
        self.broken = False
        self.authtype = 6
        self.integrityalgo = 0
        self.confalgo = 0
        self.kg = kg
        self.socket = netsocket
        self.sockaddr = clientaddr
        self.pendingpayloads = collections.deque([])
        self.pktqueue = collections.deque([])
        # register this session so it can be found by client address and
        # bmc port
        if clientaddr not in ipmisession.Session.bmc_handlers:
            ipmisession.Session.bmc_handlers[clientaddr] = {bmc.port: self}
        else:
            ipmisession.Session.bmc_handlers[clientaddr][bmc.port] = self
        response = self.create_open_session_response(bytearray(request))
        self.send_payload(response,
                          constants.payload_types['rmcpplusopenresponse'],
                          retry=False)

    def _got_rmcp_openrequest(self, data):
        """Answer a further open session request on this session.

        :param data: The open session request payload as a sequence of
                     byte values
        """
        response = self.create_open_session_response(
            struct.pack('B' * len(data), *data))
        self.send_payload(response,
                          constants.payload_types['rmcpplusopenresponse'],
                          retry=False)

    def _got_rakp1(self, data):
        """Process RAKP message 1 and reply with RAKP message 2.

        The message is silently dropped when it is truncated, carries no
        username, carries a username that is not valid UTF-8, or names a
        user that is not present in ``authdata``.

        :param data: The RAKP1 payload as a sequence of byte values
        """
        if len(data) < 28:
            # too short to hold the fixed fields read below; this comes
            # from the network, so drop it rather than raise IndexError
            return
        clienttag = data[0]
        self.Rm = data[8:24]
        self.rolem = data[24]
        self.maxpriv = self.rolem & 0b111
        namepresent = data[27]
        if namepresent == 0:
            # ignore null username for now
            return
        self.username = bytes(data[28:])
        try:
            username = self.username.decode('utf-8')
        except UnicodeDecodeError:
            # undecodable names cannot match any known user; treat them
            # like any other invalid username
            return
        if username not in self.authdata:
            # don't think about invalid usernames for now
            return
        uuidbytes = self.uuid.bytes
        self.uuiddata = uuidbytes
        self.Rc = os.urandom(16)
        hmacdata = (self.clientsessionid + self.managedsessionid +
                    self.Rm + self.Rc + uuidbytes +
                    bytearray([self.rolem, len(self.username)]))
        hmacdata += self.username
        self.kuid = self.authdata[username].encode('utf-8')
        if self.kg is None:
            # no Kg configured, the user key doubles as the integrity key
            self.kg = self.kuid
        authcode = hmac.new(
            self.kuid, bytes(hmacdata), hashlib.sha1).digest()
        # regrettably, ipmi mandates the server send out an hmac first
        # akin to a leak of /etc/shadow, not too worrisome if the secret
        # is complex, but terrible for most likely passwords selected by
        # a human
        newmessage = (bytearray([clienttag, 0, 0, 0]) + self.clientsessionid +
                      self.Rc + uuidbytes + authcode)
        self.send_payload(newmessage, constants.payload_types['rakp2'],
                          retry=False)

    def _got_rakp2(self, data):
        # stub, server should not think about rakp2
        pass

    def _got_rakp3(self, data):
        """Validate RAKP message 3 and, if it is good, send RAKP message 4.

        Derives the session integrity key and the K1/K2 keys, then checks
        the client's authcode.  A RAKP3 that arrives before a successful
        RAKP1, that has a bad authcode, or that reports a non-zero status
        is dropped without a reply.

        :param data: The RAKP3 payload as a sequence of byte values
        """
        # for now drop rakp3 with bad authcode
        # respond correctly a TODO(jjohnson2), since Kg being used
        # yet incorrect is a scenario why rakp3 could be bad
        # even if rakp2 was good
        if getattr(self, 'kuid', None) is None:
            # no RAKP1 has been accepted yet, so none of the key material
            # used below exists; drop instead of raising AttributeError
            return
        RmRc = self.Rm + self.Rc
        self.sik = hmac.new(self.kg,
                            bytes(RmRc) +
                            struct.pack("2B", self.rolem,
                                        len(self.username)) +
                            self.username, hashlib.sha1).digest()
        self.k1 = hmac.new(self.sik, b'\x01' * 20, hashlib.sha1).digest()
        self.k2 = hmac.new(self.sik, b'\x02' * 20, hashlib.sha1).digest()
        self.aeskey = self.k2[0:16]
        hmacdata = self.Rc +\
            self.clientsessionid +\
            struct.pack("2B", self.rolem,
                        len(self.username)) +\
            self.username
        expectedauthcode = hmac.new(self.kuid, bytes(hmacdata), hashlib.sha1
                                    ).digest()
        authcode = struct.pack("%dB" % len(data[8:]), *data[8:])
        # constant time comparison, the authcode is attacker supplied
        if not hmac.compare_digest(expectedauthcode, authcode):
            # TODO(jjohnson2): RMCP error back at invalid rakp3
            return
        clienttag = data[0]
        if data[1] != 0:
            # client did not like our response, so ignore the rakp3
            return
        self.localsid = struct.unpack('<I', self.managedsessionid)[0]
        self.ipmicallback = self.handle_client_request
        self._send_rakp4(clienttag, 0)

    def handle_client_request(self, request):
        """Dispatch an IPMI request received on the established session.

        Netfn 6 command 0x3b (set session privilege level) and netfn 6
        command 0x3c (close session) are answered here; everything else is
        handed to the BMC's ``handle_raw_request``.

        :param request: A dict with at least 'netfn', 'command' and 'data'
        """
        if request['netfn'] == 6 and request['command'] == 0x3b:
            pendingpriv = request['data'][0]
            returncode = 0
            if pendingpriv > 1:
                if pendingpriv > self.maxpriv:
                    # more than the role requested in RAKP1 allows
                    returncode = 0x81
                else:
                    self.clientpriv = request['data'][0]
            # NOTE(review): clientpriv is only assigned in the branch
            # above; if the first request is rejected or asks for level
            # 0/1 this read relies on the attribute existing already --
            # confirm where it is initialized
            self._send_ipmi_net_payload(code=returncode,
                                        data=[self.clientpriv])
        elif request['netfn'] == 6 and request['command'] == 0x3c:
            self.send_ipmi_response()
            self.close_server_session()
        else:
            self.bmc.handle_raw_request(request, self)

    def close_server_session(self):
        # stub, nothing is torn down yet when the client closes
        pass

    def _send_rakp4(self, tagvalue, statuscode):
        """Send RAKP message 4 and switch the session to protected mode.

        After the message is sent, confidentiality is set to 'aes',
        integrity to 'sha1', the sequence number to 1 and the session id
        to the client's session id.

        :param tagvalue: The message tag taken from the client's RAKP3
        :param statuscode: The status code to report to the client
        """
        payload = bytearray(
            [tagvalue, statuscode, 0, 0]) + self.clientsessionid
        hmacdata = self.Rm + self.managedsessionid + self.uuiddata
        hmacdata = struct.pack('%dB' % len(hmacdata), *hmacdata)
        # only the first 12 bytes of the sha1 hmac are sent
        authdata = hmac.new(self.sik, hmacdata, hashlib.sha1).digest()[:12]
        payload += authdata
        self.send_payload(payload, constants.payload_types['rakp4'],
                          retry=False)
        self.confalgo = 'aes'
        self.integrityalgo = 'sha1'
        self.sequencenumber = 1
        self.sessionid = struct.unpack(
            '<I', struct.pack('4B', *self.clientsessionid))[0]

    def _got_rakp4(self, data):
        # stub, server should not think about rakp4
        pass

    def _timedout(self):
        """Expire a client session after a period of inactivity

        After the session inactivity timeout, this invalidates the client
        session.
        """
        # for now, we will have a non-configurable 60 second timeout
        pass

    def _handle_channel_auth_cap(self, request):
        """Handle incoming channel authentication capabilities request

        This is used when serving as an IPMI target to service client
        requests for client authentication capabilities
        """
        pass

    def send_ipmi_response(self, data=None, code=0):
        """Send an IPMI response to the client on this session.

        :param data: The response data as a list of byte values; omitted
                     or None means no data
        :param code: The IPMI completion code, 0 by default
        """
        if data is None:
            # fresh list per call rather than a shared mutable default
            data = []
        self._send_ipmi_net_payload(data=data, code=code)

    def logout(self):
        # stub, a server session has nothing to log out of
        pass


class IpmiServer(object):
    """A minimal IPMI 2.0 BMC that listens on a UDP socket.

    Sessionless packets are examined by ``sessionless_data``: get channel
    authentication capabilities is answered directly, and an RMCP+ open
    session request spawns a ``ServerSession``.  Subclasses are expected
    to override ``handle_raw_request`` to implement actual commands; the
    default answers every request with completion code 0xc1.
    """
    # auth capabilities for now is a static payload
    # for now always completion code 0, otherwise ignore
    # authentication type fixed to ipmi2, ipmi1 forbidden
    # 0b10000000

    def __init__(self, authdata, port=623, bmcuuid=None, address='::'):
        """Create a new ipmi bmc instance.

        :param authdata: A dict or object with .get() to provide password
                        lookup by username.  This does not support the full
                        complexity of what IPMI can support, only a
                        reasonable subset.
        :param port: The default port number to bind to.  Defaults to the
                     standard 623
        :param bmcuuid: The UUID object identifying this BMC.  A random
                        one is generated with uuid.uuid4() when omitted
        :param address: The IP address to bind to. Defaults to '::' (all
                        zeroes)
        """
        self.revision = 0
        self.deviceid = 0
        self.firmwaremajor = 1
        self.firmwareminor = 0
        self.ipmiversion = 2
        self.additionaldevices = 0
        self.mfgid = 0
        self.prodid = 0
        self.pktqueue = collections.deque([])
        if bmcuuid is None:
            self.uuid = uuid.uuid4()
        else:
            self.uuid = bmcuuid
        lanchannel = 1
        authtype = 0b10000000  # ipmi2 only
        authstatus = 0b00000100  # change based on authdata/kg
        chancap = 0b00000010  # ipmi2 only
        oemdata = (0, 0, 0, 0)
        self.authdata = authdata
        # static body of the auth capabilities response, led by the
        # completion code (always 0)
        self.authcap = struct.pack('BBBBBBBBB', 0, lanchannel, authtype,
                                   authstatus, chancap, *oemdata)
        self.kg = None
        self.timeout = 60
        self.port = port
        addrinfo = socket.getaddrinfo(address, port, 0,
                                      socket.SOCK_DGRAM)[0]
        self.serversocket = ipmisession.Session._assignsocket(addrinfo)
        ipmisession.Session.bmc_handlers[self.serversocket] = {0: self}

    def send_auth_cap(self, myaddr, mylun, clientaddr, clientlun, clientseq,
                      sockaddr):
        """Send the get channel authentication capabilities response.

        :param myaddr: The address the client used for this BMC
        :param mylun: The LUN the client used for this BMC
        :param clientaddr: The requester address from the request
        :param clientlun: The requester LUN from the request
        :param clientseq: The request sequence number to echo back
        :param sockaddr: The socket address to send the response to
        """
        header = bytearray(
            b'\x06\x00\xff\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10')
        # netfn 7 is the response to the netfn 6 request
        headerdata = [clientaddr, clientlun | (7 << 2)]
        headersum = ipmisession._checksum(*headerdata)
        header += bytearray(headerdata + [headersum, myaddr,
                                          mylun | (clientseq << 2), 0x38])
        header += self.authcap
        # second checksum covers everything after the first checksum
        header.append(ipmisession._checksum(*header[17:]))
        ipmisession._io_sendto(self.serversocket, header, sockaddr)

    def process_pktqueue(self):
        """Drain the packet queue through ``sessionless_data``.

        Each queued entry is indexable, with the packet data at index 0
        and the sender's socket address at index 1.
        """
        while self.pktqueue:
            pkt = self.pktqueue.popleft()
            self.sessionless_data(pkt[0], pkt[1])

    def sessionless_data(self, data, sockaddr):
        """Examines unsolicited packet and decides appropriate action.

        For a listening IpmiServer, a packet without an active session
        comes here for examination.  If it is something that is utterly
        sessionless (e.g. get channel authentication), send the appropriate
        response.  If it is a get session challenge or open rmcp+ request,
        spawn a session to handle the context.

        Packets that are too short, are not IPMI, or are not understood
        are silently ignored.

        :param data: The raw packet as received from the socket
        :param sockaddr: The socket address of the sender
        """
        if len(data) < 22:
            return
        data = bytearray(data)
        if not (data[0] == 6 and data[2:4] == b'\xff\x07'):  # not ipmi
            return
        if data[4] == 6:  # ipmi 2 payload...
            payloadtype = data[5]
            if payloadtype not in (0, 16):
                return
            if payloadtype == 16:  # new session to handle conversation
                ServerSession(self.authdata, self.kg, sockaddr,
                              self.serversocket, data[16:], self.uuid,
                              bmc=self)
                return
            # ditch two byte, because ipmi2 header is two
            # bytes longer than ipmi1 (payload type added, payload length 2).
            data = data[2:]
            if len(data) < 22:
                # the earlier length check counted the two bytes just
                # dropped; without this a short ipmi2 packet would fail
                # when the request data bytes are read below
                return
        myaddr, netfnlun = data[14:16]
        netfn = (netfnlun & 0b11111100) >> 2
        mylun = netfnlun & 0b11
        if netfn == 6:  # application request
            if data[19] == 0x38:  # cmd = get channel auth capabilities
                verchannel = data[20]
                if verchannel & 0b10000000 != 0b10000000:
                    # client did not ask for ipmi 2.0 extended data
                    return
                if verchannel & 0b1111 != 0xe:
                    # only the 'current channel' (0xe) is answered
                    return
                clientaddr, clientlun = data[17:19]
                clientseq = clientlun >> 2
                clientlun &= 0b11  # Lun is only the least significant bits
                self.send_auth_cap(myaddr, mylun, clientaddr, clientlun,
                                   clientseq, sockaddr)

    def set_kg(self, kg):
        """Sets the Kg for the BMC to use

        In RAKP, Kg is a BMC-specific integrity key that can be set.  If not
        set, Kuid is used for the integrity key

        :param kg: The key; a str is stored utf-8 encoded, anything
                   without an ``encode`` method is stored unchanged
        """
        try:
            self.kg = kg.encode('utf-8')
        except AttributeError:
            self.kg = kg

    def send_device_id(self, session):
        """Send this BMC's device id information as an IPMI response.

        :param session: The session to answer on; its
                        ``send_ipmi_response`` is called with the data
        """
        response = [self.deviceid, self.revision, self.firmwaremajor,
                    self.firmwareminor, self.ipmiversion,
                    self.additionaldevices]
        # NOTE(review): both ids are emitted as 4 little endian bytes;
        # the IPMI Get Device ID response appears to define a 3 byte
        # manufacturer id and a 2 byte product id -- confirm against the
        # specification before relying on the product id field
        response += struct.unpack('4B', struct.pack('<I', self.mfgid))
        response += struct.unpack('4B', struct.pack('<I', self.prodid))
        session.send_ipmi_response(data=response)

    def handle_raw_request(self, request, session):
        """Answer a request this BMC has no specific handling for.

        :param request: The parsed IPMI request (unused here)
        :param session: The session to send the response on
        """
        # per table 5-2, completion code 0xc1 is 'unrecognized'
        session.send_ipmi_response(code=0xc1)

    def logout(self):
        # stub, nothing to clean up for the listening server
        pass