This file is indexed.

/usr/lib/python2.7/dist-packages/txamqp/protocol.py is in python-txamqp 0.6.1-0ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
# coding: utf-8
from twisted.internet import defer, protocol
from twisted.internet.task import LoopingCall
from twisted.protocols import basic
from txamqp import spec
from txamqp.codec import Codec
from txamqp.connection import Header, Frame, Method, Body, Heartbeat
from txamqp.message import Message
from txamqp.content import Content
from txamqp.queue import TimeoutDeferredQueue, Closed as QueueClosed
from txamqp.client import TwistedEvent, Closed
from cStringIO import StringIO
import struct
from time import time

class GarbageException(Exception):
    """Raised when received frame data does not decode to a well-formed frame."""

class AMQChannel(object):
    """An AMQP channel: a virtual connection sharing one socket with others.

    One connection can carry many channels.  Frames received for the
    channel are sorted by dispatch() into two queues: ``responses`` (method
    frames flagged as responses, plus the frames that follow them) and
    ``incoming`` (everything else).  Frames to be sent are put on
    ``outgoing``.
    """

    def __init__(self, id, outgoing):
        """Create channel number `id` whose frames are put on `outgoing`."""
        self.id = id
        self.outgoing = outgoing
        self.incoming = TimeoutDeferredQueue()
        self.responses = TimeoutDeferredQueue()

        # Queue chosen by the most recent method frame; frames that are not
        # methods (see dispatch) are appended to this same queue.
        self.queue = None

        self.closed = False
        self.reason = None

    def close(self, reason):
        """Mark the channel closed and close both inbound queues.

        Only the first call has an effect.  `reason` is stored and later
        passed to the Closed exception raised by invoke().
        """
        if self.closed:
            return
        self.closed = True
        self.reason = reason
        self.incoming.close()
        self.responses.close()

    def dispatch(self, frame, work):
        """Route a received `frame` to the responses or incoming queue.

        A method frame selects the target queue: ``responses`` when the
        method is flagged as a response, otherwise ``incoming`` (in which
        case that queue is also put on `work` so it gets processed).  Any
        other frame goes to the queue selected by the last method frame.
        """
        payload = frame.payload
        if isinstance(payload, Method):
            if payload.method.response:
                self.queue = self.responses
            else:
                self.queue = self.incoming
                work.put(self.incoming)
        self.queue.put(frame)

    @defer.inlineCallbacks
    def invoke(self, method, args, content=None):
        """Send `method` with `args` and wait for the reply when one is due.

        If the method carries content, `content` (or an empty Content when
        None is given) is written after the method frame.  No reply is
        awaited when the method declares no responses or when its
        ``nowait`` argument is true; the Deferred then fires with None.
        Otherwise it fires with a Message built from the reply.

        Raises Closed if the channel is closed before or while waiting,
        and ValueError if the reply is not one of the method's responses.
        """
        if self.closed:
            raise Closed(self.reason)
        frame = Frame(self.id, Method(method, *args))
        self.outgoing.put(frame)

        if method.content:
            if content is None:
                content = Content()
            self.writeContent(method.klass, content, self.outgoing)

        try:
            # here we depend on all nowait fields being named nowait
            f = method.fields.byname["nowait"]
            nowait = args[method.fields.index(f)]
        except KeyError:
            nowait = False

        try:
            if not nowait and method.responses:
                resp = (yield self.responses.get()).payload

                if resp.method.content:
                    content = yield readContent(self.responses)
                else:
                    content = None
                if resp.method in method.responses:
                    defer.returnValue(Message(resp.method, resp.args, content))
                else:
                    raise ValueError(resp)
        except QueueClosed:
            # A closed queue normally means the channel was closed; report
            # that with the stored reason.  Otherwise re-raise unchanged,
            # keeping the original traceback.
            if self.closed:
                raise Closed(self.reason)
            raise

    def writeContent(self, klass, content, queue):
        """Put the frames for `content` on `queue`.

        Emits a header frame, then recursively the frames of every child
        content, then a single body frame when the content size is
        non-zero.
        """
        size = content.size()
        header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
        queue.put(header)
        for child in content.children:
            self.writeContent(klass, child, queue)
        # should split up if content.body exceeds max frame size
        if size > 0:
            queue.put(Frame(self.id, Body(content.body)))

