/usr/share/pyshared/txzmq/req_rep.py is in python-txzmq 0.6.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | """
ZeroMQ REQ-REP wrappers.
"""
import uuid
import warnings
from zmq.core import constants
from twisted.internet import defer
from txzmq.connection import ZmqConnection
class ZmqREQConnection(ZmqConnection):
    """
    A REQ connection.

    This is implemented with an underlying DEALER socket, even though
    semantics are closer to REQ socket.

    Every request sent with L{sendMsg} is prefixed with a message ID and an
    empty delimiter frame; the ID found in an incoming message is used to
    look up and fire the C{Deferred} returned by L{sendMsg}.

    @cvar socketType: socket type, from ZeroMQ
    @cvar UUID_POOL_GEN_SIZE: the number of new UUIDs to generate when the
        pool runs out of them
    @type UUID_POOL_GEN_SIZE: C{int}
    @ivar _requests: outstanding requests, message ID -> C{Deferred}
    @type _requests: C{dict}
    @ivar _uuids: pool of message IDs that are currently not on the wire
    @type _uuids: C{list}
    """
    socketType = constants.DEALER

    # the number of new UUIDs to generate when the pool runs out of them
    UUID_POOL_GEN_SIZE = 5

    def __init__(self, *args, **kwargs):
        """
        Set up the connection; all arguments are passed through to
        L{ZmqConnection.__init__} unchanged.
        """
        ZmqConnection.__init__(self, *args, **kwargs)
        self._requests = {}
        self._uuids = []

    def _getNextId(self):
        """
        Returns an unique id.

        By default, generates pool of UUID in increments
        of C{UUID_POOL_GEN_SIZE}. Could be overridden to
        provide custom ID generation.

        @return: generated unique "on the wire" message ID
        @rtype: C{str}
        """
        if not self._uuids:
            # range() rather than the Python 2-only xrange(): the pool is
            # tiny, so there is no cost, and the name exists everywhere.
            self._uuids.extend(
                str(uuid.uuid4()) for _ in range(self.UUID_POOL_GEN_SIZE))
        return self._uuids.pop()

    def _releaseId(self, msgId):
        """
        Release message ID to the pool.

        The pool is trimmed by C{UUID_POOL_GEN_SIZE} entries once it grows
        beyond twice that size, so it stays bounded.

        @param msgId: message ID, no longer on the wire
        @type msgId: C{str}
        """
        self._uuids.append(msgId)
        if len(self._uuids) > 2 * self.UUID_POOL_GEN_SIZE:
            del self._uuids[-self.UUID_POOL_GEN_SIZE:]

    def sendMsg(self, *messageParts):
        """
        Send a request made of C{messageParts}.

        The parts are sent prefixed with a freshly allocated message ID and
        an empty delimiter frame.

        @param messageParts: message data
        @type messageParts: C{tuple}
        @return: C{Deferred} that is fired by L{messageReceived} with the
            parts of the matching reply
        @rtype: C{twisted.internet.defer.Deferred}
        """
        d = defer.Deferred()
        messageId = self._getNextId()
        # register before sending so the reply can always be matched
        self._requests[messageId] = d
        self.send([messageId, ''] + list(messageParts))
        return d

    def messageReceived(self, message):
        """
        Called on incoming message from ZeroMQ.

        The first part is the message ID, the second part is the delimiter
        and the remaining parts are handed to the C{Deferred} of the
        matching request. A reply whose ID matches no outstanding request
        is dropped with a C{RuntimeWarning} instead of raising C{KeyError}
        out of the receive path.

        @param message: message data
        """
        msgId, _, msg = message[0], message[1], message[2:]
        d = self._requests.pop(msgId, None)
        if d is None:
            # Unknown or duplicate reply: do not put the foreign ID into the
            # pool, otherwise it could be handed out for a later request.
            warnings.warn(
                "ZmqREQConnection: dropping reply with unknown "
                "message ID %r" % (msgId,), RuntimeWarning)
            return
        self._releaseId(msgId)
        d.callback(msg)
