/usr/lib/python2.7/dist-packages/cryptography/hazmat/bindings/openssl/binding.py is in python-cryptography 2.1.4-1ubuntu1.1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import absolute_import, division, print_function
import collections
import threading
import types
from cryptography import utils
from cryptography.exceptions import InternalError
from cryptography.hazmat.bindings._openssl import ffi, lib
from cryptography.hazmat.bindings.openssl._conditional import CONDITIONAL_NAMES
_OpenSSLErrorWithText = collections.namedtuple(
"_OpenSSLErrorWithText", ["code", "lib", "func", "reason", "reason_text"]
)
class _OpenSSLError(object):
    """
    A single entry taken from the OpenSSL error queue.

    The packed error ``code`` is stored together with the ``lib``, ``func``
    and ``reason`` values that were extracted from it. The public attributes
    are declared with ``utils.read_only_property`` over the corresponding
    underscore-prefixed instance attributes.
    """

    code = utils.read_only_property("_code")
    lib = utils.read_only_property("_lib")
    func = utils.read_only_property("_func")
    reason = utils.read_only_property("_reason")

    def __init__(self, code, lib, func, reason):
        self._code, self._lib = code, lib
        self._func, self._reason = func, reason

    def _lib_reason_match(self, lib, reason):
        """
        Tell whether this error has the given ``lib`` and ``reason`` values.
        """
        if lib != self.lib:
            return False
        return reason == self.reason
def _consume_errors(lib):
    """
    Drain the OpenSSL error queue and return what was on it.

    :param lib: Binding object providing ``ERR_get_error``, ``ERR_GET_LIB``,
        ``ERR_GET_FUNC`` and ``ERR_GET_REASON``.
    :return: A list of ``_OpenSSLError`` objects, in the order the codes were
        returned by ``ERR_get_error``. The list is empty when the first call
        already returns 0.
    """
    # iter(callable, sentinel) keeps calling ERR_get_error until it yields 0,
    # which is the value the queue reports once it has been emptied.
    return [
        _OpenSSLError(
            error_code,
            lib.ERR_GET_LIB(error_code),
            lib.ERR_GET_FUNC(error_code),
            lib.ERR_GET_REASON(error_code),
        )
        for error_code in iter(lib.ERR_get_error, 0)
    ]
def _openssl_assert(lib, ok):
    """
    Raise ``InternalError`` if an OpenSSL call did not succeed.

    :param lib: Binding object providing the ``ERR_*`` functions used to
        drain the error queue and to render each error code as text.
    :param ok: Truthy when the preceding OpenSSL call succeeded; in that case
        this function returns without touching the error queue.
    :raises InternalError: When ``ok`` is falsy. The exception is built from
        the explanatory message and the list of ``_OpenSSLErrorWithText``
        entries drained from the error queue.
    """
    if ok:
        return

    errors_with_text = []
    for err in _consume_errors(lib):
        # ERR_error_string(code, NULL) renders into a static buffer owned by
        # OpenSSL, which its documentation describes as not thread safe.
        # Render into a buffer owned by this call instead; 256 bytes is the
        # minimum buffer size OpenSSL documents for ERR_error_string.
        # NOTE(review): assumes the bindings expose ERR_error_string_n.
        text_buf = ffi.new("char[]", 256)
        lib.ERR_error_string_n(err.code, text_buf, len(text_buf))
        errors_with_text.append(
            _OpenSSLErrorWithText(
                err.code, err.lib, err.func, err.reason, ffi.string(text_buf)
            )
        )

    raise InternalError(
        "Unknown OpenSSL error. This error is commonly encountered when "
        "another library is not cleaning up the OpenSSL error stack. If "
        "you are using cryptography with another library that uses "
        "OpenSSL try disabling it before reporting a bug. Otherwise "
        "please file an issue at https://github.com/pyca/cryptography/"
        "issues with information on how to reproduce "
        "this. ({0!r})".format(errors_with_text),
        errors_with_text
    )
