/usr/share/pyshared/pysnmp/carrier/twisted/dispatch.py is in python-pysnmp4 4.2.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#
# Copyright (C) 2008 Truelite Srl <info@truelite.it>
# Author: Filippo Giunchedi <filippo@truelite.it>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License Version 2
# as published by the Free Software Foundation
#
# Description: Transport dispatcher based on twisted.internet.reactor
#
import sys, time, traceback
from twisted.internet import reactor, task
from pysnmp.carrier.base import AbstractTransportDispatcher
from pysnmp.error import PySnmpError
class TwistedDispatcher(AbstractTransportDispatcher):
    """Transport dispatcher based on twisted.internet.reactor.

    A ``task.LoopingCall`` periodically invokes ``handleTimerTick`` with the
    current ``time.time()`` value.  The looping call is started when a
    transport is registered (provided the timer resolution is positive) and
    stopped once the count of registered transports drops back to zero.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the dispatcher.

        Positional arguments are accepted but ignored.  The only keyword
        argument consulted is ``timeout``: when present, its value is passed
        to ``setTimerResolution``.
        """
        AbstractTransportDispatcher.__init__(self)
        # Number of transports registered through this dispatcher; used to
        # decide when the periodic timer can be stopped.
        self.__transportCount = 0
        if 'timeout' in kwargs:
            self.setTimerResolution(kwargs['timeout'])
        # Created here but only started by registerTransport().
        self.loopingcall = task.LoopingCall(
            lambda self=self: self.handleTimerTick(time.time())
        )

    def runDispatcher(self, timeout=0.0):
        """Run the Twisted reactor unless it is already running.

        ``timeout`` is accepted for interface compatibility but is not used
        by this implementation.

        Raises:
            PySnmpError: if ``reactor.run()`` fails with an ordinary
                exception; the message carries the formatted traceback.

        ``KeyboardInterrupt`` is re-raised unchanged, and interpreter-exit
        exceptions that do not derive from ``Exception`` (e.g. ``SystemExit``)
        propagate without being wrapped.
        """
        if not reactor.running:
            try:
                reactor.run()
            except KeyboardInterrupt:
                # Kept explicit so an interrupt is never wrapped below.
                raise
            except Exception:
                # Deliberately not a bare ``except:`` -- that would also turn
                # SystemExit/GeneratorExit into a PySnmpError.
                raise PySnmpError('reactor error: %s' % ';'.join(traceback.format_exception(*sys.exc_info())))

    # jobstarted/jobfinished might be okay as-is

    def registerTransport(self, tDomain, transport):
        """Register ``transport`` under ``tDomain`` and ensure the timer runs.

        The base-class registration is performed first, so that if it raises
        the transport count is left untouched and the periodic timer is not
        left running on behalf of a transport that was never registered.
        """
        AbstractTransportDispatcher.registerTransport(
            self, tDomain, transport
        )
        self.__transportCount = self.__transportCount + 1
        # Start the periodic tick on demand; a non-positive resolution means
        # no timer is wanted.  now=False defers the first tick by one interval.
        if not self.loopingcall.running and self.getTimerResolution() > 0:
            self.loopingcall.start(self.getTimerResolution(), now=False)

    def unregisterTransport(self, tDomain):
        """Unregister the transport for ``tDomain`` and stop an idle timer.

        The transport count is only decremented when the base class reports a
        transport for ``tDomain``.  Whenever the count is zero afterwards and
        the looping call is still running, it is stopped.
        """
        t = AbstractTransportDispatcher.getTransport(self, tDomain)
        if t is not None:
            AbstractTransportDispatcher.unregisterTransport(self, tDomain)
            self.__transportCount = self.__transportCount - 1

        # The last transport has been removed, stop the timeout
        if self.__transportCount == 0 and self.loopingcall.running:
            self.loopingcall.stop()
|