/usr/share/pyshared/pysnmp/entity/engine.py is in python-pysnmp4 4.2.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 | # SNMP engine
from pysnmp.proto.rfc3412 import MsgAndPduDispatcher
from pysnmp.proto.mpmod.rfc2576 import SnmpV1MessageProcessingModel, \
SnmpV2cMessageProcessingModel
from pysnmp.proto.mpmod.rfc3412 import SnmpV3MessageProcessingModel
from pysnmp.proto.secmod.rfc2576 import SnmpV1SecurityModel, \
SnmpV2cSecurityModel
from pysnmp.proto.secmod.rfc3414 import SnmpUSMSecurityModel
from pysnmp.proto.acmod import rfc3415, void
from pysnmp import error
class SnmpEngine:
def __init__(self, snmpEngineID=None, maxMessageSize=65507,
msgAndPduDsp=None):
if msgAndPduDsp is None:
self.msgAndPduDsp = MsgAndPduDispatcher()
else:
self.msgAndPduDsp = msgAndPduDsp
self.messageProcessingSubsystems = {
SnmpV1MessageProcessingModel.messageProcessingModelID:
SnmpV1MessageProcessingModel(),
SnmpV2cMessageProcessingModel.messageProcessingModelID:
SnmpV2cMessageProcessingModel(),
SnmpV3MessageProcessingModel.messageProcessingModelID:
SnmpV3MessageProcessingModel()
}
self.securityModels = {
SnmpV1SecurityModel.securityModelID: SnmpV1SecurityModel(),
SnmpV2cSecurityModel.securityModelID: SnmpV2cSecurityModel(),
SnmpUSMSecurityModel.securityModelID: SnmpUSMSecurityModel()
}
self.accessControlModel = {
void.accessModelID: void,
rfc3415.accessModelID: rfc3415
}
self.transportDispatcher = None
if self.msgAndPduDsp.mibInstrumController is None:
raise error.PySnmpError(
'MIB instrumentation does not yet exist'
)
snmpEngineMaxMessageSize, = self.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineMaxMessageSize')
snmpEngineMaxMessageSize.syntax = snmpEngineMaxMessageSize.syntax.clone(maxMessageSize)
snmpEngineBoots, = self.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots')
snmpEngineBoots.syntax = snmpEngineBoots.syntax + 1
if snmpEngineID is not None:
origSnmpEngineID, = self.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')
origSnmpEngineID.syntax = origSnmpEngineID.syntax.clone(snmpEngineID)
# Transport dispatcher bindings
def __receiveMessageCbFun(
self,
transportDispatcher,
transportDomain,
transportAddress,
wholeMsg
):
self.msgAndPduDsp.receiveMessage(
self, transportDomain, transportAddress, wholeMsg
)
def __receiveTimerTickCbFun(self, timeNow):
self.msgAndPduDsp.receiveTimerTick(self, timeNow)
for mpHandler in self.messageProcessingSubsystems.values():
mpHandler.receiveTimerTick(self, timeNow)
for smHandler in self.securityModels.values():
smHandler.receiveTimerTick(self, timeNow)
def registerTransportDispatcher(self, transportDispatcher):
if self.transportDispatcher is not None:
raise error.PySnmpError(
'Transport dispatcher already registered'
)
transportDispatcher.registerRecvCbFun(
self.__receiveMessageCbFun
)
transportDispatcher.registerTimerCbFun(
self.__receiveTimerTickCbFun
)
self.transportDispatcher = transportDispatcher
def unregisterTransportDispatcher(self):
if self.transportDispatcher is None:
raise error.PySnmpError(
'Transport dispatcher not registered'
)
self.transportDispatcher.unregisterRecvCbFun()
self.transportDispatcher.unregisterTimerCbFun()
self.transportDispatcher = None
|