def build_conditional_library(lib, conditional_names):
    """
    Build a module object exposing only the supported attributes of ``lib``.

    :param lib: The object whose attributes are copied.
    :param conditional_names: Mapping from the name of a flag attribute on
        ``lib`` to a zero-argument callable returning the attribute names
        that depend on that flag.
    :return: A new module named ``"lib"``. It carries every name listed by
        ``dir(lib)`` except those whose flag is falsy, and keeps the source
        object in its ``_original_lib`` attribute.
    """
    filtered = types.ModuleType("lib")
    filtered._original_lib = lib

    # Only the callbacks of unavailable features are invoked; each one
    # contributes the names that must be hidden.
    hidden = set()
    for flag_name, list_names in conditional_names.items():
        if getattr(lib, flag_name):
            continue
        hidden.update(list_names())

    for name in dir(lib):
        if name in hidden:
            continue
        setattr(filtered, name, getattr(lib, name))

    return filtered
class Binding(object):
    """
    Shared access point for the OpenSSL ``lib`` and ``ffi`` bindings.

    All state lives on the class. Creating an instance, or calling
    ``init_static_locks``, builds ``lib`` and initializes OpenSSL the first
    time and is a no-op for the library setup afterwards.
    """

    # Filtered library module; remains None until the first initialization.
    lib = None
    ffi = ffi
    _lib_loaded = False
    # Guards the one-time library initialization.
    _init_lock = threading.Lock()
    # Guards installation of the OpenSSL locking callback.
    _lock_init_lock = threading.Lock()

    def __init__(self):
        self._ensure_ffi_initialized()

    @classmethod
    def _register_osrandom_engine(cls):
        """
        Add the osrandom engine and record its id and name on the class.

        :raises InternalError: via ``_openssl_assert`` when adding the engine
            returns anything other than 1 or 2.
        """
        # Other code in this process may have left entries in the OpenSSL
        # error queue, and it has proven untenable to rely on that code to
        # clean up after itself. Empty the queue first so that anything found
        # in it afterwards is treated as an unexpected error.
        cls.lib.ERR_clear_error()
        cls._osrandom_engine_id = cls.lib.Cryptography_osrandom_engine_id
        cls._osrandom_engine_name = cls.lib.Cryptography_osrandom_engine_name
        add_status = cls.lib.Cryptography_add_osrandom_engine()
        engine_added = add_status in (1, 2)
        _openssl_assert(cls.lib, engine_added)

    @classmethod
    def _ensure_ffi_initialized(cls):
        """
        Build ``cls.lib`` and initialize OpenSSL, once, under ``_init_lock``.
        """
        with cls._init_lock:
            if cls._lib_loaded:
                return

            cls.lib = build_conditional_library(lib, CONDITIONAL_NAMES)
            cls._lib_loaded = True
            # Set up the SSL library itself.
            cls.lib.SSL_library_init()
            # Make every cipher and digest available through EVP.
            cls.lib.OpenSSL_add_all_algorithms()
            # Load the error strings of the libcrypto and libssl functions.
            cls.lib.SSL_load_error_strings()
            cls._register_osrandom_engine()

    @classmethod
    def init_static_locks(cls):
        """
        Make sure OpenSSL has a locking callback installed.

        :raises InternalError: via ``_openssl_assert`` when setting up our
            own locks does not return 1.
        """
        with cls._lock_init_lock:
            cls._ensure_ffi_initialized()
            # Prefer Python's own lock implementation when it is available:
            # importing _ssl triggers its setup.
            __import__("_ssl")

            current_callback = cls.lib.CRYPTO_get_locking_callback()
            if current_callback != cls.ffi.NULL:
                return

            # Nothing else has installed a locking callback, so install ours.
            setup_status = lib._setup_ssl_threads()
            _openssl_assert(cls.lib, setup_status == 1)
# OpenSSL is not thread safe until the locks are initialized. We call this
# method at module scope so that it executes while the import lock is held.
# On Pythons < 3.4 the import lock is a single global lock, which can prevent
# a race condition when registering the OpenSSL locks. On Python 3.4+ the
# import lock is per module, so this approach will not work there.
Binding.init_static_locks()
|