This file is indexed.

/usr/share/pyshared/pymodbus/mei_message.py is in python-pymodbus 1.2.0-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
'''
Encapsulated Interface (MEI) Transport Messages
-----------------------------------------------

'''
import struct
from pymodbus.constants import DeviceInformation, MoreData
from pymodbus.pdu import ModbusRequest
from pymodbus.pdu import ModbusResponse
from pymodbus.device import ModbusControlBlock
from pymodbus.device import DeviceInformationFactory
from pymodbus.pdu import ModbusExceptions as merror

_MCB = ModbusControlBlock()


#---------------------------------------------------------------------------#
# Read Device Information
#---------------------------------------------------------------------------#
class ReadDeviceInformationRequest(ModbusRequest):
    '''
    This function code allows reading the identification and additional
    information relative to the physical and functional description of a
    remote device, only.

    The Read Device Identification interface is modeled as an address space
    composed of a set of addressable data elements. The data elements are
    called objects and an object Id identifies them.  
    '''
    function_code = 0x2b
    sub_function_code = 0x0e
    _rtu_frame_size = 3

    def __init__(self, read_code=None, object_id=0x00, **kwargs):
        ''' Initializes a new instance

        :param read_code: The device information read code
        :param object_id: The object to read from
        '''
        ModbusRequest.__init__(self, **kwargs)
        self.read_code = read_code or DeviceInformation.Basic
        self.object_id = object_id

    def encode(self):
        ''' Encodes the request packet

        :returns: The byte encoded packet
        '''
        packet = struct.pack('>BBB', self.sub_function_code,
            self.read_code, self.object_id)
        return packet

    def decode(self, data):
        ''' Decodes data part of the message.

        :param data: The incoming data
        '''
        params = struct.unpack('>BBB', data)
        self.sub_function_code, self.read_code, self.object_id = params

    def execute(self, context):
        ''' Run a read exeception status request against the store

        :param context: The datastore to request from
        :returns: The populated response
        '''
        if not (0x00 <= self.object_id <= 0xff):
            return self.doException(merror.IllegalValue)
        if not (0x00 <= self.read_code <= 0x04):
            return self.doException(merror.IllegalValue)

        information = DeviceInformationFactory.get(_MCB,
            self.read_code, self.object_id)
        return ReadDeviceInformationResponse(self.read_code, information)

    def __str__(self):
        ''' Builds a representation of the request

        :returns: The string representation of the request
        '''
        params = (self.read_code, self.object_id)
        return "ReadDeviceInformationRequest(%d,%d)" % params


class ReadDeviceInformationResponse(ModbusResponse):
    '''
    '''
    function_code = 0x2b
    sub_function_code = 0x0e

    @classmethod
    def calculateRtuFrameSize(cls, buffer):
        ''' Calculates the size of the message

        :param buffer: A buffer containing the data that have been received.
        :returns: The number of bytes in the response.
        '''
        size  = 8 # skip the header information
        count = struct.unpack('>B', buffer[7])[0]

        while count > 0:
            _, object_length = struct.unpack('>BB', buffer[size:size+2])
            size += object_length + 2
            count -= 1
        return size + 2

    def __init__(self, read_code=None, information=None, **kwargs):
        ''' Initializes a new instance

        :param read_code: The device information read code
        :param information: The requested information request
        '''
        ModbusResponse.__init__(self, **kwargs)
        self.read_code = read_code or DeviceInformation.Basic
        self.information = information or {}
        self.number_of_objects = len(self.information)
        self.conformity = 0x83 # I support everything right now

        # TODO calculate
        self.next_object_id = 0x00 # self.information[-1](0)
        self.more_follows = MoreData.Nothing

    def encode(self):
        ''' Encodes the response

        :returns: The byte encoded message
        '''
        packet = struct.pack('>BBBBBB', self.sub_function_code,
            self.read_code, self.conformity, self.more_follows,
            self.next_object_id, self.number_of_objects)

        for (object_id, data) in self.information.items():
            packet += struct.pack('>BB', object_id, len(data))
            packet += data

        return packet

    def decode(self, data):
        ''' Decodes a the response

        :param data: The packet data to decode
        '''
        params = struct.unpack('>BBBBBB', data[0:6])
        self.sub_function_code, self.read_code = params[0:2]
        self.conformity, self.more_follows = params[2:4]
        self.next_object_id, self.number_of_objects = params[4:6]
        self.information, count = {}, 6 # skip the header information

        while count < len(data):
            object_id, object_length = struct.unpack('>BB', data[count:count+2])
            count += object_length + 2
            self.information[object_id] = data[count-object_length:count]

    def __str__(self):
        ''' Builds a representation of the response

        :returns: The string representation of the response
        '''
        return "ReadDeviceInformationResponse(%d)" % self.read_code

#---------------------------------------------------------------------------#
# Exported symbols
#---------------------------------------------------------------------------#
__all__ = [
    "ReadDeviceInformationRequest", "ReadDeviceInformationResponse",
]