class ZmqREPConnection(ZmqConnection):
    """
    A REP connection.

    This is implemented with an underlying ROUTER socket, but the semantics
    are close to REP socket.

    Subclasses implement L{gotMessage} to handle requests and call L{reply}
    with the same message ID to answer them.

    @cvar socketType: socket type, from ZeroMQ
    @ivar _routingInfo: routing frames of requests that have not been
        replied to yet, message ID -> C{list} of frames
    @type _routingInfo: C{dict}
    """
    socketType = constants.ROUTER

    def __init__(self, *args, **kwargs):
        """
        Set up the connection; all arguments are passed through to
        L{ZmqConnection.__init__} unchanged.
        """
        ZmqConnection.__init__(self, *args, **kwargs)
        self._routingInfo = {}  # keep track of routing info

    def reply(self, messageId, *messageParts):
        """
        Send a reply to the request identified by C{messageId}.

        The routing frames stored for the request are consumed, so each
        request can be replied to only once.

        @param messageId: message uuid
        @type messageId: C{str}
        @param messageParts: message data
        @type messageParts: C{tuple}
        @raise KeyError: if no request with C{messageId} is waiting for
            a reply
        """
        routingInfo = self._routingInfo.pop(messageId)
        self.send(routingInfo + [messageId, ''] + list(messageParts))

    def messageReceived(self, message):
        """
        Called on incoming message from ZeroMQ.

        The message is split at the first empty frame: the frame right
        before it is the message ID, everything before the message ID is
        routing information and everything after the empty frame is the
        payload, which is passed on to L{gotMessage}.

        @param message: message data
        @raise ValueError: if the message has no empty delimiter frame, or
            if no message ID precedes the delimiter
        """
        i = message.index('')
        if i < 1:
            # Explicit check instead of an assert: asserts are stripped
            # under -O, and with i == 0 the negative indices below would
            # silently pick frames from the end of the message.
            raise ValueError(
                "malformed request: no message ID before the delimiter")
        routingInfo, msgId, payload = (
            message[:i - 1], message[i - 1], message[i + 1:])
        self._routingInfo[msgId] = routingInfo
        self.gotMessage(msgId, *payload)

    def gotMessage(self, messageId, *messageParts):
        """
        Called on incoming message.

        Must be overridden by subclasses; this implementation always
        raises.

        @param messageId: message uuid
        @type messageId: C{str}
        @param messageParts: message data
        @raise NotImplementedError: always
        """
        raise NotImplementedError(self)
class ZmqXREPConnection(ZmqREPConnection):
    """
    Provided for backwards compatibility.

    Deprecated in favour of either ZmqREPConnection or ZmqROUTERConnection.
    """

    def __init__(self, factory, *endpoints):
        """
        Emit a C{DeprecationWarning}, initialise the connection with
        C{factory} and register C{endpoints} via C{add_endpoints}.

        @param factory: passed to L{ZmqREPConnection.__init__}
        @param endpoints: endpoints, passed as a tuple to C{add_endpoints}
        """
        # stacklevel=2 attributes the warning to the code instantiating the
        # deprecated class rather than to this line.
        warnings.warn("ZmqXREPConnection is deprecated in favour of "
                      "either ZmqREPConnection or ZmqROUTERConnection",
                      DeprecationWarning, stacklevel=2)
        ZmqREPConnection.__init__(self, factory)
        self.add_endpoints(endpoints)
class ZmqXREQConnection(ZmqREQConnection):
    """
    Provided for backwards compatibility.

    Deprecated in favour of either ZmqREQConnection or ZmqDEALERConnection.
    """

    def __init__(self, factory, *endpoints):
        """
        Emit a C{DeprecationWarning}, initialise the connection with
        C{factory} and register C{endpoints} via C{add_endpoints}.

        @param factory: passed to L{ZmqREQConnection.__init__}
        @param endpoints: endpoints, passed as a tuple to C{add_endpoints}
        """
        # stacklevel=2 attributes the warning to the code instantiating the
        # deprecated class rather than to this line.
        warnings.warn("ZmqXREQConnection is deprecated in favour of "
                      "either ZmqREQConnection or ZmqDEALERConnection",
                      DeprecationWarning, stacklevel=2)
        ZmqREQConnection.__init__(self, factory)
        self.add_endpoints(endpoints)
|