This file is indexed.

/usr/share/pyshared/pika/frame.py is in python-pika 0.9.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# ***** BEGIN LICENSE BLOCK *****
#
# For copyright and licensing please refer to COPYING.
#
# ***** END LICENSE BLOCK *****

import struct
import pika.log as log
import pika.spec as spec
import pika.exceptions as exceptions

from pika.object import object_


class Frame(object_):
    """
    Base Frame object mapping. Defines a behavior for all child classes for
    assignment of core attributes and implementation of the a core _marshal
    method which child classes use to create the binary AMQP frame.
    """

    def __init__(self, frame_type, channel_number):
        """
        Parameters:

        - frame_type: int
        - channel_number: int
        """
        self.frame_type = frame_type
        self.channel_number = channel_number

    def _marshal(self, pieces):
        """
        Create the full AMQP wire protocol frame data representation
        """
        payload = str(''.join(pieces))
        return struct.pack('>BHI',
                           self.frame_type,
                           self.channel_number,
                           len(payload)) + payload + chr(spec.FRAME_END)


class Method(Frame):
    """
    Base Method frame object mapping. AMQP method frames are mappend on top
    of this class for creating or accessing their data and attributes.
    """

    def __init__(self, channel_number, method):
        """
        Parameters:

        - channel_number: int
        - method: a spec.Class.Method object
        """
        Frame.__init__(self, spec.FRAME_METHOD, channel_number)
        self.method = method

    def marshal(self):
        """
        Return the AMQP binary encoded value of the frame
        """
        pieces = self.method.encode()
        pieces.insert(0, struct.pack('>I', self.method.INDEX))
        return self._marshal(pieces)


class Header(Frame):
    """
    Header frame object mapping. AMQP content header frames are mapped
    on top of this class for creating or accessing their data and attributes.
    """

    def __init__(self, channel_number, body_size, props):
        """
        Parameters:

        - channel_number: int
        - body_size: int
        - props: spec.BasicProperties object
        """
        Frame.__init__(self, spec.FRAME_HEADER, channel_number)
        self.body_size = body_size
        self.properties = props

    def marshal(self):
        """
        Return the AMQP binary encoded value of the frame
        """
        pieces = self.properties.encode()
        pieces.insert(0, struct.pack('>HxxQ',
                                     self.properties.INDEX,
                                     self.body_size))
        return self._marshal(pieces)


class Body(Frame):
    """
    Body frame object mapping class. AMQP content body frames are mapped on
    to this base class for getting/setting of attributes/data.
    """

    def __init__(self, channel_number, fragment):
        """
        Parameters:

        - channel_number: int
        - fragment: str
        """
        Frame.__init__(self, spec.FRAME_BODY, channel_number)
        self.fragment = fragment

    def marshal(self):
        """
        Return the AMQP binary encoded value of the frame
        """
        return self._marshal([self.fragment])


class Heartbeat(Frame):
    """
    Heartbeat frame object mapping class. AMQP Heartbeat frames are mapped on
    to this class for a common access structure to the attributes/data values.
    """

    def __init__(self):
        Frame.__init__(self, spec.FRAME_HEARTBEAT, 0)

    def marshal(self):
        """
        Return the AMQP binary encoded value of the frame
        """
        return self._marshal(list())


class ProtocolHeader(object_):
    """
    AMQP Protocol header frame class which provides a pythonic interface
    for creating AMQP Protocol headers
    """

    def __init__(self, major=None, minor=None, revision=None):
        """
        Construct a Protocol Header frame object for the specified AMQP version

        Parameters:

        - major: int
        - miinor: int
        - revision: int
        """
        self.frame_type = -1
        self.major = major or spec.PROTOCOL_VERSION[0]
        self.minor = minor or spec.PROTOCOL_VERSION[1]
        self.revision = revision or spec.PROTOCOL_VERSION[2]

    def marshal(self):
        """
        Return the full AMQP wire protocol frame data representation of the
        ProtocolHeader frame
        """
        return 'AMQP' + struct.pack('BBBB', 0,
                                    self.major,
                                    self.minor,
                                    self.revision)


