/usr/share/pyshared/pymodbus/other_message.py is in python-pymodbus 0.9.0+r175-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
'''
Diagnostic record read/write

Currently, not all of the messages are implemented.
'''
import struct
from pymodbus.constants import ModbusStatus
from pymodbus.pdu import ModbusRequest
from pymodbus.pdu import ModbusResponse
from pymodbus.device import ModbusControlBlock
# Module-level ModbusControlBlock instance. The execute() methods of the
# request classes in this file read their counters and event list from it.
_MCB = ModbusControlBlock()
#---------------------------------------------------------------------------#
# TODO Make these only work on serial
#---------------------------------------------------------------------------#
class ReadExceptionStatusRequest(ModbusRequest):
    '''
    Request for function code 0x07 (Read Exception Status).

    This function code is used to read the contents of eight Exception
    Status outputs in a remote device. No output reference is needed in
    the request because the Exception Output references are known, so the
    request body is empty.
    '''
    function_code = 0x07
    # NOTE(review): presumably address + function code + 2 CRC bytes;
    # confirm against the RTU framer that consumes this value.
    _rtu_frame_size = 4

    def __init__(self):
        ''' Creates the request; it carries no fields of its own
        '''
        ModbusRequest.__init__(self)

    def encode(self):
        ''' Serializes the request body

        :returns: An empty string, since the request has no data
        '''
        return ''

    def decode(self, data):
        ''' Parses the request body; there is nothing to parse

        :param data: The incoming data, which is ignored
        '''

    def execute(self):
        ''' Runs a read exception status request against the control block

        :returns: A ReadExceptionStatusResponse built from the value of
            ``_MCB.Counter.summary()``
        '''
        return ReadExceptionStatusResponse(_MCB.Counter.summary())

    def __str__(self):
        ''' Describes the request

        :returns: The request name followed by its function code
        '''
        return "ReadExceptionStatusRequest(%d)" % self.function_code
class ReadExceptionStatusResponse(ModbusResponse):
    '''
    Response for function code 0x07 (Read Exception Status).

    The normal response contains the status of the eight Exception Status
    outputs, packed into one data byte with one bit per output. The status
    of the lowest output reference is contained in the least significant
    bit of the byte. The contents of the eight outputs are device specific.
    '''
    function_code = 0x07
    # NOTE(review): presumably address + function code + 1 data byte +
    # 2 CRC bytes; confirm against the RTU framer.
    _rtu_frame_size = 5

    def __init__(self, status=0x00):
        ''' Creates the response

        :param status: The status byte to report
        '''
        ModbusResponse.__init__(self)
        self.status = status

    def encode(self):
        ''' Serializes the response body

        :returns: ``self.status`` packed as a single unsigned byte
        '''
        return struct.pack('>B', self.status)

    def decode(self, data):
        ''' Parses the response body into ``self.status``

        :param data: The packet data; must be exactly one byte long
        '''
        (value,) = struct.unpack('>B', data)
        self.status = value

    def __str__(self):
        ''' Describes the response

        :returns: The response name with its function code and status
        '''
        return "ReadExceptionStatusResponse(%d, %s)" % (
            self.function_code, self.status)
