/usr/share/pyshared/txaws/client/ssl.py is in python-txaws 0.2.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 | from glob import glob
import os
import re
import sys
from OpenSSL import SSL
from OpenSSL.crypto import load_certificate, FILETYPE_PEM
from twisted.internet.ssl import CertificateOptions
from txaws import exception
__all__ = ["VerifyingContextFactory", "get_ca_certs"]
# Multiple defaults are supported; just add more paths, separated by colons.
# XXX Windows users can file a bug to add theirs, since we don't know what
# the right path is
DEFAULT_CERTS_PATH = (
    "/System/Library/OpenSSL/certs/:"
    if sys.platform == "darwin"
    else "/etc/ssl/certs/:"
)
class VerifyingContextFactory(CertificateOptions):
    """
    A SSL context factory to pass to C{connectSSL} to check for hostname
    validity.

    @ivar host: The hostname the peer certificate is matched against.
    """

    def __init__(self, host, caCerts=None):
        """
        @param host: The hostname that the peer certificate must match.
        @param caCerts: The trusted CA certificates handed to
            C{CertificateOptions}. When C{None}, the shared collection
            returned by C{get_global_ca_certs} is used.
        """
        if caCerts is None:
            caCerts = get_global_ca_certs()
        CertificateOptions.__init__(self, verify=True, caCerts=caCerts)
        self.host = host

    def _dnsname_match(self, dn, host):
        """
        Check C{host} against a certificate name that may contain wildcards.

        A label that is exactly C{*} matches one non-empty label; a C{*}
        embedded inside a label matches any run of non-dot characters. The
        comparison is case-insensitive and a wildcard never spans a dot.

        @param dn: The name taken from the certificate.
        @param host: The hostname being verified.
        @return: C{True} if C{host} matches C{dn}, C{False} otherwise.
        """
        pats = []
        for frag in dn.split(r"."):
            if frag == "*":
                pats.append("[^.]+")
            else:
                frag = re.escape(frag)
                pats.append(frag.replace(r"\*", "[^.]*"))
        rx = re.compile(r"\A" + r"\.".join(pats) + r"\Z", re.IGNORECASE)
        return bool(rx.match(host))

    def verify_callback(self, connection, x509, errno, depth, preverifyOK):
        """
        Verification callback installed on the SSL context by
        C{_makeContext}.

        For the certificate at depth 0 the DNS entries of the first
        subjectAltName extension are compared with C{self.host}. If that
        extension lists DNS names and none match, verification fails. If no
        DNS names were found at all, the subject's commonName is used
        instead. Certificates at any other depth are not inspected here.

        @return: C{preverifyOK} when the hostname check passes (or is not
            applicable at this depth), C{False} when it fails.
        """
        # Only check depth == 0 on chained certificates.
        if depth == 0:
            dns_found = False
            if getattr(x509, "get_extension", None) is not None:
                for index in range(x509.get_extension_count()):
                    extension = x509.get_extension(index)
                    if extension.get_short_name() != "subjectAltName":
                        continue
                    data = str(extension)
                    for element in data.split(", "):
                        # Split on the first colon only: values such as
                        # "URI:http://..." or IPv6 "IP Address:..." entries
                        # contain further colons and must simply be skipped
                        # rather than raise on unpacking.
                        key, _, value = element.partition(":")
                        if key != "DNS":
                            continue
                        if self._dnsname_match(value, self.host):
                            return preverifyOK
                        dns_found = True
                    # Only the first subjectAltName extension is examined.
                    break
            if not dns_found:
                # No DNS alternative names: fall back to the commonName.
                commonName = x509.get_subject().commonName
                if commonName is None:
                    return False
                if not self._dnsname_match(commonName, self.host):
                    return False
            else:
                # DNS names were listed but none matched the host.
                return False
        return preverifyOK

    def _makeContext(self):
        """
        Build the context via C{CertificateOptions} and require a peer
        certificate, routing verification through C{verify_callback}.
        """
        context = CertificateOptions._makeContext(self)
        context.set_verify(
            SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
            self.verify_callback)
        return context
def get_ca_certs():
    """
    Retrieve a list of CAs at either the DEFAULT_CERTS_PATH or the env
    override, TXAWS_CERTS_PATH.

    In order to find .pem files, this function checks first for presence of
    the TXAWS_CERTS_PATH environment variable that should point to a
    directory containing cert files. In the absence of this variable, the
    module-level DEFAULT_CERTS_PATH will be used instead.

    Note that both of these variables may have multiple paths in them, just
    like the familiar PATH environment variable (separated by colons).
    Empty path segments (for example from a trailing colon) are ignored.

    @return: A list of the loaded certificates, de-duplicated by SHA-1
        digest.
    @raise exception.CertsNotFoundError: If no .pem file was found in any of
        the configured directories.
    """
    cert_paths = os.getenv("TXAWS_CERTS_PATH", DEFAULT_CERTS_PATH).split(":")
    certificate_authority_map = {}
    for path in cert_paths:
        # An empty segment would make the glob below relative to the current
        # working directory, silently trusting whatever .pem files happen to
        # be there.
        if not path:
            continue
        for cert_file_name in glob(os.path.join(path, "*.pem")):
            # There might be some dead symlinks in there, so let's make sure
            # it's real.
            if not os.path.exists(cert_file_name):
                continue
            # The context manager closes the file even if reading fails.
            with open(cert_file_name) as cert_file:
                data = cert_file.read()
            x509 = load_certificate(FILETYPE_PEM, data)
            digest = x509.digest("sha1")
            # Now, de-duplicate in case the same cert has multiple names.
            certificate_authority_map[digest] = x509
    values = list(certificate_authority_map.values())
    if not values:
        raise exception.CertsNotFoundError("Could not find any .pem files.")
    return values
# Module-level cache filled on the first call to get_global_ca_certs().
_ca_certs = None


def get_global_ca_certs():
    """Return the shared CA certificates, loading them on first use."""
    global _ca_certs
    if _ca_certs is not None:
        return _ca_certs
    _ca_certs = get_ca_certs()
    return _ca_certs
|