/usr/lib/python2.7/dist-packages/twisted/words/xish/utility.py is in python-twisted-words 13.2.0-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- test-case-name: twisted.words.test.test_xishutil -*-
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Event Dispatching and Callback utilities.
"""
from twisted.python import log
from twisted.words.xish import xpath
class _MethodWrapper(object):
    """
    Internal helper that binds a callable to a fixed set of arguments.

    The arguments given at construction time are stored and combined with
    the arguments supplied on every invocation of the wrapper.

    @ivar method: The wrapped callable.
    @ivar args: Positional arguments stored at construction time.
    @type args: C{tuple}
    @ivar kwargs: Keyword arguments stored at construction time.
    @type kwargs: C{dict}
    """

    def __init__(self, method, *args, **kwargs):
        self.method, self.args, self.kwargs = method, args, kwargs

    def __call__(self, *args, **kwargs):
        """
        Invoke the wrapped callable.

        The stored positional arguments come first, followed by C{args}.
        Stored keyword arguments are overridden by entries of the same name
        in C{kwargs}. The return value of the wrapped callable is discarded.
        """
        # Merge into a fresh dict so that the stored keyword arguments are
        # never mutated by a call.
        merged = dict(self.kwargs)
        merged.update(kwargs)
        self.method(*(self.args + args), **merged)
class CallbackList:
    """
    Container for callbacks.

    Event queries are linked to lists of callables. When a matching event
    occurs, these callables are called in sequence. One-time callbacks
    are removed from the list after the first time the event was triggered.

    Arguments to callbacks are spread across two sets. The first set,
    callback specific, is passed to C{addCallback} and is used for all
    subsequent event triggers. The second set is passed to C{callback} and is
    event specific. Positional arguments in the second set come after the
    positional arguments of the first set. Keyword arguments in the second set
    override those in the first set.

    @ivar callbacks: The registered callbacks as mapping from the callable to a
                     tuple of a wrapper for that callable that keeps the
                     callback specific arguments and a boolean that signifies
                     if it is to be called only once.
    @type callbacks: C{dict}
    """

    def __init__(self):
        self.callbacks = {}

    def addCallback(self, onetime, method, *args, **kwargs):
        """
        Add callback.

        The arguments passed are used as callback specific arguments. If
        C{method} is already registered, the existing registration (including
        its arguments and one-time flag) is kept and this call is a no-op.

        @param onetime: If C{True}, this callback is called at most once.
        @type onetime: C{bool}
        @param method: The callback callable to be added.
        @param args: Positional arguments to the callable.
        @type args: C{list}
        @param kwargs: Keyword arguments to the callable.
        @type kwargs: C{dict}
        """
        if method not in self.callbacks:
            wrapper = _MethodWrapper(method, *args, **kwargs)
            self.callbacks[method] = (wrapper, onetime)

    def removeCallback(self, method):
        """
        Remove callback.

        Removing a callable that is not registered is silently ignored.

        @param method: The callable to be removed.
        """
        self.callbacks.pop(method, None)

    def callback(self, *args, **kwargs):
        """
        Call all registered callbacks.

        The passed arguments are event specific and augment and override
        the callback specific arguments as described above.

        @note: Exceptions raised by callbacks are trapped and logged. They will
               not propagate up to make sure other callbacks will still be
               called, and the event dispatching always succeeds.

        @param args: Positional arguments to the callable.
        @type args: C{list}
        @param kwargs: Keyword arguments to the callable.
        @type kwargs: C{dict}
        """
        # Iterate over a snapshot: one-time callbacks are removed inside the
        # loop, and a callback may itself add or remove registrations.
        # Mutating a dict while iterating a live view raises RuntimeError.
        for key, (methodwrapper, onetime) in list(self.callbacks.items()):
            try:
                methodwrapper(*args, **kwargs)
            except:
                # Deliberately broad: per the contract above, nothing a
                # callback raises may interrupt the remaining callbacks.
                log.err()
            if onetime:
                # The callback may already have unregistered itself while it
                # was running, so tolerate a missing key.
                self.callbacks.pop(key, None)

    def isEmpty(self):
        """
        Return if list of registered callbacks is empty.

        @rtype: C{bool}
        """
        return not self.callbacks
class EventDispatcher:
    """
    Event dispatching service.

    The C{EventDispatcher} allows observers to be registered for certain events
    that are dispatched. There are two types of events: XPath events and Named
    events.

    Every dispatch is triggered by calling L{dispatch} with a data object and,
    for named events, the name of the event.

    When an XPath type event is dispatched, the associated object is assumed to
    be an L{Element<twisted.words.xish.domish.Element>} instance, which is
    matched against all registered XPath queries. For every match, the
    respective observer will be called with the data object.

    A named event will simply call each registered observer for that particular
    event name, with the data object. Unlike XPath type events, the data object
    is not restricted to L{Element<twisted.words.xish.domish.Element>}, but can
    be anything.

    When registering observers, the event that is to be observed is specified
    using an L{xpath.XPathQuery} instance or a string. In the latter case, the
    string can also contain the string representation of an XPath expression.
    To distinguish these from named events, each named event should start with
    a special prefix that is stored in C{self.prefix}. It defaults to
    C{//event/}.

    Observers registered using L{addObserver} are persistent: after the
    observer has been triggered by a dispatch, it remains registered for a
    possible next dispatch. If instead L{addOnetimeObserver} was used to
    observe an event, the observer is removed from the list of observers after
    the first observed event.

    Observers can also be prioritized, by providing an optional C{priority}
    parameter to the L{addObserver} and L{addOnetimeObserver} methods. Higher
    priority observers are then called before lower priority observers.

    Finally, observers can be unregistered by using L{removeObserver}.

    Observer registrations and removals requested while a dispatch is in
    progress are queued and applied once the outermost dispatch has finished.

    @ivar prefix: Prefix that marks an event string as a named event.
    """

    def __init__(self, eventprefix="//event/"):
        self.prefix = eventprefix
        # Both mappings have the shape {priority: {event: CallbackList}}.
        self._eventObservers = {}
        self._xpathObservers = {}
        # Number of dispatch() calls currently on the stack.
        self._dispatchDepth = 0
        # Observer add/remove operations deferred until no dispatch is running.
        self._updateQueue = []

    def _getEventAndObservers(self, event):
        """
        Classify C{event} and return it with the matching observer mapping.

        L{xpath.XPathQuery} instances are returned unchanged together with
        the XPath observers. Strings starting with C{self.prefix} are returned
        unchanged together with the named event observers. Any other string is
        converted with L{xpath.internQuery} and paired with the XPath
        observers.

        @return: A tuple of the (possibly converted) event and the observer
                 mapping it belongs to.
        @rtype: C{tuple}
        """
        if isinstance(event, xpath.XPathQuery):
            return event, self._xpathObservers
        if self.prefix == event[:len(self.prefix)]:
            return event, self._eventObservers
        return xpath.internQuery(event), self._xpathObservers

    def addOnetimeObserver(self, event, observerfn, priority=0, *args, **kwargs):
        """
        Register a one-time observer for an event.

        Like L{addObserver}, but is only triggered at most once. See there
        for a description of the parameters.
        """
        self._addObserver(True, event, observerfn, priority, *args, **kwargs)

    def addObserver(self, event, observerfn, priority=0, *args, **kwargs):
        """
        Register an observer for an event.

        Each observer will be registered with a certain priority. Higher
        priority observers get called before lower priority observers.

        @param event: Name or XPath query for the event to be monitored.
        @type event: C{str} or L{xpath.XPathQuery}.
        @param observerfn: Function to be called when the specified event
                           has been triggered. This callable takes
                           one parameter: the data object that triggered
                           the event. When specified, the C{*args} and
                           C{**kwargs} parameters to addObserver are being used
                           as additional parameters to the registered observer
                           callable.
        @param priority: (Optional) priority of this observer in relation to
                         other observer that match the same event. Defaults to
                         C{0}.
        @type priority: C{int}
        """
        self._addObserver(False, event, observerfn, priority, *args, **kwargs)

    def _addObserver(self, onetime, event, observerfn, priority, *args, **kwargs):
        """
        Register C{observerfn} for C{event}, or queue the registration.

        @param onetime: If C{True}, the observer is called at most once.
        @type onetime: C{bool}
        """
        # If this is happening in the middle of the dispatch, queue
        # it up for processing after the dispatch completes
        if self._dispatchDepth > 0:
            self._updateQueue.append(
                lambda: self._addObserver(onetime, event, observerfn,
                                          priority, *args, **kwargs))
            return

        event, observers = self._getEventAndObservers(event)
        priorityObservers = observers.setdefault(priority, {})
        if event not in priorityObservers:
            priorityObservers[event] = CallbackList()
        priorityObservers[event].addCallback(onetime, observerfn,
                                             *args, **kwargs)

    def removeObserver(self, event, observerfn):
        """
        Remove callable as observer for an event.

        The observer callable is removed for all priority levels for the
        specified event.

        @param event: Event for which the observer callable was registered.
        @type event: C{str} or L{xpath.XPathQuery}
        @param observerfn: Observer callable to be unregistered.
        """
        # If this is happening in the middle of the dispatch, queue
        # it up for processing after the dispatch completes
        if self._dispatchDepth > 0:
            self._updateQueue.append(
                lambda: self.removeObserver(event, observerfn))
            return

        event, observers = self._getEventAndObservers(event)

        # Collect emptied callback lists first and delete them afterwards, so
        # the dicts are not modified while they are being iterated.
        emptyLists = []
        for priority, priorityObservers in observers.items():
            for query, callbacklist in priorityObservers.items():
                if event == query:
                    callbacklist.removeCallback(observerfn)
                    if callbacklist.isEmpty():
                        emptyLists.append((priority, query))

        for priority, query in emptyLists:
            del observers[priority][query]

    def dispatch(self, obj, event=None):
        """
        Dispatch an event.

        When C{event} is C{None}, an XPath type event is triggered, and
        C{obj} is assumed to be an instance of
        L{Element<twisted.words.xish.domish.Element>}. Otherwise, C{event}
        holds the name of the named event being triggered. In the latter case,
        C{obj} can be anything.

        @param obj: The object to be dispatched.
        @param event: Optional event name.
        @type event: C{str}
        @return: C{True} if at least one registered query or event name
                 matched, C{False} otherwise.
        @rtype: C{bool}
        """
        foundTarget = False

        if event is not None:
            # Named event
            observers = self._eventObservers

            def match(query, obj):
                return query == event
        else:
            # XPath event
            observers = self._xpathObservers

            def match(query, obj):
                return query.matches(obj)

        self._dispatchDepth += 1
        try:
            emptyLists = []
            # Highest priority first.
            for priority in sorted(observers, reverse=True):
                # Iterate over a snapshot: a nested dispatch triggered by a
                # callback may drop emptied callback lists from this dict.
                for query, callbacklist in list(observers[priority].items()):
                    if match(query, obj):
                        callbacklist.callback(obj)
                        foundTarget = True
                        if callbacklist.isEmpty():
                            emptyLists.append((priority, query))

            for priority, query in emptyLists:
                # A nested dispatch may already have removed this entry.
                observers[priority].pop(query, None)
        finally:
            # Always restore the depth. If it stayed raised after an error,
            # every later observer update would be queued forever.
            self._dispatchDepth -= 1

        # If this is a dispatch within a dispatch, don't
        # do anything with the updateQueue -- it needs to
        # wait until we've back all the way out of the stack
        if self._dispatchDepth == 0:
            # Deal with pending update operations. Each operation is taken
            # off the queue before it runs, so one that raises is not
            # re-run by every later dispatch and the ones behind it stay
            # queued for the next dispatch.
            while self._updateQueue:
                update = self._updateQueue.pop(0)
                update()

        return foundTarget
class XmlPipe(object):
    """
    XML stream pipe.

    Connects two objects that communicate stanzas through an XML stream like
    interface. Each of the ends of the pipe (sink and source) can be used to
    send XML stanzas to the other side, or add observers to process XML stanzas
    that were sent from the other side.

    XML pipes are usually used in place of regular XML streams that are
    transported over TCP. This is the reason for the use of the names source
    and sink for both ends of the pipe. The source side corresponds with the
    entity that initiated the TCP connection, whereas the sink corresponds with
    the entity that accepts that connection. In this object, though, the source
    and sink are treated equally.

    Unlike Jabber
    L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>}s, the sink
    and source objects are assumed to represent an eternally connected and
    initialized XML stream. As such, events corresponding to connection,
    disconnection, initialization and stream errors are not dispatched or
    processed.

    @since: 8.2

    @ivar source: Source XML stream.
    @ivar sink: Sink XML stream.
    """

    def __init__(self):
        # The closures look up the opposite end on C{self} at call time, so
        # they follow whatever object is currently bound to that attribute.
        def sendToSink(obj):
            return self.sink.dispatch(obj)

        def sendToSource(obj):
            return self.source.dispatch(obj)

        self.source = EventDispatcher()
        self.sink = EventDispatcher()

        # Sending on one end dispatches the object on the other end.
        self.source.send = sendToSink
        self.sink.send = sendToSource
|