# Encapsulate interface transport 43, 14
# CANopen general reference 43, 13
#---------------------------------------------------------------------------#
# TODO Make these only work on serial
#---------------------------------------------------------------------------#
class GetCommEventCounterRequest(ModbusRequest):
    '''
    Request for function code 0x0b (Get Comm Event Counter).

    This function code is used to get a status word and an event count
    from the remote device's communication event counter. By fetching the
    current count before and after a series of messages, a client can
    determine whether the messages were handled normally by the remote
    device.

    The device's event counter is incremented once for each successful
    message completion. It is not incremented for exception responses,
    poll commands, or fetch event counter commands.

    The event counter can be reset by means of the Diagnostics function
    (code 08), with a subfunction of Restart Communications Option
    (code 00 01) or Clear Counters and Diagnostic Register (code 00 0A).
    '''
    function_code = 0x0b
    # NOTE(review): presumably address + function code + 2 CRC bytes;
    # confirm against the RTU framer that consumes this value.
    _rtu_frame_size = 4

    def __init__(self):
        ''' Creates the request; it carries no fields of its own
        '''
        ModbusRequest.__init__(self)

    def encode(self):
        ''' Serializes the request body

        :returns: An empty string, since the request has no data
        '''
        return ''

    def decode(self, data):
        ''' Parses the request body; there is nothing to parse

        :param data: The incoming data, which is ignored
        '''

    def execute(self):
        ''' Runs a get comm event counter request against the control block

        :returns: A GetCommEventCounterResponse carrying the current
            value of ``_MCB.Counter.Event``
        '''
        return GetCommEventCounterResponse(_MCB.Counter.Event)

    def __str__(self):
        ''' Describes the request

        :returns: The request name followed by its function code
        '''
        return "GetCommEventCounterRequest(%d)" % self.function_code
class GetCommEventCounterResponse(ModbusResponse):
    '''
    Response for function code 0x0b (Get Comm Event Counter).

    The normal response contains a two-byte status word, and a two-byte
    event count. The status word will be all ones (FF FF hex) if a
    previously-issued program command is still being processed by the
    remote device (a busy condition exists). Otherwise, the status word
    will be all zeros.
    '''
    function_code = 0x0b
    # NOTE(review): presumably address + function code + 4 data bytes +
    # 2 CRC bytes; confirm against the RTU framer.
    _rtu_frame_size = 8

    def __init__(self, count=0x0000):
        ''' Creates the response

        :param count: The current event counter value
        '''
        ModbusResponse.__init__(self)
        self.count = count
        # True means the device is ready, i.e. not busy/waiting.
        self.status = True

    def encode(self):
        ''' Serializes the response body

        :returns: The status word followed by the event count, each
            packed as a big-endian unsigned 16-bit value
        '''
        word = ModbusStatus.Ready if self.status else ModbusStatus.Waiting
        return struct.pack('>HH', word, self.count)

    def decode(self, data):
        ''' Parses the response body into ``self.count`` and ``self.status``

        :param data: The packet data; must be exactly four bytes long
        '''
        word, count = struct.unpack('>HH', data)
        self.count = count
        # Only the Ready status word maps to True.
        self.status = (word == ModbusStatus.Ready)

    def __str__(self):
        ''' Describes the response

        :returns: The response name with function code, count and status
        '''
        return "GetCommEventCounterResponse(%d, %d, %d)" % (
            self.function_code, self.count, self.status)
#---------------------------------------------------------------------------#
# TODO Make these only work on serial
#---------------------------------------------------------------------------#
class GetCommEventLogRequest(ModbusRequest):
    '''
    Request for function code 0x0c (Get Comm Event Log).

    This function code is used to get a status word, event count, message
    count, and a field of event bytes from the remote device.

    The status word and event counts are identical to that returned by
    the Get Communications Event Counter function (11, 0B hex).

    The message counter contains the quantity of messages processed by the
    remote device since its last restart, clear counters operation, or
    power-up. This count is identical to that returned by the Diagnostic
    function (code 08), sub-function Return Bus Message Count (code 11,
    0B hex).

    The event bytes field contains 0-64 bytes, with each byte corresponding
    to the status of one MODBUS send or receive operation for the remote
    device. The remote device enters the events into the field in
    chronological order. Byte 0 is the most recent event. Each new byte
    flushes the oldest byte from the field.
    '''
    function_code = 0x0c
    # NOTE(review): presumably address + function code + 2 CRC bytes;
    # confirm against the RTU framer that consumes this value.
    _rtu_frame_size = 4

    def __init__(self):
        ''' Creates the request; it carries no fields of its own
        '''
        ModbusRequest.__init__(self)

    def encode(self):
        ''' Serializes the request body

        :returns: An empty string, since the request has no data
        '''
        return ''

    def decode(self, data):
        ''' Parses the request body; there is nothing to parse

        :param data: The incoming data, which is ignored
        '''

    def execute(self):
        ''' Runs a get comm event log request against the control block

        The status is always reported as ready (True); the counters and
        the event list are read from the module-level control block.

        :returns: The populated GetCommEventLogResponse
        '''
        return GetCommEventLogResponse(
            status=True,
            message_count=_MCB.Counter.BusMessage,
            event_count=_MCB.Counter.Event,
            events=_MCB.getEvents())

    def __str__(self):
        ''' Describes the request

        :returns: The request name followed by its function code
        '''
        return "GetCommEventLogRequest(%d)" % (self.function_code,)
