This file is indexed.

/usr/lib/python3/dist-packages/pytds/smp.py is in python3-tds 1.8.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import struct
import logging
import threading
from six.moves import range
try:
    from bitarray import bitarray
except ImportError:
    class bitarray(list):
        """Pure-Python stand-in used when the ``bitarray`` package is missing.

        Only the small surface this module relies on is provided: construction
        from a length, ``setall`` and the inherited ``list`` operations
        (indexing, item assignment, ``index``).
        """

        def __init__(self, len):
            # ``len`` shadows the builtin inside this method only; the name is
            # kept so the constructor signature stays unchanged.
            cleared = [False] * len
            self[:] = cleared

        def setall(self, val):
            """Overwrite every element with *val*, keeping the current length."""
            self[:] = [val] * len(self)


from .tds import Error, readall, skipall

logger = logging.getLogger(__name__)


class _SmpSession(object):
    """State of one logical session multiplexed through a session manager.

    The object itself performs no I/O: ``send``, ``read`` and ``close`` all
    delegate to the manager passed to the constructor, which also updates the
    counters and queues stored here.
    """

    def __init__(self, mgr, session_id):
        self._mgr = mgr
        self._session_id = session_id
        self._state = 'new'

        # Outgoing direction: sequence counter, window limit and backlog.
        self._seq_num_for_send = 0
        self._high_water_for_send = 4
        self._send_queue = []

        # Incoming direction: sequence counter, window limits and backlog.
        self._seq_num_for_recv = 0
        self._high_water_for_recv = 4
        self._last_high_water_for_recv = 4
        self._recv_queue = []

        # Packet currently being consumed by read() and the offset into it.
        self._curr_buf = b''
        self._curr_buf_pos = 0

    def __repr__(self):
        return (
            "<_SmpSession sid={} state={} recv_queue={} send_queue={} seq_num_for_send={}>"
        ).format(
            self._session_id,
            self._state,
            self._recv_queue,
            self._send_queue,
            self._seq_num_for_send,
        )

    def close(self):
        """Ask the manager to close this session."""
        self._mgr._close_smp_session(self)

    def send(self, data, final):
        """Hand *data* to the manager for sending; *final* is accepted but unused."""
        self._mgr._send_packet(self, data)

    def read(self, size):
        """Return up to *size* bytes from the current packet.

        When the current packet is exhausted the next one is requested from
        the manager; an empty packet yields ``b''``.  A single call never
        spans two packets.
        """
        if self._curr_buf_pos >= len(self._curr_buf):
            self._curr_buf = self._mgr._recv_packet(self)
            self._curr_buf_pos = 0
            if not self._curr_buf:
                return b''
        start = self._curr_buf_pos
        chunk = self._curr_buf[start:start + size]
        self._curr_buf_pos = start + len(chunk)
        return chunk

    def is_connected(self):
        """Return True while the session is in the established state."""
        return self._state == 'SESSION ESTABLISHED'