class Dispatcher(object):
    """
    This handles content frames which come in in synchronous order as follows:

    1) Method Frame
    2) Header Frame
    3) Body Frame(s)

    The way that content handler works is to assign the active frame type to
    the self._handler variable. When we receive a header frame that is either
    a Basic.Deliver, Basic.GetOk, or Basic.Return, we will assign the handler
    to the ContentHandler._handle_header_frame. This will fire the next time
    we are called and parse out the attributes required to receive the body
    frames and assemble the content to be returned. We will then assign the
    self._handler to ContentHandler._handle_body_frame.

    _handle_body_frame has two methods inside of it, handle and finish.
    handle will be invoked until the requirements set by the header frame have
    been met at which point it will call the finish method. This calls
    the callback manager with the method frame, header frame and assembled
    body and then reset the self._handler to the _handle_method_frame method.
    """

    def __init__(self, callback_manager):
        # We start with Method frames always
        self._handler = self._handle_method_frame
        self.callbacks = callback_manager

    def process(self, frame):
        """
        Invoked by the ChannelTransport object when passed frames that are not
        setup in the rpc process and that don't have explicit reply types
        defined. This includes Basic.Publish, Basic.GetOk and Basic.Return
        """
        self._handler(frame)

    def _handle_method_frame(self, frame):
        """
        Receive a frame and process it, we should have content by the time we
        reach this handler, set the next handler to be the header frame handler
        """
        # If we don't have FrameMethod something is wrong so throw an exception
        if not isinstance(frame, Method):
            raise exceptions.UnexpectedFrameError(frame)

        # If the frame is a content related frame go deal with the content
        # By getting the content header frame
        if spec.has_content(frame.method.INDEX):
            self._handler = self._handle_header_frame(frame)

        # We were passed a frame we don't know how to deal with
        else:
            raise NotImplementedError(frame.method.__class__)

    def _handle_header_frame(self, frame):
        """
        Receive a header frame and process that, setting the next handler
        to the body frame handler
        """

        def handler(header_frame):
            # Make sure it's a header frame
            if not isinstance(header_frame, Header):
                raise exceptions.UnexpectedFrameError(header_frame)

            # Call the handle body frame including our header frame
            self._handle_body_frame(frame, header_frame)

        return handler

    def _handle_body_frame(self, method_frame, header_frame):
        """
        Receive body frames. We'll keep receiving them in handler until we've
        received the body size specified in the header frame. When done
        call our finish function which will call our transports callbacks
        """
        seen_so_far = [0]
        body_fragments = list()

        def handler(body_frame):
            # Make sure it's a body frame
            if not isinstance(body_frame, Body):
                raise exceptions.UnexpectedFrameError(body_frame)

            # Increment our counter so we know when we've had enough
            seen_so_far[0] += len(body_frame.fragment)

            # Append the fragment to our list
            body_fragments.append(body_frame.fragment)

            # Did we get enough bytes? If so finish
            if seen_so_far[0] == header_frame.body_size:
                finish()

            # Did we get too many bytes?
            elif seen_so_far[0] > header_frame.body_size:
                error = 'Received %i and only expected %i' % \
                        (seen_so_far[0], header_frame.body_size)
                raise exceptions.BodyTooLongError(error)

        def finish():
            # We're done so set our handler back to the method frame
            self._handler = self._handle_method_frame

            # Get our method name
            method = method_frame.method.__class__.__name__
            if method == 'Deliver':
                key = '_on_basic_deliver'
            elif method == 'GetOk':
                key = '_on_basic_get'
            elif method == 'Return':
                key = '_on_basic_return'
            else:
                raise Exception('Unimplemented Content Return Key')

            # Check for a processing callback for our method name
            self.callbacks.process(method_frame.channel_number,  # Prefix
                                   key,                          # Key
                                   self,                         # Caller
                                   method_frame,                 # Arg 1
                                   header_frame,                 # Arg 2
                                   ''.join(body_fragments))      # Arg 3

        # If we don't have a header frame body size, finish. Otherwise keep
        # going and keep our handler function as the frame handler
        if not header_frame.body_size:
            finish()
        else:
            self._handler = handler


def decode_frame(data_in):
    """
    Receives raw socket data and attempts to turn it into a frame.
    Returns bytes used to make the frame and the frame
    """
    # Look to see if it's a protocol header frame
    try:
        if data_in[0:4] == 'AMQP':
            major, minor, revision = struct.unpack_from('BBB', data_in, 5)
            return 8, ProtocolHeader(major, minor, revision)
    except IndexError:
        # We didn't get a full frame
        return 0, None
    except struct.error:
        # We didn't get a full frame
        return 0, None

    # Get the Frame Type, Channel Number and Frame Size
    try:
        frame_type, channel_number, frame_size = \
            struct.unpack('>BHL', data_in[0:7])
    except struct.error:
        # We didn't get a full frame
        return 0, None

    # Get the frame data
    frame_end = spec.FRAME_HEADER_SIZE +\
                frame_size +\
                spec.FRAME_END_SIZE

    # We don't have all of the frame yet
    if frame_end > len(data_in):
        return 0, None

    # The Frame termination chr is wrong
    if data_in[frame_end - 1] != chr(spec.FRAME_END):
        raise exceptions.InvalidFrameError("Invalid FRAME_END marker")

    # Get the raw frame data
    frame_data = data_in[spec.FRAME_HEADER_SIZE:frame_end - 1]

    if frame_type == spec.FRAME_METHOD:

        # Get the Method ID from the frame data
        method_id = struct.unpack_from('>I', frame_data)[0]

        # Get a Method object for this method_id
        method = spec.methods[method_id]()

        # Decode the content
        method.decode(frame_data, 4)

        # Return the amount of data consumed and the Method object
        return frame_end, Method(channel_number, method)

    elif frame_type == spec.FRAME_HEADER:

        # Return the header class and body size
        class_id, weight, body_size = struct.unpack_from('>HHQ', frame_data)

        # Get the Properties type
        properties = spec.props[class_id]()

        log.debug("<%r>", properties)

        # Decode the properties
        out = properties.decode(frame_data[12:])

        log.debug("<%r>", out)

        # Return a Header frame
        return frame_end, Header(channel_number, body_size, properties)

    elif frame_type == spec.FRAME_BODY:

        # Return the amount of data consumed and the Body frame w/ data
        return frame_end, Body(channel_number, frame_data)

    elif frame_type == spec.FRAME_HEARTBEAT:

        # Return the amount of data and a Heartbeat frame
        return frame_end, Heartbeat()

    raise exceptions.InvalidFrameError("Unknown frame type: %i" % frame_type)