/usr/share/pyshared/pymodbus/interfaces.py is in python-pymodbus 0.9.0+r175-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 | '''
Pymodbus Interfaces
---------------------
A collection of base classes that are used throughout
the pymodbus library.
'''
from pymodbus.exceptions import NotImplementedException
#---------------------------------------------------------------------------#
# Generic
#---------------------------------------------------------------------------#
class Singleton(object):
'''
Singleton base class
http://mail.python.org/pipermail/python-list/2007-July/450681.html
'''
def __new__(cls, *args, **kwargs):
''' Create a new instance
'''
if '_inst' not in vars(cls):
cls._inst = object.__new__(cls)
return cls._inst
#---------------------------------------------------------------------------#
# Project Specific
#---------------------------------------------------------------------------#
class IModbusDecoder(object):
''' Modbus Decoder Base Class
This interface must be implemented by a modbus message
decoder factory. These factories are responsible for
abstracting away converting a raw packet into a request / response
message object.
'''
def decode(self, message):
''' Wrapper to decode a given packet
:param message: The raw modbus request packet
:return: The decoded modbus message or None if error
'''
raise NotImplementedException(
"Method not implemented by derived class")
def lookupPduClass(self, function_code):
''' Use `function_code` to determine the class of the PDU.
:param function_code: The function code specified in a frame.
:returns: The class of the PDU that has a matching `function_code`.
'''
raise NotImplementedException(
"Method not implemented by derived class")
class IModbusFramer(object):
'''
A framer strategy interface. The idea is that we abstract away all the
detail about how to detect if a current message frame exists, decoding
it, sending it, etc so that we can plug in a new Framer object (tcp,
rtu, ascii).
'''
def checkFrame(self):
''' Check and decode the next frame
:returns: True if we successful, False otherwise
'''
raise NotImplementedException(
"Method not implemented by derived class")
def advanceFrame(self):
''' Skip over the current framed message
This allows us to skip over the current message after we have processed
it or determined that it contains an error. It also has to reset the
current frame header handle
'''
raise NotImplementedException(
"Method not implemented by derived class")
def addToFrame(self, message):
''' Add the next message to the frame buffer
This should be used before the decoding while loop to add the received
data to the buffer handle.
:param message: The most recent packet
'''
raise NotImplementedException(
"Method not implemented by derived class")
def isFrameReady(self):
''' Check if we should continue decode logic
This is meant to be used in a while loop in the decoding phase to let
the decoder know that there is still data in the buffer.
:returns: True if ready, False otherwise
'''
raise NotImplementedException(
"Method not implemented by derived class")
def getFrame(self):
''' Get the next frame from the buffer
:returns: The frame data or ''
'''
raise NotImplementedException(
"Method not implemented by derived class")
def populateResult(self, result):
''' Populates the modbus result with current frame header
We basically copy the data back over from the current header
to the result header. This may not be needed for serial messages.
:param result: The response packet
'''
raise NotImplementedException(
"Method not implemented by derived class")
def processIncomingPacket(self, data, callback):
''' The new packet processing pattern
This takes in a new request packet, adds it to the current
packet stream, and performs framing on it. That is, checks
for complete messages, and once found, will process all that
exist. This handles the case when we read N + 1 or 1 / N
messages at a time instead of 1.
The processed and decoded messages are pushed to the callback
function to process and send.
:param data: The new packet data
:param callback: The function to send results to
'''
raise NotImplementedException(
"Method not implemented by derived class")
def buildPacket(self, message):
''' Creates a ready to send modbus packet
The raw packet is built off of a fully populated modbus
request / response message.
:param message: The request/response to send
:returns: The built packet
'''
raise NotImplementedException(
"Method not implemented by derived class")
class IModbusSlaveContext(object):
'''
Interface for a modbus slave data context
Derived classes must implemented the following methods:
reset(self)
validate(self, fx, address, count=1)
getValues(self, fx, address, count=1)
setValues(self, fx, address, values)
'''
__fx_mapper = {2: 'd', 4: 'i'}
__fx_mapper.update([(i, 'h') for i in [3, 6, 16, 22, 23]])
__fx_mapper.update([(i, 'c') for i in [1, 5, 15]])
def decode(self, fx):
''' Converts the function code to the datastore to
:param fx: The function we are working with
:returns: one of [d(iscretes),i(inputs),h(oliding),c(oils)
'''
return self.__fx_mapper[fx]
def reset(self):
''' Resets all the datastores to their default values
'''
raise NotImplementedException("Context Reset")
def validate(self, fx, address, count=1):
''' Validates the request to make sure it is in range
:param fx: The function we are working with
:param address: The starting address
:param count: The number of values to test
:returns: True if the request in within range, False otherwise
'''
raise NotImplementedException("validate context values")
def getValues(self, fx, address, count=1):
''' Validates the request to make sure it is in range
:param fx: The function we are working with
:param address: The starting address
:param count: The number of values to retrieve
:returns: The requested values from a:a+c
'''
raise NotImplementedException("get context values")
def setValues(self, fx, address, values):
''' Sets the datastore with the supplied values
:param fx: The function we are working with
:param address: The starting address
:param values: The new values to be set
'''
raise NotImplementedException("set context values")
#---------------------------------------------------------------------------#
# Exported symbols
#---------------------------------------------------------------------------#
__all__ = [
'Singleton',
'IModbusDecoder', 'IModbusFramer', 'IModbusSlaveContext',
]
|