/usr/share/pyshared/epsilon/amprouter.py is in python-epsilon 0.6.0+r2713-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | # -*- test-case-name: epsilon.test.test_amprouter -*-
# Copyright (c) 2008 Divmod. See LICENSE for details.
"""
This module provides an implementation of I{Routes}, a system for multiplexing
multiple L{IBoxReceiver}/I{IBoxSender} pairs over a single L{AMP} connection.
"""
from itertools import count
from zope.interface import implements
from twisted.protocols.amp import IBoxReceiver, IBoxSender
from epsilon.structlike import record
__metaclass__ = type
_ROUTE = '_route'
_unspecified = object()
class RouteNotConnected(Exception):
"""
An attempt was made to send AMP boxes through a L{Route} which is not yet
connected to anything.
"""
class Route(record('router receiver localRouteName remoteRouteName',
remoteRouteName=_unspecified)):
"""
Wrap up a route name and a box sender to transparently add the route name
to boxes sent through this box sender.
@ivar router: The L{Router} which created this route. This will be used
for route tear down and for its L{IBoxSender}, to send boxes.
@ivar receiver: The receiver which will be started with this object as its
sender.
@type localRouteName: C{unicode}
@ivar localRouteName: The name of this route as known by the other side of
the AMP connection. AMP boxes with this route are expected to be
routed to this object.
@type remoteRouteName: C{unicode} or L{NoneType}
@ivar remoteRouteName: The name of the route which will be added to all
boxes sent to this sender. If C{None}, no route will be added.
"""
implements(IBoxSender)
def connectTo(self, remoteRouteName):
"""
Set the name of the route which will be added to outgoing boxes.
"""
self.remoteRouteName = remoteRouteName
# This route must not be started before its router is started. If
# sender is None, then the router is not started. When the router is
# started, it will start this route.
if self.router._sender is not None:
self.start()
def unbind(self):
"""
Remove the association between this route and its router.
"""
del self.router._routes[self.localRouteName]
def start(self):
"""
Associate this object with a receiver as its L{IBoxSender}.
"""
self.receiver.startReceivingBoxes(self)
def stop(self, reason):
"""
Shut down the underlying receiver.
"""
self.receiver.stopReceivingBoxes(reason)
def sendBox(self, box):
"""
Add the route and send the box.
"""
if self.remoteRouteName is _unspecified:
raise RouteNotConnected()
if self.remoteRouteName is not None:
box[_ROUTE] = self.remoteRouteName.encode('ascii')
self.router._sender.sendBox(box)
def unhandledError(self, failure):
"""
Pass failures through to the wrapped L{IBoxSender} without
modification.
"""
self.router._sender.unhandledError(failure)
class Router:
"""
An L{IBoxReceiver} implementation which demultiplexes boxes from an AMP
connection being used with zero, one, or more routes.
@ivar _sender: An L{IBoxSender} provider which is used to allow
L{IBoxReceiver}s added to this router to send boxes.
@ivar _unstarted: A C{dict} similar to C{_routes} set before
C{startReceivingBoxes} is called and containing all routes which have
been added but not yet started. These are started and moved to the
C{_routes} dict when the router is started.
@ivar _routes: A C{dict} mapping local route identifiers to
L{IBoxReceivers} associated with them. This is only initialized after
C{startReceivingBoxes} is called.
@ivar _routeCounter: A L{itertools.count} instance used to generate unique
identifiers for routes in this router.
"""
implements(IBoxReceiver)
_routes = None
_sender = None
def __init__(self):
self._routeCounter = count()
self._unstarted = {}
def createRouteIdentifier(self):
"""
Return a route identifier which is not yet associated with a route on
this dispatcher.
@rtype: C{unicode}
"""
return unicode(self._routeCounter.next())
def bindRoute(self, receiver, routeName=_unspecified):
"""
Create a new route to associate the given route name with the given
receiver.
@type routeName: C{unicode} or L{NoneType}
@param routeName: The identifier for the newly created route. If
C{None}, boxes with no route in them will be delivered to this
receiver.
@rtype: L{Route}
"""
if routeName is _unspecified:
routeName = self.createRouteIdentifier()
# self._sender may yet be None; if so, this route goes into _unstarted
# and will have its sender set correctly in startReceivingBoxes below.
route = Route(self, receiver, routeName)
mapping = self._routes
if mapping is None:
mapping = self._unstarted
mapping[routeName] = route
return route
def startReceivingBoxes(self, sender):
"""
Initialize route tracking objects.
"""
self._sender = sender
for routeName, route in self._unstarted.iteritems():
# Any route which has been bound but which does not yet have a
# remote route name should not yet be started. These will be
# started in Route.connectTo.
if route.remoteRouteName is not _unspecified:
route.start()
self._routes = self._unstarted
self._unstarted = None
def ampBoxReceived(self, box):
"""
Dispatch the given box to the L{IBoxReceiver} associated with the route
indicated by the box, or handle it directly if there is no route.
"""
route = box.pop(_ROUTE, None)
self._routes[route].receiver.ampBoxReceived(box)
def stopReceivingBoxes(self, reason):
"""
Stop all the L{IBoxReceiver}s which have been added to this router.
"""
for routeName, route in self._routes.iteritems():
route.stop(reason)
self._routes = None
__all__ = ['Router', 'Route']
|