This file is indexed.

/usr/share/pyshared/M2Crypto/SSL/Connection.py is in python-m2crypto 0.21.1-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
"""SSL Connection aka socket

Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved.

Portions created by Open Source Applications Foundation (OSAF) are
Copyright (C) 2004-2007 OSAF. All Rights Reserved.

Copyright 2008 Heikki Toivonen. All rights reserved.
"""

__all__ = ['Connection',
           'timeout', # XXX Not really, but for documentation purposes
           ]

# Python
import socket

# M2Crypto
from Cipher import Cipher, Cipher_Stack
from Session import Session
from M2Crypto import BIO, X509, m2
import timeout
import Checker

#SSLError = getattr(__import__('M2Crypto.SSL', globals(), locals(), 'SSLError'), 'SSLError')
from M2Crypto.SSL import SSLError

def _serverPostConnectionCheck(*args, **kw):
    return 1

class Connection:

    """An SSL connection."""

    clientPostConnectionCheck = Checker.Checker()
    serverPostConnectionCheck = _serverPostConnectionCheck

    m2_bio_free = m2.bio_free
    m2_ssl_free = m2.ssl_free
    
    def __init__(self, ctx, sock=None):
        self.ctx = ctx
        self.ssl = m2.ssl_new(self.ctx.ctx)
        if sock is not None:    
            self.socket = sock
        else:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._fileno = self.socket.fileno()
        
        self.blocking = self.socket.gettimeout()
        
        self.ssl_close_flag = m2.bio_noclose

        
    def __del__(self):
        if getattr(self, 'sslbio', None):
            self.m2_bio_free(self.sslbio)
        if getattr(self, 'sockbio', None):
            self.m2_bio_free(self.sockbio)
        if self.ssl_close_flag == m2.bio_noclose and getattr(self, 'ssl', None):
            self.m2_ssl_free(self.ssl)
        self.socket.close()

    def close(self):
        m2.ssl_shutdown(self.ssl)

    def clear(self):
        """
        If there were errors in this connection, call clear() rather
        than close() to end it, so that bad sessions will be cleared
        from cache.
        """
        return m2.ssl_clear(self.ssl)

    def set_shutdown(self, mode):
        m2.ssl_set_shutdown1(self.ssl, mode)

    def get_shutdown(self):
        return m2.ssl_get_shutdown(self.ssl)

    def bind(self, addr):
        self.socket.bind(addr)

    def listen(self, qlen=5):
        self.socket.listen(qlen)    

    def ssl_get_error(self, ret):
        return m2.ssl_get_error(self.ssl, ret)

    def set_bio(self, readbio, writebio):
        """
        Explicitly set read and write bios
        """
        m2.ssl_set_bio(self.ssl, readbio._ptr(), writebio._ptr())
        
    def set_client_CA_list_from_file(self, cafile):
        """
        Set the acceptable client CA list. If the client
        returns a certificate, it must have been issued by
        one of the CAs listed in cafile.
        
        Makes sense only for servers.
        
        @param cafile: Filename from which to load the CA list.
        """
        m2.ssl_set_client_CA_list_from_file(self.ssl, cafile)

    def set_client_CA_list_from_context(self):
        """
        Set the acceptable client CA list. If the client
        returns a certificate, it must have been issued by
        one of the CAs listed in context.
        
        Makes sense only for servers.
        """
        m2.ssl_set_client_CA_list_from_context(self.ssl, self.ctx.ctx)

    def setup_addr(self, addr):
        self.addr = addr

    def set_ssl_close_flag(self, flag):
        """
        By default, SSL struct will be freed in __del__. Call with
        m2.bio_close to override this default.
        """
        if flag not in (m2.bio_close, m2.bio_noclose):
            raise ValueError("flag must be m2.bio_close or m2.bio_noclose")
        self.ssl_close_flag = flag

    def setup_ssl(self):
        # Make a BIO_s_socket.
        self.sockbio = m2.bio_new_socket(self.socket.fileno(), 0)
        # Link SSL struct with the BIO_socket.
        m2.ssl_set_bio(self.ssl, self.sockbio, self.sockbio)
        # Make a BIO_f_ssl.
        self.sslbio = m2.bio_new(m2.bio_f_ssl())
        # Link BIO_f_ssl with the SSL struct.
        m2.bio_set_ssl(self.sslbio, self.ssl, m2.bio_noclose)

    def _setup_ssl(self, addr):
        """Deprecated"""
        self.setup_addr(addr)
        self.setup_ssl()

    def set_accept_state(self):
        m2.ssl_set_accept_state(self.ssl)

    def accept_ssl(self):
        return m2.ssl_accept(self.ssl)

    def accept(self):
        """Accept an SSL connection. The return value is a pair (ssl, addr) where
        ssl is a new SSL connection object and addr is the address bound to
        the other end of the SSL connection."""
        sock, addr = self.socket.accept()
        ssl = Connection(self.ctx, sock)
        ssl.addr = addr
        ssl.setup_ssl()
        ssl.set_accept_state()
        ssl.accept_ssl()
        check = getattr(self, 'postConnectionCheck', self.serverPostConnectionCheck)
        if check is not None:
            if not check(ssl.get_peer_cert(), ssl.addr[0]):
                raise Checker.SSLVerificationError, 'post connection check failed'
        return ssl, addr

    def set_connect_state(self):
        m2.ssl_set_connect_state(self.ssl)

    def connect_ssl(self):
        return m2.ssl_connect(self.ssl)

    def connect(self, addr):
        self.socket.connect(addr)
        self.addr = addr
        self.setup_ssl()
        self.set_connect_state()
        ret = self.connect_ssl()
        check = getattr(self, 'postConnectionCheck', self.clientPostConnectionCheck)
        if check is not None:
            if not check(self.get_peer_cert(), self.addr[0]):
                raise Checker.SSLVerificationError, 'post connection check failed'
        return ret

    def shutdown(self, how):
        m2.ssl_set_shutdown(self.ssl, how)

    def renegotiate(self):
        """Renegotiate this connection's SSL parameters."""
        return m2.ssl_renegotiate(self.ssl)

    def pending(self):
        """Return the numbers of octets that can be read from the 
        connection."""
        return m2.ssl_pending(self.ssl)

    def _write_bio(self, data):
        return m2.ssl_write(self.ssl, data)

    def _write_nbio(self, data):
        return m2.ssl_write_nbio(self.ssl, data)

    def _read_bio(self, size=1024):
        if size <= 0:
            raise ValueError, 'size <= 0'
        return m2.ssl_read(self.ssl, size)

    def _read_nbio(self, size=1024):
        if size <= 0:
            raise ValueError, 'size <= 0'
        return m2.ssl_read_nbio(self.ssl, size)

    def write(self, data):
        if self.blocking:
            return self._write_bio(data)
        return self._write_nbio(data)
    sendall = send = write
    
    def read(self, size=1024):
        if self.blocking:
            return self._read_bio(size)
        return self._read_nbio(size)
    recv = read

    def setblocking(self, mode):
        """Set this connection's underlying socket to _mode_."""
        self.socket.setblocking(mode)
        self.blocking = mode

    def fileno(self):
        return self.socket.fileno()

    def getsockopt(self, *args):
        return apply(self.socket.getsockopt, args)

    def setsockopt(self, *args):
        return apply(self.socket.setsockopt, args)

    def get_context(self):
        """Return the SSL.Context object associated with this 
        connection."""
        return m2.ssl_get_ssl_ctx(self.ssl)

    def get_state(self):
        """Return the SSL state of this connection."""
        return m2.ssl_get_state(self.ssl)

    def verify_ok(self):
        return (m2.ssl_get_verify_result(self.ssl) == m2.X509_V_OK)

    def get_verify_mode(self):
        """Return the peer certificate verification mode."""
        return m2.ssl_get_verify_mode(self.ssl)

    def get_verify_depth(self):
        """Return the peer certificate verification depth."""
        return m2.ssl_get_verify_depth(self.ssl)

    def get_verify_result(self):
        """Return the peer certificate verification result."""
        return m2.ssl_get_verify_result(self.ssl)

    def get_peer_cert(self):
        """Return the peer certificate; if the peer did not provide 
        a certificate, return None."""
        c=m2.ssl_get_peer_cert(self.ssl)
        if c is None:
            return None
        # Need to free the pointer coz OpenSSL doesn't.
        return X509.X509(c, 1)
    
    def get_peer_cert_chain(self):
        """Return the peer certificate chain; if the peer did not provide 
        a certificate chain, return None.
        
        @warning: The returned chain will be valid only for as long as the
        connection object is alive. Once the connection object gets freed,
        the chain will be freed as well.
        """
        c=m2.ssl_get_peer_cert_chain(self.ssl)
        if c is None:
            return None
        # No need to free the pointer coz OpenSSL does.
        return X509.X509_Stack(c)
    
    def get_cipher(self):
        """Return an M2Crypto.SSL.Cipher object for this connection; if the 
        connection has not been initialised with a cipher suite, return None."""
        c=m2.ssl_get_current_cipher(self.ssl)
        if c is None:
            return None
        return Cipher(c)
    
    def get_ciphers(self):
        """Return an M2Crypto.SSL.Cipher_Stack object for this connection; if the
        connection has not been initialised with cipher suites, return None."""
        c=m2.ssl_get_ciphers(self.ssl)
        if c is None:
            return None
        return Cipher_Stack(c)

    def get_cipher_list(self, idx=0):
        """Return the cipher suites for this connection as a string object."""
        return m2.ssl_get_cipher_list(self.ssl, idx)

    def set_cipher_list(self, cipher_list):
        """Set the cipher suites for this connection."""
        return m2.ssl_set_cipher_list(self.ssl, cipher_list)

    def makefile(self, mode='rb', bufsize='ignored'):
        r = 'r' in mode or '+' in mode
        w = 'w' in mode or 'a' in mode or '+' in mode
        b = 'b' in mode
        m2mode = ['', 'r'][r] + ['', 'w'][w] + ['', 'b'][b]      
        # XXX Need to dup().
        bio = BIO.BIO(self.sslbio, _close_cb=self.close)
        m2.bio_do_handshake(bio._ptr())
        return BIO.IOBuffer(bio, m2mode, _pyfree=0)

    def getsockname(self):
        return self.socket.getsockname()

    def getpeername(self):
        return self.socket.getpeername()

    def set_session_id_ctx(self, id):
        ret = m2.ssl_set_session_id_context(self.ssl, id)
        if not ret:
            raise SSLError(m2.err_reason_error_string(m2.err_get_error()))

    def get_session(self):
        sess = m2.ssl_get_session(self.ssl)
        return Session(sess)

    def set_session(self, session):
        m2.ssl_set_session(self.ssl, session._ptr())

    def get_default_session_timeout(self):
        return m2.ssl_get_default_session_timeout(self.ssl)

    def get_socket_read_timeout(self):
        return timeout.struct_to_timeout(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeout.struct_size()))

    def get_socket_write_timeout(self):
        return timeout.struct_to_timeout(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, timeout.struct_size()))

    def set_socket_read_timeout(self, timeo):
        assert isinstance(timeo, timeout.timeout)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeo.pack())

    def set_socket_write_timeout(self, timeo):
        assert isinstance(timeo, timeout.timeout)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, timeo.pack())

    def get_version(self):
        "Return the TLS/SSL protocol version for this connection."
        return m2.ssl_get_version(self.ssl)

    def set_post_connection_check_callback(self, postConnectionCheck):
        self.postConnectionCheck = postConnectionCheck