/usr/share/pyshared/pymodbus/datastore/context.py is in python-pymodbus 1.2.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | from pymodbus.exceptions import ParameterException
from pymodbus.interfaces import IModbusSlaveContext
from pymodbus.datastore.store import ModbusSequentialDataBlock
from pymodbus.constants import Defaults
#---------------------------------------------------------------------------#
# Logging
#---------------------------------------------------------------------------#
import logging;
_logger = logging.getLogger(__name__)
#---------------------------------------------------------------------------#
# Slave Contexts
#---------------------------------------------------------------------------#
class ModbusSlaveContext(IModbusSlaveContext):
'''
This creates a modbus data model with each data access
stored in its own personal block
'''
def __init__(self, *args, **kwargs):
''' Initializes the datastores, defaults to fully populated
sequential data blocks if none are passed in.
:param kwargs: Each element is a ModbusDataBlock
'di' - Discrete Inputs initializer
'co' - Coils initializer
'hr' - Holding Register initializer
'ir' - Input Registers iniatializer
'''
self.store = {}
self.store['d'] = kwargs.get('di', ModbusSequentialDataBlock.create())
self.store['c'] = kwargs.get('co', ModbusSequentialDataBlock.create())
self.store['i'] = kwargs.get('ir', ModbusSequentialDataBlock.create())
self.store['h'] = kwargs.get('hr', ModbusSequentialDataBlock.create())
def __str__(self):
''' Returns a string representation of the context
:returns: A string representation of the context
'''
return "Modbus Slave Context"
def reset(self):
''' Resets all the datastores to their default values '''
for datastore in self.store.values():
datastore.reset()
def validate(self, fx, address, count=1):
''' Validates the request to make sure it is in range
:param fx: The function we are working with
:param address: The starting address
:param count: The number of values to test
:returns: True if the request in within range, False otherwise
'''
address = address + 1 # section 4.4 of specification
_logger.debug("validate[%d] %d:%d" % (fx, address, count))
return self.store[self.decode(fx)].validate(address, count)
def getValues(self, fx, address, count=1):
''' Validates the request to make sure it is in range
:param fx: The function we are working with
:param address: The starting address
:param count: The number of values to retrieve
:returns: The requested values from a:a+c
'''
address = address + 1 # section 4.4 of specification
_logger.debug("getValues[%d] %d:%d" % (fx, address, count))
return self.store[self.decode(fx)].getValues(address, count)
def setValues(self, fx, address, values):
''' Sets the datastore with the supplied values
:param fx: The function we are working with
:param address: The starting address
:param values: The new values to be set
'''
address = address + 1 # section 4.4 of specification
_logger.debug("setValues[%d] %d:%d" % (fx, address, len(values)))
self.store[self.decode(fx)].setValues(address, values)
class ModbusServerContext(object):
''' This represents a master collection of slave contexts.
If single is set to true, it will be treated as a single
context so every unit-id returns the same context. If single
is set to false, it will be interpreted as a collection of
slave contexts.
'''
def __init__(self, slaves=None, single=True):
''' Initializes a new instance of a modbus server context.
:param slaves: A dictionary of client contexts
:param single: Set to true to treat this as a single context
'''
self.single = single
self.__slaves = slaves or {}
if self.single:
self.__slaves = {Defaults.UnitId: self.__slaves}
def __iter__(self):
''' Iterater over the current collection of slave
contexts.
:returns: An iterator over the slave contexts
'''
return self.__slaves.iteritems()
def __contains__(self, slave):
''' Check if the given slave is in this list
:param slave: slave The slave to check for existance
:returns: True if the slave exists, False otherwise
'''
return slave in self.__slaves
def __setitem__(self, slave, context):
''' Wrapper used to access the slave context
:param slave: slave The context to set
:param context: The new context to set for this slave
'''
if self.single: slave = Defaults.UnitId
if 0xf7 >= slave >= 0x00:
self.__slaves[slave] = context
else: raise ParameterException('slave index out of range')
def __getitem__(self, slave):
''' Wrapper used to access the slave context
:param slave: The slave context to get
:returns: The requested slave context
'''
if self.single: slave = Defaults.UnitId
if slave in self.__slaves:
return self.__slaves.get(slave)
else: raise ParameterException("slave does not exist, or is out of range")
|