class GetCommEventLogResponse(ModbusResponse):
    '''
    Response for function code 0x0c (Get Comm Event Log).

    The normal response contains a two-byte status word field,
    a two-byte event count field, a two-byte message count field,
    and a field containing 0-64 bytes of events. A byte count
    field defines the total length of the data in these four fields.
    '''
    function_code = 0x0c
    # encode() writes the byte count as the first byte of the body, i.e.
    # directly after the function code. That is the same position that
    # ReportSlaveIdResponse marks with 2, so this must be 2 as well
    # (it used to be 3, which points into the status word).
    _rtu_byte_count_pos = 2

    def __init__(self, **kwargs):
        ''' Initializes a new instance

        :param status: The status response to report (default True)
        :param message_count: The current message count (default 0)
        :param event_count: The current event count (default 0)
        :param events: The collection of event bytes to send, as
            integers (default: a new empty list)
        '''
        ModbusResponse.__init__(self)
        self.status = kwargs.get('status', True)
        self.message_count = kwargs.get('message_count', 0)
        self.event_count = kwargs.get('event_count', 0)
        self.events = kwargs.get('events', [])

    def encode(self):
        ''' Encodes the response

        Layout: byte count (1 byte), status word (2), event count (2),
        message count (2), then one byte per event.

        :returns: The byte encoded message
        '''
        if self.status:
            ready = ModbusStatus.Ready
        else:
            ready = ModbusStatus.Waiting
        # The byte count covers the three 16-bit fields (6 bytes) plus
        # the event bytes; it does not include the count byte itself.
        packet = struct.pack('>B', 6 + len(self.events))
        packet += struct.pack('>H', ready)
        packet += struct.pack('>HH', self.event_count, self.message_count)
        packet += ''.join(struct.pack('>B', e) for e in self.events)
        return packet

    def decode(self, data):
        ''' Decodes the response

        :param data: The packet data to decode, starting at the byte
            count field
        '''
        length = struct.unpack('>B', data[0])[0]
        status = struct.unpack('>H', data[1:3])[0]
        self.status = (status == ModbusStatus.Ready)
        self.event_count = struct.unpack('>H', data[3:5])[0]
        self.message_count = struct.unpack('>H', data[5:7])[0]
        # Events occupy offsets 7..length inclusive: the count byte sits
        # at offset 0 and ``length`` bytes follow it, 6 of them fixed.
        self.events = [struct.unpack('>B', data[offset])[0]
                       for offset in xrange(7, length + 1)]

    def __str__(self):
        ''' Builds a representation of the response

        :returns: The response name with function code, status, message
            count and event count (the events themselves are omitted)
        '''
        arguments = (self.function_code, self.status,
                     self.message_count, self.event_count)
        return "GetCommEventLogResponse(%d, %d, %d, %d)" % arguments
