This file is indexed.

/usr/lib/python3/dist-packages/tlslite/defragmenter.py is in python3-tlslite-ng 0.5.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Copyright (c) 2015, Hubert Kario
#
# See the LICENSE file for legal information regarding use of this file.

""" Helper package for handling fragmentation of messages """

from __future__ import generators

from .utils.codec import Parser

class Defragmenter(object):
    """
    Reassemble and demultiplex messages of several registered types.

    Fragments of different messages (for example TLS ones) may arrive
    interleaved, so data of every message type is collected in a separate
    buffer until a whole message is available. Complete messages are handed
    out one at a time; types registered earlier win over types registered
    later.

    A message type is either of a constant length (like Alerts) or carries
    its payload length in a header field at a known position (like
    Handshake messages).

    @ivar priorities: registered message types, in the order in which
    getMessage() inspects them
    @ivar buffers: maps a message type to the bytearray holding its data
    that was not returned yet
    @ivar decoders: maps a message type to a function which, given the
    buffer of that type, returns the length of the first complete message
    or None if there is none
    """

    def __init__(self):
        """Set up an empty defragmenter with no message types registered"""
        self.priorities = []
        self.buffers = {}
        self.decoders = {}

    def addStaticSize(self, msgType, size):
        """
        Register a message type whose messages all have the same length

        @param msgType: identifier of the message type
        @param size: length of every message of this type, at least 1
        @raise ValueError: if the type was registered before or the size
        is smaller than 1
        """
        if msgType in self.priorities:
            raise ValueError("Message type already defined")
        if size < 1:
            raise ValueError("Message size must be positive integer")

        def fixedLength(buf):
            """Return the constant size once buf holds it, None before"""
            return None if len(buf) < size else size

        # registration order is what decides priority in getMessage()
        self.priorities.append(msgType)
        self.buffers[msgType] = bytearray()
        self.decoders[msgType] = fixedLength

    def addDynamicSize(self, msgType, sizeOffset, sizeOfSize):
        """
        Register a message type which states its payload length in a header

        @param msgType: identifier of the message type
        @param sizeOffset: number of bytes in front of the length field
        @param sizeOfSize: number of bytes taken by the length field,
        at least 1
        @raise ValueError: if the type was registered before, sizeOfSize is
        smaller than 1 or sizeOffset is negative
        """
        if msgType in self.priorities:
            raise ValueError("Message type already defined")
        if sizeOfSize < 1:
            raise ValueError("Size of size must be positive integer")
        if sizeOffset < 0:
            raise ValueError("Offset can't be negative")

        def headerLength(buf):
            """
            Return the total length of the first message in buf

            The total is header plus payload; None is returned while
            either the header or the payload is still incomplete.
            """
            headerSize = sizeOffset + sizeOfSize
            if len(buf) < headerSize:
                # the length field itself did not arrive yet
                return None

            reader = Parser(buf)
            # step over the bytes which precede the length field
            reader.getFixBytes(sizeOffset)
            # presumably decodes the sizeOfSize-byte field as an integer,
            # see Parser in utils.codec
            payloadSize = reader.get(sizeOfSize)
            if reader.getRemainingLength() < payloadSize:
                # payload is still only partially buffered
                return None
            return headerSize + payloadSize

        # registration order is what decides priority in getMessage()
        self.priorities.append(msgType)
        self.buffers[msgType] = bytearray()
        self.decoders[msgType] = headerLength

    def addData(self, msgType, data):
        """
        Append data to the buffer of the given message type

        @param msgType: identifier of an already registered message type
        @param data: bytes to put at the end of the buffer
        @raise ValueError: if the message type was not registered
        """
        if msgType not in self.priorities:
            raise ValueError("Message type not defined")

        # the existing bytearray is extended in place
        self.buffers[msgType] += data

    def getMessage(self):
        """
        Pop the first complete message of the highest priority type

        @return: tuple of message type and message data, or None if no
        buffer holds a complete message
        """
        for kind in self.priorities:
            decoder = self.decoders[kind]
            pending = self.buffers[kind]
            length = decoder(pending)
            if length is not None:
                # head of the buffer is the message, the tail stays queued
                message = pending[:length]
                self.buffers[kind] = pending[length:]
                return (kind, message)
        return None

    def clearBuffers(self):
        """Drop all buffered data, registered message types are kept"""
        for kind in self.buffers:
            # every type gets its own fresh, empty buffer
            self.buffers[kind] = bytearray()