class SmpManager(object):
    """Multiplex several logical sessions over one transport using SMP framing.

    Every packet starts with a 16-byte little-endian header (``_smp_header``):
    signature (``_smid``), flags, session id, total length including the
    header, sequence number and receive window.

    Methods that send or receive packets, or that change session bookkeeping
    while doing so, run under the re-entrant ``_lock``.
    """

    _smid = 0x53
    _smp_header = struct.Struct('<BBHLLL')
    _SYN = 0x1
    _ACK = 0x2
    _FIN = 0x4
    _DATA = 0x8

    def __init__(self, transport):
        self._transport = transport
        self._sessions = {}
        # One flag per possible 16-bit session id; True means "in use".
        self._used_ids_ba = bitarray(2 ** 16)
        self._used_ids_ba.setall(False)
        self._lock = threading.RLock()

    def __repr__(self):
        return "<SmpManager sessions={}>".format(self._sessions)

    def _pack_header(self, flags, session, seq_num, payload_len=0):
        """Build a packet header for *session* advertising its receive window."""
        return self._smp_header.pack(
            self._smid,
            flags,
            session._session_id,
            self._smp_header.size + payload_len,
            seq_num,
            session._high_water_for_recv,
            )

    def _release_session(self, session):
        """Forget *session* and make its id available for reuse."""
        self._sessions.pop(session._session_id, None)
        self._used_ids_ba[session._session_id] = False

    def create_session(self):
        """Allocate a session id, send SYN and return the established session.

        Raises ``Error`` when every session id is already in use.
        """
        # The id lookup happens under the lock so two threads cannot pick the
        # same free id.
        with self._lock:
            try:
                session_id = self._used_ids_ba.index(False)
            except ValueError:
                raise Error("Can't create more MARS sessions, close some sessions and try again")
            session = _SmpSession(self, session_id)
            self._sessions[session_id] = session
            self._used_ids_ba[session_id] = True
            self._transport.send(self._pack_header(self._SYN, session, 0), True)
            session._state = 'SESSION ESTABLISHED'
        return session

    def _close_smp_session(self, session):
        """Close *session*; a no-op when it is already closed or closing.

        An established session sends FIN and waits for the peer's answer.  A
        session whose peer already sent FIN answers with its own FIN (when the
        transport is still connected) and is released immediately, so its id
        can be reused.
        """
        with self._lock:
            state = session._state
            if state == 'SESSION ESTABLISHED':
                if self._transport.is_connected():
                    hdr = self._pack_header(self._FIN, session, session._seq_num_for_send)
                    session._state = 'FIN SENT'
                    self._transport.send(hdr, True)
                    # Reads until the session leaves the FIN SENT state.
                    self._recv_packet(session)
                else:
                    session._state = 'CLOSED'
            elif state == 'FIN RECEIVED':
                if self._transport.is_connected():
                    hdr = self._pack_header(self._FIN, session, session._seq_num_for_send)
                    self._transport.send(hdr, True)
                session._state = 'CLOSED'
                self._release_session(session)

    def _send_queued_packets(self, session):
        """Send queued payloads for *session* while its send window is open."""
        with self._lock:
            while session._send_queue and session._seq_num_for_send < session._high_water_for_send:
                data = session._send_queue.pop(0)
                self._send_packet(session, data)

    @staticmethod
    def _add_one_wrap(val):
        """Return ``val + 1`` wrapped to an unsigned 32-bit counter."""
        return 0 if val == 2 ** 32 - 1 else val + 1

    def _send_packet(self, session, data):
        """Send *data* as a DATA packet, or queue it when the window is closed.

        When the payload is queued, one incoming message is processed so a
        window update from the peer can flush the queue.
        """
        with self._lock:
            if session._seq_num_for_send < session._high_water_for_send:
                seq_num = self._add_one_wrap(session._seq_num_for_send)
                hdr = self._pack_header(self._DATA, session, seq_num, len(data))
                # The DATA header carries the current window, so it doubles as
                # an acknowledgement.
                session._last_high_water_for_recv = session._high_water_for_recv
                self._transport.send(hdr + data, True)
                session._seq_num_for_send = seq_num
            else:
                session._send_queue.append(data)
                self._read_smp_message()

    def _recv_packet(self, session):
        """Return the next payload for *session*, or ``b''`` once it is closing.

        Blocks processing incoming messages until a payload for *session* is
        queued.  Each delivered payload widens the receive window by one; an
        ACK is sent once the window has grown by two since it was last
        advertised.
        """
        with self._lock:
            if session._state == 'CLOSED':
                return b''
            while not session._recv_queue:
                self._read_smp_message()
                if session._state in ('CLOSED', 'FIN RECEIVED'):
                    return b''
            session._high_water_for_recv = self._add_one_wrap(session._high_water_for_recv)
            # NOTE(review): this difference is not wrap-aware; it matches the
            # other window comparisons in this class.
            if session._high_water_for_recv - session._last_high_water_for_recv >= 2:
                hdr = self._pack_header(self._ACK, session, session._seq_num_for_send)
                self._transport.send(hdr, True)
                session._last_high_water_for_recv = session._high_water_for_recv
            return session._recv_queue.pop(0)

    @classmethod
    def _type_to_str(cls, t):
        """Return the name of packet type *t*, or None when it is unknown."""
        names = {
            cls._SYN: 'SYN',
            cls._ACK: 'ACK',
            cls._DATA: 'DATA',
            cls._FIN: 'FIN',
        }
        return names.get(t)

    def _bad_stm(self, message):
        """Close the transport and raise ``Error`` carrying *message*."""
        self.close()
        raise Error(message)

    def _read_smp_message(self):
        """Read one packet from the transport and apply it to its session.

        Any protocol violation closes the transport and raises ``Error``.
        """
        with self._lock:
            header_size = self._smp_header.size
            smid, flags, sid, length, seq_num, wnd = self._smp_header.unpack(
                readall(self._transport, header_size))
            if smid != self._smid:
                self._bad_stm('Invalid SMP packet signature')
            session = self._sessions.get(sid)
            if session is None:
                self._bad_stm('Invalid SMP packet session id')
            if wnd < session._high_water_for_send:
                self._bad_stm('Invalid WNDW in packet from server')
            if seq_num > session._high_water_for_recv:
                self._bad_stm('Invalid SEQNUM in packet from server')
            session._last_recv_seq_num = seq_num
            if flags == self._ACK:
                if session._state in ('FIN RECEIVED', 'CLOSED'):
                    self._bad_stm('Unexpected SMP ACK packet from server')
                if seq_num != session._seq_num_for_recv:
                    self._bad_stm('Invalid SEQNUM in ACK packet from server')
                session._high_water_for_send = wnd
                self._send_queued_packets(session)
            elif flags == self._DATA:
                payload_len = length - header_size
                if payload_len < 0:
                    # A length smaller than the header would turn into a
                    # negative read size below.
                    self._bad_stm('Invalid LENGTH in DATA packet from server')
                if session._state == 'SESSION ESTABLISHED':
                    if seq_num != self._add_one_wrap(session._seq_num_for_recv):
                        self._bad_stm('Invalid SEQNUM in DATA packet from server')
                    session._seq_num_for_recv = seq_num
                    data = readall(self._transport, payload_len)
                    session._recv_queue.append(data)
                    if wnd > session._high_water_for_send:
                        session._high_water_for_send = wnd
                        self._send_queued_packets(session)
                elif session._state == 'FIN SENT':
                    # We are closing: drop the payload.
                    skipall(self._transport, payload_len)
                else:
                    self._bad_stm('Unexpected DATA packet from server')
            elif flags == self._FIN:
                if session._state == 'SESSION ESTABLISHED':
                    session._state = 'FIN RECEIVED'
                elif session._state == 'FIN SENT':
                    session._state = 'CLOSED'
                    self._release_session(session)
                elif session._state == 'FIN RECEIVED':
                    self._bad_stm('Unexpected SMP FIN packet from server')
                else:
                    self._bad_stm('Invalid state: ' + session._state)
            elif flags == self._SYN:
                self._bad_stm('Unexpected SMP SYN packet from server')
            else:
                self._bad_stm('Unexpected SMP flags in packet from server')

    def close(self):
        """Close the underlying transport."""
        self._transport.close()

    def _transport_closed(self):
        """Mark every known session as closed."""
        for session in self._sessions.values():
            session._state = 'CLOSED'