#---------------------------------------------------------------------------#
# TODO Make these only work on serial
#---------------------------------------------------------------------------#
class ReportSlaveIdRequest(ModbusRequest):
    '''
    Request for function code 0x11 (Report Slave ID).

    This function code is used to read the description of the type, the
    current status, and other information specific to a remote device.
    '''
    function_code = 0x11
    # NOTE(review): presumably address + function code + 2 CRC bytes;
    # confirm against the RTU framer that consumes this value.
    _rtu_frame_size = 4

    def __init__(self):
        ''' Creates the request; it carries no fields of its own
        '''
        ModbusRequest.__init__(self)

    def encode(self):
        ''' Serializes the request body

        :returns: An empty string, since the request has no data
        '''
        return ''

    def decode(self, data):
        ''' Parses the request body; there is nothing to parse

        :param data: The incoming data, which is ignored
        '''

    def execute(self):
        ''' Runs a report slave id request

        The identifier is a fixed value and is not read from any store.

        :returns: A ReportSlaveIdResponse carrying that identifier
        '''
        # These escapes spell the ASCII string "pymodbus".
        return ReportSlaveIdResponse('\x70\x79\x6d\x6f\x64\x62\x75\x73')

    def __str__(self):
        ''' Describes the request

        :returns: The request name followed by its function code
        '''
        return "ResportSlaveIdRequest(%d)" % (self.function_code,)
class ReportSlaveIdResponse(ModbusResponse):
    '''
    Response for function code 0x11 (Report Slave ID).

    The body is a byte count, the device-specific identifier bytes and a
    one-byte run indicator status. The data contents are specific to each
    type of device.
    '''
    function_code = 0x11
    _rtu_byte_count_pos = 2

    def __init__(self, identifier=0x00, status=True):
        ''' Initializes a new instance

        :param identifier: The identifier of the slave. encode() takes
            its length and appends it to the packet, so it must be an
            already-encoded byte string before encode() is called; the
            integer default is only a placeholder for instances that are
            filled in by decode().
        :param status: The status response to report (True for on)
        '''
        ModbusResponse.__init__(self)
        self.identifier = identifier
        self.status = status

    def encode(self):
        ''' Encodes the response

        :returns: The byte encoded message: byte count, identifier,
            run indicator status
        '''
        if self.status:
            status = ModbusStatus.SlaveOn
        else:
            status = ModbusStatus.SlaveOff
        # The byte count reports how many bytes follow it: the identifier
        # plus the single status byte. The previous "+ 2" announced one
        # byte more than is actually sent.
        length = len(self.identifier) + 1
        packet = struct.pack('>B', length)
        packet += self.identifier  # we assume it is already encoded
        packet += struct.pack('>B', status)
        return packet

    def decode(self, data):
        ''' Decodes the response

        Since the identifier is device dependent, we just return the
        raw value that a user can decode to whatever it should be. The
        raw value is every byte covered by the byte count, so for a body
        made of identifier + status it also includes the status byte.

        :param data: The packet data to decode, starting at the byte
            count field
        '''
        length = struct.unpack('>B', data[0])[0]
        self.identifier = data[1:length + 1]
        # The run indicator is taken from the final byte of the body.
        status = struct.unpack('>B', data[-1])[0]
        self.status = status == ModbusStatus.SlaveOn

    def __str__(self):
        ''' Builds a representation of the response

        :returns: The response name with function code, identifier and
            status
        '''
        arguments = (self.function_code, self.identifier, self.status)
        # The identifier is a byte string, so it is formatted with %s;
        # %d raised TypeError for any non-numeric identifier.
        return "ResportSlaveIdResponse(%d, %s, %d)" % arguments
#---------------------------------------------------------------------------#
# TODO Make these only work on serial
#---------------------------------------------------------------------------#
# report device identification 43, 14
#---------------------------------------------------------------------------#
# Exported symbols
#---------------------------------------------------------------------------#
# Public API of this module: one request/response pair per function code.
__all__ = [
    "ReadExceptionStatusRequest",
    "ReadExceptionStatusResponse",
    "GetCommEventCounterRequest",
    "GetCommEventCounterResponse",
    "GetCommEventLogRequest",
    "GetCommEventLogResponse",
    "ReportSlaveIdRequest",
    "ReportSlaveIdResponse",
]
|