/usr/lib/python2.7/dist-packages/pysnmp/hlapi/asyncore/sync/compat/cmdgen.py is in python-pysnmp4 4.4.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#
# This file is part of pysnmp software.
#
# Copyright (c) 2005-2017, Ilya Etingof <etingof@gmail.com>
# License: http://snmplabs.com/pysnmp/license.html
#
# This is a Python 2.6- version of the same file at level up
#
from pysnmp.hlapi.asyncore import cmdgen
from pysnmp.hlapi.varbinds import *
from pysnmp.proto.rfc1905 import endOfMibView
from pysnmp.proto import errind
from pyasn1.type.univ import Null
__all__ = ['getCmd', 'nextCmd', 'setCmd', 'bulkCmd', 'next']
# noinspection PyShadowingBuiltins
def next(iter):
    """Advance *iter* by one step and return the item it produces.

    Stand-in for the ``next()`` builtin on interpreters that lack it.
    Iterators exposing the Python 2 style ``next()`` method are used
    as before; iterators that only provide ``__next__()`` are now
    supported as well.  Whatever the iterator raises on exhaustion
    (normally ``StopIteration``) propagates to the caller.
    """
    # Resolve the bound method first, so an AttributeError raised *inside*
    # the iterator is never mistaken for a missing ``next`` method.
    try:
        advance = iter.next
    except AttributeError:
        advance = iter.__next__
    return advance()
def getCmd(snmpEngine, authData, transportTarget, contextData,
           *varBinds, **options):
    """Issue a single GET request and yield its outcome exactly once.

    The generator yields one ``(errorIndication, errorStatus,
    errorIndex, varBinds)`` tuple.  When no *varBinds* are given,
    nothing is sent and ``(None, None, None, [])`` is yielded instead.

    Recognised keyword options:

    * ``lookupMib`` -- forwarded to ``cmdgen.getCmd`` (default ``True``).
    """
    if not varBinds:
        # Nothing to request: report an empty, error-free result
        # without involving the SNMP engine at all.
        yield None, None, None, []
        return

    # noinspection PyShadowingNames
    def cbFun(snmpEngine, sendRequestHandle,
              errorIndication, errorStatus, errorIndex,
              varBinds, cbCtx):
        # Stash the response in the caller-supplied context so it can
        # be picked up once the dispatcher has finished running.
        cbCtx.update({'errorIndication': errorIndication,
                      'errorStatus': errorStatus,
                      'errorIndex': errorIndex,
                      'varBinds': varBinds})

    response = {}

    # Keyword arguments are passed through a dict because they follow
    # the *varBinds expansion in the call below.
    requestOptions = dict(cbFun=cbFun, cbCtx=response,
                          lookupMib=options.get('lookupMib', True))

    cmdgen.getCmd(snmpEngine, authData, transportTarget, contextData,
                  *varBinds, **requestOptions)

    snmpEngine.transportDispatcher.runDispatcher()

    yield (response['errorIndication'], response['errorStatus'],
           response['errorIndex'], response['varBinds'])
def setCmd(snmpEngine, authData, transportTarget, contextData,
           *varBinds, **options):
    """Issue a SET request, repeating it for as long as it times out.

    Each attempt yields one ``(errorIndication, errorStatus,
    errorIndex, varBinds)`` tuple.  The generator finishes after the
    first attempt whose error indication is not
    ``errind.requestTimedOut``.

    Recognised keyword options:

    * ``lookupMib`` -- forwarded to ``cmdgen.setCmd`` (default ``True``).
    """
    # noinspection PyShadowingNames
    def cbFun(snmpEngine, sendRequestHandle,
              errorIndication, errorStatus, errorIndex,
              varBinds, cbCtx):
        # Stash the response in the caller-supplied context so it can
        # be picked up once the dispatcher has finished running.
        cbCtx.update({'errorIndication': errorIndication,
                      'errorStatus': errorStatus,
                      'errorIndex': errorIndex,
                      'varBinds': varBinds})

    response = {}

    # Keyword arguments are passed through a dict because they follow
    # the *varBinds expansion in the call below.
    requestOptions = dict(cbFun=cbFun, cbCtx=response,
                          lookupMib=options.get('lookupMib', True))

    while True:
        cmdgen.setCmd(snmpEngine, authData, transportTarget, contextData,
                      *varBinds, **requestOptions)

        snmpEngine.transportDispatcher.runDispatcher()

        yield (response['errorIndication'], response['errorStatus'],
               response['errorIndex'], response['varBinds'])

        # Only a timed-out request is sent again; any other outcome
        # (success included) ends the generator.
        if response['errorIndication'] != errind.requestTimedOut:
            return