class FrameReceiver(protocol.Protocol, basic._PauseableMixin):
    """Protocol that cuts the received byte stream into AMQP frames.

    In frame mode every complete frame is decoded and passed to
    frameReceived(); otherwise buffered bytes are passed to
    rawDataReceived().  Neither of those two callbacks is defined in this
    class, so subclasses must provide the ones they rely on.
    """

    # True: dataReceived() parses frames.  False: it forwards raw bytes.
    frame_mode = False
    # Largest amount of buffered data tolerated without a complete frame.
    MAX_LENGTH = 4096
    # Framing overhead in bytes: type (1) + channel (2) + size (4) + frame end (1).
    HEADER_LENGTH = 1 + 2 + 4 + 1
    # Received bytes not yet consumed as a frame or as raw data.
    __buffer = ''

    def __init__(self, spec):
        """Remember `spec` and cache the id of its ``frame_end`` constant."""
        self.spec = spec
        self.FRAME_END = self.spec.constants.bypyname["frame_end"].id

    # packs a frame and writes it to the underlying transport
    def sendFrame(self, frame):
        """Encode `frame` and write the bytes to the transport."""
        data = self._packFrame(frame)
        self.transport.write(data)

    # packs a frame, see qpid.connection.Connection#write
    def _packFrame(self, frame):
        """Return the wire encoding of `frame`.

        Layout: frame-type octet, channel short, the payload's own
        encoding, then the frame-end octet.
        """
        s = StringIO()
        c = Codec(s)
        c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id)
        c.encode_short(frame.channel)
        frame.payload.encode(c)
        c.encode_octet(self.FRAME_END)
        return s.getvalue()

    # unpacks a frame, see qpid.connection.Connection#read
    def _unpackFrame(self, data):
        """Decode the bytes of one complete frame into a Frame.

        Raises GarbageException when the trailing octet is not the spec's
        frame-end marker.
        """
        s = StringIO(data)
        c = Codec(s)
        frameType = spec.pythonize(self.spec.constants.byid[c.decode_octet()].name)
        channel = c.decode_short()
        payload = Frame.DECODERS[frameType].decode(self.spec, c)
        end = c.decode_octet()
        if end != self.FRAME_END:
            raise GarbageException('frame error: expected %r, got %r' % (self.FRAME_END, end))
        return Frame(channel, payload)

    def setRawMode(self):
        """Stop parsing frames; later data goes to rawDataReceived()."""
        self.frame_mode = False

    def setFrameMode(self, extra=''):
        """Start parsing frames; `extra` bytes, if any, are processed now."""
        self.frame_mode = True
        if extra:
            return self.dataReceived(extra)

    def dataReceived(self, data):
        """Buffer `data` and deliver every complete frame it now contains.

        While in frame mode and not paused, complete frames are removed
        from the buffer, decoded and passed to frameReceived().  Parsing
        stops early when frameReceived() returns a true value or the
        transport is disconnecting.  If no complete frame is available and
        more than MAX_LENGTH bytes are buffered, the buffer is discarded
        and handed to frameLengthExceeded().  When the loop ends because
        frame mode is off (and the protocol is not paused), the buffered
        bytes are passed to rawDataReceived() instead.
        """
        self.__buffer = self.__buffer + data
        while self.frame_mode and not self.paused:
            # Bytes available beyond the fixed framing overhead.
            sz = len(self.__buffer) - self.HEADER_LENGTH
            if sz >= 0:
                length, = struct.unpack("!I", self.__buffer[3:7]) # size = 4 bytes
                if sz >= length:
                    packet = self.__buffer[:self.HEADER_LENGTH + length]
                    self.__buffer = self.__buffer[self.HEADER_LENGTH + length:]
                    frame = self._unpackFrame(packet)

                    why = self.frameReceived(frame)
                    if why or self.transport and self.transport.disconnecting:
                        return why
                    else:
                        continue
            # No complete frame yet: give up if too much is pending,
            # otherwise wait for more data.
            if len(self.__buffer) > self.MAX_LENGTH:
                frame, self.__buffer = self.__buffer, ''
                return self.frameLengthExceeded(frame)
            break
        else:
            # The loop condition became false (not a break): frame mode is
            # off or the protocol is paused.
            if not self.paused:
                data = self.__buffer
                self.__buffer = ''
                if data:
                    return self.rawDataReceived(data)

    def frameLengthExceeded(self, frame):
        """Handle more than MAX_LENGTH buffered bytes without a full frame.

        `frame` is the whole discarded buffer.  The default drops the
        connection, as Twisted's LineReceiver.lineLengthExceeded does;
        override to deal with the condition differently.
        """
        return self.transport.loseConnection()

    def sendInitString(self):
        """Write the protocol header: "AMQP", 1, 1, spec major, spec minor."""
        initString = "!4s4B"
        s = StringIO()
        c = Codec(s)
        c.pack(initString, "AMQP", 1, 1, self.spec.major, self.spec.minor)
        self.transport.write(s.getvalue())

