/usr/share/pyshared/pymodbus/mei_message.py is in python-pymodbus 1.2.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | '''
Encapsulated Interface (MEI) Transport Messages
-----------------------------------------------
'''
import struct
from pymodbus.constants import DeviceInformation, MoreData
from pymodbus.pdu import ModbusRequest
from pymodbus.pdu import ModbusResponse
from pymodbus.device import ModbusControlBlock
from pymodbus.device import DeviceInformationFactory
from pymodbus.pdu import ModbusExceptions as merror
_MCB = ModbusControlBlock()
#---------------------------------------------------------------------------#
# Read Device Information
#---------------------------------------------------------------------------#
class ReadDeviceInformationRequest(ModbusRequest):
'''
This function code allows reading the identification and additional
information relative to the physical and functional description of a
remote device, only.
The Read Device Identification interface is modeled as an address space
composed of a set of addressable data elements. The data elements are
called objects and an object Id identifies them.
'''
function_code = 0x2b
sub_function_code = 0x0e
_rtu_frame_size = 3
def __init__(self, read_code=None, object_id=0x00, **kwargs):
''' Initializes a new instance
:param read_code: The device information read code
:param object_id: The object to read from
'''
ModbusRequest.__init__(self, **kwargs)
self.read_code = read_code or DeviceInformation.Basic
self.object_id = object_id
def encode(self):
''' Encodes the request packet
:returns: The byte encoded packet
'''
packet = struct.pack('>BBB', self.sub_function_code,
self.read_code, self.object_id)
return packet
def decode(self, data):
''' Decodes data part of the message.
:param data: The incoming data
'''
params = struct.unpack('>BBB', data)
self.sub_function_code, self.read_code, self.object_id = params
def execute(self, context):
''' Run a read exeception status request against the store
:param context: The datastore to request from
:returns: The populated response
'''
if not (0x00 <= self.object_id <= 0xff):
return self.doException(merror.IllegalValue)
if not (0x00 <= self.read_code <= 0x04):
return self.doException(merror.IllegalValue)
information = DeviceInformationFactory.get(_MCB,
self.read_code, self.object_id)
return ReadDeviceInformationResponse(self.read_code, information)
def __str__(self):
''' Builds a representation of the request
:returns: The string representation of the request
'''
params = (self.read_code, self.object_id)
return "ReadDeviceInformationRequest(%d,%d)" % params
class ReadDeviceInformationResponse(ModbusResponse):
'''
'''
function_code = 0x2b
sub_function_code = 0x0e
@classmethod
def calculateRtuFrameSize(cls, buffer):
''' Calculates the size of the message
:param buffer: A buffer containing the data that have been received.
:returns: The number of bytes in the response.
'''
size = 8 # skip the header information
count = struct.unpack('>B', buffer[7])[0]
while count > 0:
_, object_length = struct.unpack('>BB', buffer[size:size+2])
size += object_length + 2
count -= 1
return size + 2
def __init__(self, read_code=None, information=None, **kwargs):
''' Initializes a new instance
:param read_code: The device information read code
:param information: The requested information request
'''
ModbusResponse.__init__(self, **kwargs)
self.read_code = read_code or DeviceInformation.Basic
self.information = information or {}
self.number_of_objects = len(self.information)
self.conformity = 0x83 # I support everything right now
# TODO calculate
self.next_object_id = 0x00 # self.information[-1](0)
self.more_follows = MoreData.Nothing
def encode(self):
''' Encodes the response
:returns: The byte encoded message
'''
packet = struct.pack('>BBBBBB', self.sub_function_code,
self.read_code, self.conformity, self.more_follows,
self.next_object_id, self.number_of_objects)
for (object_id, data) in self.information.items():
packet += struct.pack('>BB', object_id, len(data))
packet += data
return packet
def decode(self, data):
''' Decodes a the response
:param data: The packet data to decode
'''
params = struct.unpack('>BBBBBB', data[0:6])
self.sub_function_code, self.read_code = params[0:2]
self.conformity, self.more_follows = params[2:4]
self.next_object_id, self.number_of_objects = params[4:6]
self.information, count = {}, 6 # skip the header information
while count < len(data):
object_id, object_length = struct.unpack('>BB', data[count:count+2])
count += object_length + 2
self.information[object_id] = data[count-object_length:count]
def __str__(self):
''' Builds a representation of the response
:returns: The string representation of the response
'''
return "ReadDeviceInformationResponse(%d)" % self.read_code
#---------------------------------------------------------------------------#
# Exported symbols
#---------------------------------------------------------------------------#
__all__ = [
"ReadDeviceInformationRequest", "ReadDeviceInformationResponse",
]
|