def nextCmd(snmpEngine, authData, transportTarget, contextData,
            *varBinds, **options):
    """Walk with repeated GETNEXT requests, yielding one row per reply.

    Every yielded item is an ``(errorIndication, errorStatus,
    errorIndex, varBinds)`` tuple.  Each request is built from the
    names of the most recently received row (initially the caller's
    *varBinds*) paired with ``Null('')`` values.

    Recognised keyword options:

    * ``lookupMib`` -- forwarded to ``cmdgen.nextCmd`` (default ``True``).
    * ``lexicographicMode`` -- when false, a column only counts as
      live while its initial OID is a prefix of the returned name
      (default ``True``).
    * ``ignoreNonIncreasingOid`` -- discard ``errind.OidNotIncreasing``
      error indications (default ``False``).
    * ``maxRows`` -- stop after this many rows; ``0`` means no limit.
    * ``maxCalls`` -- stop after this many successful requests; ``0``
      means no limit.
    """
    # noinspection PyShadowingNames
    def cbFun(snmpEngine, sendRequestHandle,
              errorIndication, errorStatus, errorIndex,
              varBindTable, cbCtx):
        # Stash the response in the caller-supplied context so it can
        # be picked up once the dispatcher has finished running.
        cbCtx.update({'errorIndication': errorIndication,
                      'errorStatus': errorStatus,
                      'errorIndex': errorIndex,
                      'varBindTable': varBindTable})

    lexicographicMode = options.get('lexicographicMode', True)
    ignoreNonIncreasingOid = options.get('ignoreNonIncreasingOid', False)
    rowLimit = options.get('maxRows', 0)
    callLimit = options.get('maxCalls', 0)

    response = {}

    # Keyword arguments are passed through a dict because they follow
    # a * expansion in the call below.
    requestOptions = dict(cbFun=cbFun, cbCtx=response,
                          lookupMib=options.get('lookupMib', True))

    # Names the walk started from; consulted for the prefix test when
    # lexicographicMode is off.
    startingOids = [
        vb[0] for vb in
        CommandGeneratorVarBinds().makeVarBinds(snmpEngine, varBinds)
    ]

    rowsSeen = callsMade = 0

    while True:
        cmdgen.nextCmd(snmpEngine, authData, transportTarget, contextData,
                       *[(vb[0], Null('')) for vb in varBinds],
                       **requestOptions)

        snmpEngine.transportDispatcher.runDispatcher()

        errorIndication = response['errorIndication']
        errorStatus = response['errorStatus']
        errorIndex = response['errorIndex']

        if (ignoreNonIncreasingOid and errorIndication and
                isinstance(errorIndication, errind.OidNotIncreasing)):
            errorIndication = None

        if errorIndication:
            yield errorIndication, errorStatus, errorIndex, varBinds
            if errorIndication != errind.requestTimedOut:
                return
            # Timed out: send the very same request again.
            continue

        if errorStatus:
            if errorStatus == 2:
                # Hide SNMPv1 noSuchName error which leaks in here
                # from SNMPv1 Agent through internal pysnmp proxy.
                errorStatus = errorStatus.clone(0)
                errorIndex = errorIndex.clone(0)
            yield errorIndication, errorStatus, errorIndex, varBinds
            return

        # The first row of the response table becomes both the row to
        # report and the starting point of the next request.
        varBinds = response['varBindTable'] and response['varBindTable'][0]

        # The walk goes on only while at least one column carries a
        # non-Null value that is still of interest.
        stillWalking = False
        for column, (name, val) in enumerate(varBinds):
            if isinstance(val, Null):
                continue
            if lexicographicMode or startingOids[column].isPrefixOf(name):
                stillWalking = True
                break
        if not stillWalking:
            return

        rowsSeen += 1
        callsMade += 1

        yield errorIndication, errorStatus, errorIndex, varBinds

        if rowLimit and rowsSeen >= rowLimit:
            return
        if callLimit and callsMade >= callLimit:
            return
