/usr/share/pyshared/M2Crypto/SSL/Connection.py is in python-m2crypto 0.21.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 | """SSL Connection aka socket
Copyright (c) 1999-2004 Ng Pheng Siong. All rights reserved.
Portions created by Open Source Applications Foundation (OSAF) are
Copyright (C) 2004-2007 OSAF. All Rights Reserved.
Copyright 2008 Heikki Toivonen. All rights reserved.
"""
__all__ = ['Connection',
'timeout', # XXX Not really, but for documentation purposes
]
# Python
import socket
# M2Crypto
from Cipher import Cipher, Cipher_Stack
from Session import Session
from M2Crypto import BIO, X509, m2
import timeout
import Checker
#SSLError = getattr(__import__('M2Crypto.SSL', globals(), locals(), 'SSLError'), 'SSLError')
from M2Crypto.SSL import SSLError
def _serverPostConnectionCheck(*args, **kw):
return 1
class Connection:
"""An SSL connection."""
clientPostConnectionCheck = Checker.Checker()
serverPostConnectionCheck = _serverPostConnectionCheck
m2_bio_free = m2.bio_free
m2_ssl_free = m2.ssl_free
def __init__(self, ctx, sock=None):
self.ctx = ctx
self.ssl = m2.ssl_new(self.ctx.ctx)
if sock is not None:
self.socket = sock
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._fileno = self.socket.fileno()
self.blocking = self.socket.gettimeout()
self.ssl_close_flag = m2.bio_noclose
def __del__(self):
if getattr(self, 'sslbio', None):
self.m2_bio_free(self.sslbio)
if getattr(self, 'sockbio', None):
self.m2_bio_free(self.sockbio)
if self.ssl_close_flag == m2.bio_noclose and getattr(self, 'ssl', None):
self.m2_ssl_free(self.ssl)
self.socket.close()
def close(self):
m2.ssl_shutdown(self.ssl)
def clear(self):
"""
If there were errors in this connection, call clear() rather
than close() to end it, so that bad sessions will be cleared
from cache.
"""
return m2.ssl_clear(self.ssl)
def set_shutdown(self, mode):
m2.ssl_set_shutdown1(self.ssl, mode)
def get_shutdown(self):
return m2.ssl_get_shutdown(self.ssl)
def bind(self, addr):
self.socket.bind(addr)
def listen(self, qlen=5):
self.socket.listen(qlen)
def ssl_get_error(self, ret):
return m2.ssl_get_error(self.ssl, ret)
def set_bio(self, readbio, writebio):
"""
Explicitly set read and write bios
"""
m2.ssl_set_bio(self.ssl, readbio._ptr(), writebio._ptr())
def set_client_CA_list_from_file(self, cafile):
"""
Set the acceptable client CA list. If the client
returns a certificate, it must have been issued by
one of the CAs listed in cafile.
Makes sense only for servers.
@param cafile: Filename from which to load the CA list.
"""
m2.ssl_set_client_CA_list_from_file(self.ssl, cafile)
def set_client_CA_list_from_context(self):
"""
Set the acceptable client CA list. If the client
returns a certificate, it must have been issued by
one of the CAs listed in context.
Makes sense only for servers.
"""
m2.ssl_set_client_CA_list_from_context(self.ssl, self.ctx.ctx)
def setup_addr(self, addr):
self.addr = addr
def set_ssl_close_flag(self, flag):
"""
By default, SSL struct will be freed in __del__. Call with
m2.bio_close to override this default.
"""
if flag not in (m2.bio_close, m2.bio_noclose):
raise ValueError("flag must be m2.bio_close or m2.bio_noclose")
self.ssl_close_flag = flag
def setup_ssl(self):
# Make a BIO_s_socket.
self.sockbio = m2.bio_new_socket(self.socket.fileno(), 0)
# Link SSL struct with the BIO_socket.
m2.ssl_set_bio(self.ssl, self.sockbio, self.sockbio)
# Make a BIO_f_ssl.
self.sslbio = m2.bio_new(m2.bio_f_ssl())
# Link BIO_f_ssl with the SSL struct.
m2.bio_set_ssl(self.sslbio, self.ssl, m2.bio_noclose)
def _setup_ssl(self, addr):
"""Deprecated"""
self.setup_addr(addr)
self.setup_ssl()
def set_accept_state(self):
m2.ssl_set_accept_state(self.ssl)
def accept_ssl(self):
return m2.ssl_accept(self.ssl)
def accept(self):
"""Accept an SSL connection. The return value is a pair (ssl, addr) where
ssl is a new SSL connection object and addr is the address bound to
the other end of the SSL connection."""
sock, addr = self.socket.accept()
ssl = Connection(self.ctx, sock)
ssl.addr = addr
ssl.setup_ssl()
ssl.set_accept_state()
ssl.accept_ssl()
check = getattr(self, 'postConnectionCheck', self.serverPostConnectionCheck)
if check is not None:
if not check(ssl.get_peer_cert(), ssl.addr[0]):
raise Checker.SSLVerificationError, 'post connection check failed'
return ssl, addr
def set_connect_state(self):
m2.ssl_set_connect_state(self.ssl)
def connect_ssl(self):
return m2.ssl_connect(self.ssl)
def connect(self, addr):
self.socket.connect(addr)
self.addr = addr
self.setup_ssl()
self.set_connect_state()
ret = self.connect_ssl()
check = getattr(self, 'postConnectionCheck', self.clientPostConnectionCheck)
if check is not None:
if not check(self.get_peer_cert(), self.addr[0]):
raise Checker.SSLVerificationError, 'post connection check failed'
return ret
def shutdown(self, how):
m2.ssl_set_shutdown(self.ssl, how)
def renegotiate(self):
"""Renegotiate this connection's SSL parameters."""
return m2.ssl_renegotiate(self.ssl)
def pending(self):
"""Return the numbers of octets that can be read from the
connection."""
return m2.ssl_pending(self.ssl)
def _write_bio(self, data):
return m2.ssl_write(self.ssl, data)
def _write_nbio(self, data):
return m2.ssl_write_nbio(self.ssl, data)
def _read_bio(self, size=1024):
if size <= 0:
raise ValueError, 'size <= 0'
return m2.ssl_read(self.ssl, size)
def _read_nbio(self, size=1024):
if size <= 0:
raise ValueError, 'size <= 0'
return m2.ssl_read_nbio(self.ssl, size)
def write(self, data):
if self.blocking:
return self._write_bio(data)
return self._write_nbio(data)
sendall = send = write
def read(self, size=1024):
if self.blocking:
return self._read_bio(size)
return self._read_nbio(size)
recv = read
def setblocking(self, mode):
"""Set this connection's underlying socket to _mode_."""
self.socket.setblocking(mode)
self.blocking = mode
def fileno(self):
return self.socket.fileno()
def getsockopt(self, *args):
return apply(self.socket.getsockopt, args)
def setsockopt(self, *args):
return apply(self.socket.setsockopt, args)
def get_context(self):
"""Return the SSL.Context object associated with this
connection."""
return m2.ssl_get_ssl_ctx(self.ssl)
def get_state(self):
"""Return the SSL state of this connection."""
return m2.ssl_get_state(self.ssl)
def verify_ok(self):
return (m2.ssl_get_verify_result(self.ssl) == m2.X509_V_OK)
def get_verify_mode(self):
"""Return the peer certificate verification mode."""
return m2.ssl_get_verify_mode(self.ssl)
def get_verify_depth(self):
"""Return the peer certificate verification depth."""
return m2.ssl_get_verify_depth(self.ssl)
def get_verify_result(self):
"""Return the peer certificate verification result."""
return m2.ssl_get_verify_result(self.ssl)
def get_peer_cert(self):
"""Return the peer certificate; if the peer did not provide
a certificate, return None."""
c=m2.ssl_get_peer_cert(self.ssl)
if c is None:
return None
# Need to free the pointer coz OpenSSL doesn't.
return X509.X509(c, 1)
def get_peer_cert_chain(self):
"""Return the peer certificate chain; if the peer did not provide
a certificate chain, return None.
@warning: The returned chain will be valid only for as long as the
connection object is alive. Once the connection object gets freed,
the chain will be freed as well.
"""
c=m2.ssl_get_peer_cert_chain(self.ssl)
if c is None:
return None
# No need to free the pointer coz OpenSSL does.
return X509.X509_Stack(c)
def get_cipher(self):
"""Return an M2Crypto.SSL.Cipher object for this connection; if the
connection has not been initialised with a cipher suite, return None."""
c=m2.ssl_get_current_cipher(self.ssl)
if c is None:
return None
return Cipher(c)
def get_ciphers(self):
"""Return an M2Crypto.SSL.Cipher_Stack object for this connection; if the
connection has not been initialised with cipher suites, return None."""
c=m2.ssl_get_ciphers(self.ssl)
if c is None:
return None
return Cipher_Stack(c)
def get_cipher_list(self, idx=0):
"""Return the cipher suites for this connection as a string object."""
return m2.ssl_get_cipher_list(self.ssl, idx)
def set_cipher_list(self, cipher_list):
"""Set the cipher suites for this connection."""
return m2.ssl_set_cipher_list(self.ssl, cipher_list)
def makefile(self, mode='rb', bufsize='ignored'):
r = 'r' in mode or '+' in mode
w = 'w' in mode or 'a' in mode or '+' in mode
b = 'b' in mode
m2mode = ['', 'r'][r] + ['', 'w'][w] + ['', 'b'][b]
# XXX Need to dup().
bio = BIO.BIO(self.sslbio, _close_cb=self.close)
m2.bio_do_handshake(bio._ptr())
return BIO.IOBuffer(bio, m2mode, _pyfree=0)
def getsockname(self):
return self.socket.getsockname()
def getpeername(self):
return self.socket.getpeername()
def set_session_id_ctx(self, id):
ret = m2.ssl_set_session_id_context(self.ssl, id)
if not ret:
raise SSLError(m2.err_reason_error_string(m2.err_get_error()))
def get_session(self):
sess = m2.ssl_get_session(self.ssl)
return Session(sess)
def set_session(self, session):
m2.ssl_set_session(self.ssl, session._ptr())
def get_default_session_timeout(self):
return m2.ssl_get_default_session_timeout(self.ssl)
def get_socket_read_timeout(self):
return timeout.struct_to_timeout(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeout.struct_size()))
def get_socket_write_timeout(self):
return timeout.struct_to_timeout(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, timeout.struct_size()))
def set_socket_read_timeout(self, timeo):
assert isinstance(timeo, timeout.timeout)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeo.pack())
def set_socket_write_timeout(self, timeo):
assert isinstance(timeo, timeout.timeout)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, timeo.pack())
def get_version(self):
"Return the TLS/SSL protocol version for this connection."
return m2.ssl_get_version(self.ssl)
def set_post_connection_check_callback(self, postConnectionCheck):
self.postConnectionCheck = postConnectionCheck
|