/usr/share/pyshared/pysnmp/entity/rfc3413/twisted/cmdgen.py is in python-pysnmp4 4.2.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | from twisted.internet import defer
from pysnmp.entity.rfc3413 import cmdgen
from pyasn1.compat.octets import null
def _cbFun(sendRequestHandle, errorIndication,
errorStatus, errorIndex, varBinds, cbCtx):
cbCtx.callback((errorIndication, errorStatus, errorIndex, varBinds))
class GetCommandGenerator(cmdgen.GetCommandGenerator):
def sendReq(
self,
snmpEngine,
addrName,
varBinds,
contextEngineId=None,
contextName=null
):
df = defer.Deferred()
cmdgen.GetCommandGenerator.sendReq(
self,
snmpEngine,
addrName,
varBinds,
_cbFun,
df,
contextEngineId,
contextName
)
return df
class SetCommandGenerator(cmdgen.SetCommandGenerator):
def sendReq(
self,
snmpEngine,
addrName,
varBinds,
contextEngineId=None,
contextName=null
):
df = defer.Deferred()
cmdgen.SetCommandGenerator.sendReq(
self,
snmpEngine,
addrName,
varBinds,
_cbFun,
df,
contextEngineId,
contextName
)
return df
def _cbFunWithDeferred(sendRequestHandle, errorIndication,
errorStatus, errorIndex, varBinds, cbCtx):
df = cbCtx['df']
df.callback(
(errorIndication, errorStatus, errorIndex, varBinds)
)
# Callback function may return another deferred to indicate
# it wishes to continue MIB walk.
if isinstance(df.result, defer.Deferred):
cbCtx['df'] = df.result
return 1 # continue walking
class NextCommandGenerator(cmdgen.NextCommandGenerator):
def sendReq(
self,
snmpEngine,
addrName,
varBinds,
contextEngineId=None,
contextName=null
):
df = defer.Deferred()
cmdgen.NextCommandGenerator.sendReq(
self,
snmpEngine,
addrName,
varBinds,
_cbFunWithDeferred,
{ 'df': df }, # anonymous dictionary used for cbCtx
contextEngineId,
contextName
)
return df
class BulkCommandGenerator(cmdgen.BulkCommandGenerator):
def sendReq(
self,
snmpEngine,
addrName,
nonRepeaters,
maxRepetitions,
varBinds,
contextEngineId=None,
contextName=null
):
df = defer.Deferred()
cmdgen.BulkCommandGenerator.sendReq(
self,
snmpEngine,
addrName,
nonRepeaters,
maxRepetitions,
varBinds,
_cbFunWithDeferred,
{ 'df': df }, # anonymous dictionary used for cbCtx
contextEngineId=None,
contextName=null
)
return df
|