def bulkCmd(snmpEngine, authData, transportTarget, contextData,
            nonRepeaters, maxRepetitions, *varBinds, **options):
    """Walk with repeated GETBULK requests, yielding one row at a time.

    Every yielded item is an ``(errorIndication, errorStatus,
    errorIndex, varBinds)`` tuple.  *nonRepeaters* and *maxRepetitions*
    are handed to ``cmdgen.bulkCmd``; when ``maxRows`` is set,
    *maxRepetitions* is lowered so a request never asks for more rows
    than are still wanted.  Each request is built from the names of
    the last row yielded (initially the caller's *varBinds*) paired
    with ``Null('')`` values.

    Recognised keyword options:

    * ``lookupMib`` -- forwarded to ``cmdgen.bulkCmd`` (default ``True``).
    * ``lexicographicMode`` -- when false, a column is considered
      finished as soon as its initial OID stops being a prefix of the
      returned name (default ``True``).
    * ``ignoreNonIncreasingOid`` -- discard ``errind.OidNotIncreasing``
      error indications (default ``False``).
    * ``maxRows`` -- stop after this many rows; ``0`` means no limit.
    * ``maxCalls`` -- stop after this many successful requests; ``0``
      means no limit.
    """
    # noinspection PyShadowingNames
    def cbFun(snmpEngine, sendRequestHandle,
              errorIndication, errorStatus, errorIndex,
              varBindTable, cbCtx):
        # Stash the response in the caller-supplied context so it can
        # be picked up once the dispatcher has finished running.
        cbCtx.update({'errorIndication': errorIndication,
                      'errorStatus': errorStatus,
                      'errorIndex': errorIndex,
                      'varBindTable': varBindTable})

    lexicographicMode = options.get('lexicographicMode', True)
    ignoreNonIncreasingOid = options.get('ignoreNonIncreasingOid', False)
    rowLimit = options.get('maxRows', 0)
    callLimit = options.get('maxCalls', 0)

    response = {}

    # Keyword arguments are passed through a dict because they follow
    # a * expansion in the call below.
    requestOptions = dict(cbFun=cbFun, cbCtx=response,
                          lookupMib=options.get('lookupMib', True))

    # Names the walk started from; consulted for the prefix test when
    # lexicographicMode is off.
    startingOids = [
        vb[0] for vb in
        CommandGeneratorVarBinds().makeVarBinds(snmpEngine, varBinds)
    ]
    columnCount = len(startingOids)

    # Per-column marker: True once that column has run out of data.
    columnDone = [False] * columnCount

    rowsSeen = callsMade = 0
    finished = False

    while not finished:
        if rowLimit and rowsSeen < rowLimit:
            maxRepetitions = min(maxRepetitions, rowLimit - rowsSeen)

        cmdgen.bulkCmd(snmpEngine, authData, transportTarget, contextData,
                       nonRepeaters, maxRepetitions,
                       *[(vb[0], Null('')) for vb in varBinds],
                       **requestOptions)

        snmpEngine.transportDispatcher.runDispatcher()

        errorIndication = response['errorIndication']
        errorStatus = response['errorStatus']
        errorIndex = response['errorIndex']
        varBindTable = response['varBindTable']

        if (ignoreNonIncreasingOid and errorIndication and
                isinstance(errorIndication, errind.OidNotIncreasing)):
            errorIndication = None

        if errorIndication:
            yield (errorIndication, errorStatus, errorIndex,
                   varBindTable and varBindTable[0] or [])
            if errorIndication != errind.requestTimedOut:
                return
            # Timed out: send the very same request again.
            continue

        if errorStatus:
            if errorStatus == 2:
                # Hide SNMPv1 noSuchName error which leaks in here
                # from SNMPv1 Agent through internal pysnmp proxy.
                errorStatus = errorStatus.clone(0)
                errorIndex = errorIndex.clone(0)
            yield (errorIndication, errorStatus, errorIndex,
                   varBindTable and varBindTable[0] or [])
            return

        for rowIdx, row in enumerate(varBindTable):
            # Assume the walk is over until some column of this row
            # proves to be still in progress.
            finished = True

            if len(row) != columnCount:
                # Row of unexpected width: cut the table short and stop.
                # NOTE(review): this slice also drops row rowIdx - 1,
                # which did pass the width check -- confirm intended.
                varBindTable = rowIdx and varBindTable[:rowIdx - 1] or []
                break

            for colIdx, (name, val) in enumerate(row):
                if columnDone[colIdx]:
                    # Column finished earlier: blank out its value.
                    row[colIdx] = name, endOfMibView
                    continue

                finished = False

                if isinstance(val, Null):
                    columnDone[colIdx] = True
                elif (not lexicographicMode and
                        not startingOids[colIdx].isPrefixOf(name)):
                    row[colIdx] = name, endOfMibView
                    columnDone[colIdx] = True

            if finished:
                # Every column was already done when this row began;
                # keep only the rows before the previous one.
                varBindTable = rowIdx and varBindTable[:rowIdx - 1] or []
                break

        rowsSeen += len(varBindTable)
        callsMade += 1

        if rowLimit and rowsSeen >= rowLimit:
            if rowsSeen > rowLimit:
                # Trim the surplus rows off the end of the table.
                varBindTable = varBindTable[:-(rowsSeen - rowLimit)]
            finished = True

        if callLimit and callsMade >= callLimit:
            finished = True

        # The loop variable doubles as the starting point of the next
        # request, so it must keep the name ``varBinds``.
        for varBinds in varBindTable:
            yield errorIndication, errorStatus, errorIndex, varBinds
|