This file is indexed.

/usr/share/pyshared/txzmq/req_rep.py is in python-txzmq 0.6.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
ZeroMQ REQ-REP wrappers.
"""
import uuid
import warnings

from zmq.core import constants

from twisted.internet import defer

from txzmq.connection import ZmqConnection


class ZmqREQConnection(ZmqConnection):
    """
    A REQ connection.

    This is implemented with an underlying DEALER socket, even though
    semantics are closer to REQ socket.

    """
    socketType = constants.DEALER

    # the number of new UUIDs to generate when the pool runs out of them
    UUID_POOL_GEN_SIZE = 5

    def __init__(self, *args, **kwargs):
        ZmqConnection.__init__(self, *args, **kwargs)
        self._requests = {}
        self._uuids = []

    def _getNextId(self):
        """
        Returns an unique id.

        By default, generates pool of UUID in increments
        of C{UUID_POOL_GEN_SIZE}. Could be overridden to
        provide custom ID generation.

        @return: generated unique "on the wire" message ID
        @rtype: C{str}
        """
        if not self._uuids:
            for _ in xrange(self.UUID_POOL_GEN_SIZE):
                self._uuids.append(str(uuid.uuid4()))
        return self._uuids.pop()

    def _releaseId(self, msgId):
        """
        Release message ID to the pool.

        @param msgId: message ID, no longer on the wire
        @type msgId: C{str}
        """
        self._uuids.append(msgId)
        if len(self._uuids) > 2 * self.UUID_POOL_GEN_SIZE:
            self._uuids[-self.UUID_POOL_GEN_SIZE:] = []

    def sendMsg(self, *messageParts):
        """
        Send L{message} with specified L{tag}.

        @param messageParts: message data
        @type messageParts: C{tuple}
        """
        d = defer.Deferred()
        messageId = self._getNextId()
        self._requests[messageId] = d
        self.send([messageId, ''] + list(messageParts))
        return d

    def messageReceived(self, message):
        """
        Called on incoming message from ZeroMQ.

        @param message: message data
        """
        msgId, _, msg = message[0], message[1], message[2:]
        d = self._requests.pop(msgId)
        self._releaseId(msgId)
        d.callback(msg)


class ZmqREPConnection(ZmqConnection):
    """
    A REP connection.

    This is implemented with an underlying ROUTER socket, but the semantics
    are close to REP socket.
    """
    socketType = constants.ROUTER

    def __init__(self, *args, **kwargs):
        ZmqConnection.__init__(self, *args, **kwargs)
        self._routingInfo = {}  # keep track of routing info

    def reply(self, messageId, *messageParts):
        """
        Send L{message} with specified L{tag}.

        @param messageId: message uuid
        @type messageId: C{str}
        @param message: message data
        @type message: C{str}
        """
        routingInfo = self._routingInfo.pop(messageId)
        self.send(routingInfo + [messageId, ''] + list(messageParts))

    def messageReceived(self, message):
        """
        Called on incoming message from ZeroMQ.

        @param message: message data
        """
        i = message.index('')
        assert i > 0
        (routingInfo, msgId, payload) = (
            message[:i - 1], message[i - 1], message[i + 1:])
        msgParts = payload[0:]
        self._routingInfo[msgId] = routingInfo
        self.gotMessage(msgId, *msgParts)

    def gotMessage(self, messageId, *messageParts):
        """
        Called on incoming message.

        @param messageId: message uuid
        @type messageId: C{str}
        @param messageParts: message data
        @param tag: message tag
        """
        raise NotImplementedError(self)


class ZmqXREPConnection(ZmqREPConnection):
    """
    Provided for backwards compatibility.

    Deprecated in favour of either ZmqREPConnection or ZmqROUTERConnection.

    """

    def __init__(self, factory, *endpoints):
        warnings.warn("ZmqXREPConnection is deprecated in favour of "
                      "either ZmqREPConnection or ZmqROUTERConnection",
                      DeprecationWarning)
        ZmqREPConnection.__init__(self, factory)
        self.add_endpoints(endpoints)


class ZmqXREQConnection(ZmqREQConnection):
    """
    Provided for backwards compatibility.

    Deprecated in favour of either ZmqREQConnection or ZmqDEALERConnection.

    """

    def __init__(self, factory, *endpoints):
        warnings.warn("ZmqXREQConnection is deprecated in favour of "
                      "either ZmqREQConnection or ZmqDEALERConnection",
                      DeprecationWarning)
        ZmqREQConnection.__init__(self, factory)
        self.add_endpoints(endpoints)