/usr/lib/python3/dist-packages/systemimage/gpg.py is in system-image-common 2.2-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright (C) 2013-2014 Canonical Ltd.
# Author: Barry Warsaw <barry@ubuntu.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Handle GPG signature verification."""
# Names exported by ``from systemimage.gpg import *``.
__all__ = ['Context', 'SignatureError']
import os
import gnupg
import hashlib
import tarfile
from contextlib import ExitStack
from systemimage.config import config
from systemimage.helpers import temporary_directory
class SignatureError(Exception):
    """Raised when a detached signature does not validate.

    Context.verify() never raises this exception; it only ever returns a
    boolean.  Other functions raise it to report that a .asc file did not
    match the data it is supposed to sign.
    """

    def __init__(self, signature_path, data_path,
                 keyrings=None, blacklist=None):
        """Record the paths involved and a checksum of each file.

        :param signature_path: Path to the detached signature file.
        :param data_path: Path to the data file the signature covers.
        :param keyrings: Paths of the keyring files used, or None.
        :param blacklist: Path of the blacklist keyring file, or None.
        """
        super().__init__()

        def md5_hex(path):
            # Hex digest of the whole file at `path`.
            with open(path, 'rb') as stream:
                return hashlib.md5(stream.read()).hexdigest()

        self.signature_path = signature_path
        self.data_path = data_path
        self.keyrings = keyrings if keyrings is not None else []
        self.blacklist = blacklist
        # The checksums are taken eagerly: the files may be temporary or
        # atomic files that a context manager deletes on exit, so they are
        # not guaranteed to exist once this constructor has returned.
        #
        # md5 is used only as a quick diagnostic fingerprint for comparing
        # the file on disk with the file on the server.  These values are
        # not part of any trust decision and must not be used as one.
        self.signature_checksum = md5_hex(self.signature_path)
        self.data_checksum = md5_hex(self.data_path)
        self.keyring_checksums = [md5_hex(path) for path in self.keyrings]
        self.blacklist_checksum = (
            None if self.blacklist is None else md5_hex(self.blacklist))

    def __str__(self):
        """Return a multi-line report of every path and its checksum."""
        if self.blacklist is None:
            blacklist_sum, blacklist_file = 'no blacklist', ''
        else:
            blacklist_sum, blacklist_file = (
                self.blacklist_checksum, self.blacklist)
        # The report text below is emitted verbatim; do not re-indent it.
        report = """
sig path : {0.signature_checksum}
{0.signature_path}
data path: {0.data_checksum}
{0.data_path}
keyrings : {0.keyring_checksums}
{1}
blacklist: {2} {3}
"""
        return report.format(
            self, list(self.keyrings), blacklist_sum, blacklist_file)
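class Context:
    """GPG signature verification context over a set of keyrings.

    Use as a context manager.  Entering creates a temporary GnuPG home
    directory and the python-gnupg handle; exiting removes the directory
    and resets the handle to None.  The `keys`, `fingerprints` and
    `key_ids` properties and the `verify()` / `validate()` methods read
    that handle, so they are only usable inside the `with` block.
    """

    def __init__(self, *keyrings, blacklist=None):
        """Create a GPG signature verification context.

        :param keyrings: The list of keyrings to use for validating the
            signature on data files.
        :type keyrings: Sequence of .tar.xz keyring files, which will be
            unpacked to retrieve the actual .gpg keyring file.
        :param blacklist: The blacklist keyring, from which fingerprints to
            explicitly disallow are retrieved.
        :type blacklist: A .tar.xz keyring file, which will be unpacked to
            retrieve the actual .gpg keyring file.
        :raises AssertionError: when a keyring path does not have the form
            `<name>.tar.xz`.
        :raises FileNotFoundError: when an unpacked keyring or the
            blacklist file does not exist.
        """
        self.keyring_paths = keyrings
        self.blacklist_path = blacklist
        self._ctx = None
        self._stack = ExitStack()
        self._keyrings = []
        # The keyrings must be .tar.xz files, which need to be unpacked and
        # the keyring.gpg files inside them cached, using their actual name
        # (based on the .tar.xz file name).  If we don't already have a cache
        # of the .gpg file, do the unpacking and use the contained .gpg file
        # as the keyring.  Note that this class does *not* validate the
        # .tar.xz files.  That must be done elsewhere, and it must be done
        # before the archive is handed to this constructor.
        for path in keyrings:
            base, dot, tarxz = os.path.basename(path).partition('.')
            # Raised explicitly instead of via `assert` so the suffix check
            # is still enforced under `python -O`.  The exception type is
            # the same one callers saw before.
            if dot != '.' or tarxz != 'tar.xz':
                raise AssertionError(
                    'Expected a .tar.xz path, got: {}'.format(path))
            keyring_path = os.path.join(config.tempdir, base + '.gpg')
            # NOTE(review): an existing cache file is reused as-is, keyed
            # only on the archive's base name, so the effective trust root
            # is whatever already sits at this path.  That is sound only if
            # config.tempdir cannot be written by other users -- confirm
            # where and with which permissions tempdir is created.
            if not os.path.exists(keyring_path):
                with tarfile.open(path, 'r:xz') as tf:
                    tf.extract('keyring.gpg', config.tempdir)
                    # keyring_path already includes config.tempdir; joining
                    # the directory a second time yields the right target
                    # only when tempdir is an absolute path.
                    os.rename(
                        os.path.join(config.tempdir, 'keyring.gpg'),
                        keyring_path)
            self._keyrings.append(keyring_path)
        # Since python-gnupg doesn't do this for us, verify that all the
        # keyrings and blacklist files exist.  Yes, this introduces a race
        # condition, but I don't see any good way to eliminate this given
        # python-gnupg's behavior.
        for path in self._keyrings:
            if not os.path.exists(path):
                raise FileNotFoundError(path)
        if blacklist is not None:
            if not os.path.exists(blacklist):
                raise FileNotFoundError(blacklist)
            # Extract all the blacklisted fingerprints by loading the
            # blacklist as the only keyring of a nested context.
            with Context(blacklist) as ctx:
                self._blacklisted_fingerprints = ctx.fingerprints
        else:
            self._blacklisted_fingerprints = set()

    def __enter__(self):
        """Create the temporary $GNUPGHOME and the GPG handle.

        :return: This context.
        """
        try:
            # Use a temporary directory for the $GNUPGHOME, but be sure to
            # arrange for the tempdir to be deleted no matter what.
            home = self._stack.enter_context(
                temporary_directory(prefix='si-gnupghome',
                                    dir=config.tempdir))
            self._ctx = gnupg.GPG(gnupghome=home, keyring=self._keyrings)
            self._stack.callback(setattr, self, '_ctx', None)
        except BaseException:
            # Deliberately broad: undo whatever was set up, whatever the
            # cause (including KeyboardInterrupt), then re-raise unchanged.
            self._stack.close()
            raise
        else:
            return self

    def __exit__(self, *exc_details):
        """Tear down the GPG handle and its temporary home directory."""
        self._stack.close()
        # Don't swallow exceptions.
        return False

    @property
    def keys(self):
        """The key records reported by the GPG handle's list_keys()."""
        return self._ctx.list_keys()

    @property
    def fingerprints(self):
        """The set of fingerprints of all keys in the loaded keyrings."""
        return {info['fingerprint'] for info in self._ctx.list_keys()}

    @property
    def key_ids(self):
        """The set of key ids of all keys in the loaded keyrings."""
        return {info['keyid'] for info in self._ctx.list_keys()}

    def verify(self, signature_path, data_path):
        """Verify a GPG signature.

        This verifies that the data file signature is valid, given the
        keyrings and blacklist specified in the constructor.  Specifically, we
        use GPG to extract the fingerprint in the signature path, and compare
        it against the fingerprints in the keyrings, subtracting any
        fingerprints in the blacklist.

        :param signature_path: The file system path to the detached signature
            file for the data file.
        :type signature_path: str
        :param data_path: The file system path to the data file.
        :type data_path: str
        :return: bool
        """
        with open(signature_path, 'rb') as sig_fp:
            verified = self._ctx.verify_file(sig_fp, data_path)
        # The signature is accepted when the fingerprint GPG reported for it
        # belongs to one of the loaded keyrings and is not blacklisted.  A
        # missing fingerprint is never a member of the set, so it fails.
        #
        # NOTE(review): acceptance hinges on the single value
        # `verified.fingerprint`; the result object's `valid` flag is not
        # consulted.  Confirm that the python-gnupg version in use only
        # populates `fingerprint` for signatures that should be accepted
        # (e.g. how it reports signatures made by expired or revoked keys).
        allowed = self.fingerprints - self._blacklisted_fingerprints
        return verified.fingerprint in allowed

    def validate(self, signature_path, data_path):
        """Like .verify() but raises a SignatureError when invalid.

        :param signature_path: The file system path to the detached signature
            file for the data file.
        :type signature_path: str
        :param data_path: The file system path to the data file.
        :type data_path: str
        :return: None
        :raises SignatureError: when the signature cannot be verified.  Note
            that the exception will contain extra information, namely the
            keyrings involved in the verification, as well as the blacklist
            file if there is one.
        """
        if not self.verify(signature_path, data_path):
            raise SignatureError(signature_path, data_path,
                                 self.keyring_paths, self.blacklist_path)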