@defer.inlineCallbacks
def readContent(queue):
    """Assemble one Content from the frames waiting on `queue`.

    The first frame taken is the content header.  Its ``weight`` is the
    number of child contents, each read recursively from the same queue;
    body frames are then consumed until ``size`` bytes have been
    collected.  The Deferred fires with Content(body, children, copy of
    the header properties).
    """
    header_frame = yield queue.get()
    header = header_frame.payload

    # Children come first on the wire, one per unit of weight.
    children = []
    for _ in range(header.weight):
        child = yield readContent(queue)
        children.append(child)

    # Keep pulling body frames until the announced size has been received.
    remaining = header.size
    body = StringIO()
    while remaining > 0:
        body_frame = yield queue.get()
        chunk = body_frame.payload.content
        body.write(chunk)
        remaining -= len(chunk)

    result = Content(body.getvalue(), children, header.properties.copy())
    defer.returnValue(result)

class AMQClient(FrameReceiver):
    """AMQP client protocol: channel registry, frame routing and heartbeats.

    Received frames are handed to the channel they belong to; method
    frames that are not responses are turned into Messages and passed to
    the delegate.  Frames put on ``outgoing`` are written to the transport
    one by one.  With a positive heartbeat interval the client sends
    heartbeats periodically and drops the connection when nothing has been
    received for MAX_UNSEEN_HEARTBEAT intervals.
    """

    channelClass = AMQChannel

    # Max unreceived heartbeat frames. The AMQP standard says it's 3.
    MAX_UNSEEN_HEARTBEAT = 3

    def __init__(self, delegate, vhost, spec, heartbeat=0, clock=None, insist=False):
        """Set up queues, locks and (optionally) heartbeat timers.

        delegate  -- object whose dispatch(channel, message) and
                     close(reason) are called; its ``client`` attribute is
                     set to this instance.
        vhost     -- virtual host passed to connection_open by start().
        spec      -- the AMQP spec object used for encoding and decoding.
        heartbeat -- heartbeat interval handed to callLater/LoopingCall;
                     0 (the default) disables heartbeats.
        clock     -- object providing callLater; defaults to the reactor.
        insist    -- passed to connection_open when the spec is 0-8.
        """
        FrameReceiver.__init__(self, spec)
        self.delegate = delegate

        # XXX Cyclic dependency
        self.delegate.client = self

        self.vhost = vhost

        # Channel class combining the channel behaviour with the method
        # stubs provided by the spec's class.
        self.channelFactory = type("Channel%s" % self.spec.klass.__name__,
                                    (self.channelClass, self.spec.klass), {})
        self.channels = {}
        self.channelLock = defer.DeferredLock()

        self.outgoing = defer.DeferredQueue()
        self.work = defer.DeferredQueue()

        self.started = TwistedEvent()

        self.queueLock = defer.DeferredLock()
        self.basic_return_queue = TimeoutDeferredQueue()

        self.queues = {}

        # Start the two processing loops; writer() and worker() re-arm
        # themselves after each item.
        self.outgoing.get().addCallback(self.writer)
        self.work.get().addCallback(self.worker)
        self.heartbeatInterval = heartbeat
        self.insist = insist
        if self.heartbeatInterval > 0:
            if clock is None:
                from twisted.internet import reactor as clock
            self.clock = clock
            self.checkHB = self.clock.callLater(self.heartbeatInterval *
                          self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat)
            self.sendHB = LoopingCall(self.sendHeartbeat)
            # Restart both timers once the ``started`` event fires.
            d = self.started.wait()
            d.addCallback(lambda _: self.reschedule_sendHB())
            d.addCallback(lambda _: self.reschedule_checkHB())

    def reschedule_sendHB(self):
        """Restart the periodic heartbeat sender; no-op without heartbeats."""
        if self.heartbeatInterval > 0:
            if self.sendHB.running:
                self.sendHB.stop()
            self.sendHB.start(self.heartbeatInterval, now=False)

    def reschedule_checkHB(self):
        """Re-arm the missed-heartbeat timer; no-op without heartbeats."""
        # checkHB and clock only exist when __init__ saw a positive
        # interval, so guard exactly like reschedule_sendHB does.
        if self.heartbeatInterval > 0:
            if self.checkHB.active():
                self.checkHB.cancel()
            self.checkHB = self.clock.callLater(self.heartbeatInterval *
                  self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat)

    def check_0_8(self):
        """Return True when the spec reports major 8 and minor 0.

        NOTE(review): presumably that is how the AMQP 0-8 spec file
        numbers itself -- confirm against the spec loader.
        """
        return (self.spec.minor, self.spec.major) == (0, 8)

    @defer.inlineCallbacks
    def channel(self, id):
        """Return (via Deferred) the channel numbered `id`, creating it if needed."""
        yield self.channelLock.acquire()
        try:
            try:
                ch = self.channels[id]
            except KeyError:
                ch = self.channelFactory(id, self.outgoing)
                self.channels[id] = ch
        finally:
            self.channelLock.release()
        defer.returnValue(ch)

    @defer.inlineCallbacks
    def queue(self, key):
        """Return (via Deferred) the queue registered under `key`, creating it if needed."""
        yield self.queueLock.acquire()
        try:
            try:
                q = self.queues[key]
            except KeyError:
                q = TimeoutDeferredQueue()
                self.queues[key] = q
        finally:
            self.queueLock.release()
        defer.returnValue(q)

    def close(self, reason):
        """Close every channel and queue, then notify the delegate."""
        for ch in self.channels.values():
            ch.close(reason)
        for q in self.queues.values():
            q.close()
        self.delegate.close(reason)

    def writer(self, frame):
        """Send one outgoing `frame`, then wait for the next one."""
        self.sendFrame(frame)
        self.outgoing.get().addCallback(self.writer)

    def worker(self, queue):
        """Dispatch one message from `queue`, then wait for more work.

        A failure while dispatching closes the client with that failure.
        """
        d = self.dispatch(queue)
        def cb(ign):
            self.work.get().addCallback(self.worker)
        d.addCallback(cb)
        d.addErrback(self.close)

    @defer.inlineCallbacks
    def dispatch(self, queue):
        """Read one method (and its content, if any) from `queue` and pass
        it to the delegate as a Message."""
        frame = yield queue.get()
        channel = yield self.channel(frame.channel)
        payload = frame.payload
        if payload.method.content:
            content = yield readContent(queue)
        else:
            content = None
        # Let the caller deal with exceptions thrown here.
        message = Message(payload.method, payload.args, content)
        self.delegate.dispatch(channel, message)

    # As soon as we connect to the target AMQP broker, send the init string
    def connectionMade(self):
        """Send the protocol header and start parsing frames."""
        self.sendInitString()
        self.setFrameMode()

    def frameReceived(self, frame):
        """Process a decoded frame; the resulting Deferred is not returned."""
        self.processFrame(frame)

    def sendFrame(self, frame):
        """Write `frame`; any non-heartbeat frame restarts the heartbeat sender."""
        if frame.payload.type != Frame.HEARTBEAT:
            self.reschedule_sendHB()
        FrameReceiver.sendFrame(self, frame)

    @defer.inlineCallbacks
    def processFrame(self, frame):
        """Record a heartbeat or hand the frame to its channel.

        Any received frame re-arms the missed-heartbeat timer when
        heartbeats are enabled.
        """
        ch = yield self.channel(frame.channel)
        if frame.payload.type == Frame.HEARTBEAT:
            self.lastHBReceived = time()
        else:
            ch.dispatch(frame, self.work)
        if self.heartbeatInterval > 0:
            self.reschedule_checkHB()

    @defer.inlineCallbacks
    def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'):
        """Build the login response for the spec version and call start().

        For 0-8 the response is a dict with LOGIN and PASSWORD; otherwise
        it is the string NUL + username + NUL + password.
        """
        if self.check_0_8():
            response = {"LOGIN": username, "PASSWORD": password}
        else:
            response = "\0" + username + "\0" + password

        yield self.start(response, mechanism, locale)

    @defer.inlineCallbacks
    def start(self, response, mechanism='AMQPLAIN', locale='en_US'):
        """Store the credentials, wait for ``started``, then open the vhost.

        `response`, `mechanism` and `locale` are only stored on the
        instance here (presumably read by the delegate during the
        handshake -- confirm).  The Deferred fires with the result of
        connection_open on channel 0.
        """
        self.response = response
        self.mechanism = mechanism
        self.locale = locale

        yield self.started.wait()

        channel0 = yield self.channel(0)
        if self.check_0_8():
            result = yield channel0.connection_open(self.vhost, insist=self.insist)
        else:
            result = yield channel0.connection_open(self.vhost)
        defer.returnValue(result)

    def sendHeartbeat(self):
        """Send a heartbeat frame on channel 0 and record when it was sent."""
        self.sendFrame(Frame(0, Heartbeat()))
        self.lastHBSent = time()

    def checkHeartbeat(self):
        """Drop the connection because the missed-heartbeat timer expired."""
        if self.checkHB.active():
            self.checkHB.cancel()
        # The timer is first armed in __init__, so it can expire before a
        # transport has been attached; there is nothing to drop then.
        if self.transport is not None:
            self.transport.loseConnection()

    def connectionLost(self, reason):
        """Stop the heartbeat timers and close the client with `reason`."""
        if self.heartbeatInterval > 0:
            if self.sendHB.running:
                self.sendHB.stop()
            if self.checkHB.active():
                self.checkHB.cancel()
        self.close(reason)