/usr/lib/python3/dist-packages/pgpy/pgp.py is in python3-pgpy 0.4.3-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 | """ pgp.py
this is where the armorable PGP block objects live
"""
import binascii
import calendar
import collections
import contextlib
import copy
import functools
import itertools
import operator
import os
import re
import warnings
import weakref
import six
from datetime import datetime
from cryptography.hazmat.primitives import hashes
from .constants import CompressionAlgorithm
from .constants import Features
from .constants import HashAlgorithm
from .constants import ImageEncoding
from .constants import KeyFlags
from .constants import NotationDataFlags
from .constants import PacketTag
from .constants import PubKeyAlgorithm
from .constants import RevocationKeyClass
from .constants import RevocationReason
from .constants import SignatureType
from .constants import SymmetricKeyAlgorithm
from .decorators import KeyAction
from .errors import PGPDecryptionError
from .errors import PGPError
from .packet import Key
from .packet import MDC
from .packet import Packet
from .packet import Primary
from .packet import Private
from .packet import PubKeyV4
from .packet import PubSubKeyV4
from .packet import PrivKeyV4
from .packet import PrivSubKeyV4
from .packet import Public
from .packet import Sub
from .packet import UserID
from .packet import UserAttribute
from .packet.packets import CompressedData
from .packet.packets import IntegrityProtectedSKEData
from .packet.packets import IntegrityProtectedSKEDataV1
from .packet.packets import LiteralData
from .packet.packets import OnePassSignature
from .packet.packets import OnePassSignatureV3
from .packet.packets import PKESessionKey
from .packet.packets import PKESessionKeyV3
from .packet.packets import Signature
from .packet.packets import SignatureV4
from .packet.packets import SKEData
from .packet.packets import Marker
from .packet.packets import SKESessionKey
from .packet.packets import SKESessionKeyV4
from .packet.types import Opaque
from .types import Armorable
from .types import Fingerprint
from .types import ParentRef
from .types import PGPObject
from .types import SignatureVerification
from .types import SorteDeque
__all__ = ['PGPSignature',
'PGPUID',
'PGPMessage',
'PGPKey',
'PGPKeyring']
class PGPSignature(Armorable, ParentRef, PGPObject):
@property
def __sig__(self):
return self._signature.signature.__sig__()
@property
def cipherprefs(self):
"""
A ``list`` of preferred symmetric algorithms specified in this signature, if any. Otherwise, an empty ``list``.
"""
if 'PreferredSymmetricAlgorithms' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_PreferredSymmetricAlgorithms'])).flags
return []
@property
def compprefs(self):
"""
A ``list`` of preferred compression algorithms specified in this signature, if any. Otherwise, an empty ``list``.
"""
if 'PreferredCompressionAlgorithms' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_PreferredCompressionAlgorithms'])).flags
return []
@property
def created(self):
"""
A :py:obj:`~datetime.datetime` of when this signature was created.
"""
return self._signature.subpackets['h_CreationTime'][-1].created
@property
def embedded(self):
return self.parent is not None
@property
def expires_at(self):
"""
A :py:obj:`~datetime.datetime` of when this signature expires, if a signature expiration date is specified.
Otherwise, ``None``
"""
if 'SignatureExpirationTime' in self._signature.subpackets:
expd = next(iter(self._signature.subpackets['SignatureExpirationTime'])).expires
return self.created + expd
return None
@property
def exportable(self):
"""
``False`` if this signature is marked as being not exportable. Otherwise, ``True``.
"""
if 'ExportableCertification' in self._signature.subpackets:
return bool(next(iter(self._signature.subpackets['ExportableCertification'])))
return True
@property
def features(self):
"""
A ``set`` of implementation features specified in this signature, if any. Otherwise, an empty ``set``.
"""
if 'Features' in self._signature.subpackets:
return next(iter(self._signature.subpackets['Features'])).flags
return set()
@property
def hash2(self):
return self._signature.hash2
@property
def hashprefs(self):
"""
A ``list`` of preferred hash algorithms specified in this signature, if any. Otherwise, an empty ``list``.
"""
if 'PreferredHashAlgorithms' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_PreferredHashAlgorithms'])).flags
return []
@property
def hash_algorithm(self):
"""
The :py:obj:`~constants.HashAlgorithm` used when computing this signature.
"""
return self._signature.halg
@property
def is_expired(self):
"""
``True`` if the signature has an expiration date, and is expired. Otherwise, ``False``
"""
expires_at = self.expires_at
if expires_at is not None and expires_at != self.created:
return expires_at < datetime.utcnow()
return False
@property
def key_algorithm(self):
"""
The :py:obj:`~constants.PubKeyAlgorithm` of the key that generated this signature.
"""
return self._signature.pubalg
@property
def key_expiration(self):
if 'KeyExpirationTime' in self._signature.subpackets:
return next(iter(self._signature.subpackets['KeyExpirationTime'])).expires
return None
@property
def key_flags(self):
"""
A ``set`` of :py:obj:`~constants.KeyFlags` specified in this signature, if any. Otherwise, an empty ``set``.
"""
if 'KeyFlags' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_KeyFlags'])).flags
return set()
@property
def keyserver(self):
"""
The preferred key server specified in this signature, if any. Otherwise, an empty ``str``.
"""
if 'PreferredKeyServer' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_PreferredKeyServer'])).uri
return ''
@property
def keyserverprefs(self):
"""
A ``list`` of :py:obj:`~constants.KeyServerPreferences` in this signature, if any. Otherwise, an empty ``list``.
"""
if 'KeyServerPreferences' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_KeyServerPreferences'])).flags
return []
@property
def magic(self):
return "SIGNATURE"
@property
def notation(self):
"""
A ``dict`` of notation data in this signature, if any. Otherwise, an empty ``dict``.
"""
return dict((nd.name, nd.value) for nd in self._signature.subpackets['NotationData'])
@property
def policy_uri(self):
"""
The policy URI specified in this signature, if any. Otherwise, an empty ``str``.
"""
if 'Policy' in self._signature.subpackets:
return next(iter(self._signature.subpackets['Policy'])).uri
return ''
@property
def revocable(self):
"""
``False`` if this signature is marked as being not revocable. Otherwise, ``True``.
"""
if 'Revocable' in self._signature.subpackets:
return bool(next(iter(self._signature.subpackets['Revocable'])))
return True
@property
def revocation_key(self):
if 'RevocationKey' in self._signature.subpackets:
raise NotImplementedError()
return None
@property
def signer(self):
"""
The 16-character Key ID of the key that generated this signature.
"""
return self._signature.signer
@property
def target_signature(self):
return NotImplemented
@property
def type(self):
"""
The :py:obj:`~constants.SignatureType` of this signature.
"""
return self._signature.sigtype
@classmethod
def new(cls, sigtype, pkalg, halg, signer):
sig = PGPSignature()
sigpkt = SignatureV4()
sigpkt.header.tag = 2
sigpkt.header.version = 4
sigpkt.subpackets.addnew('CreationTime', hashed=True, created=datetime.utcnow())
sigpkt.subpackets.addnew('Issuer', _issuer=signer)
sigpkt.sigtype = sigtype
sigpkt.pubalg = pkalg
if halg is not None:
sigpkt.halg = halg
sig._signature = sigpkt
return sig
def __init__(self):
"""
PGPSignature objects represent OpenPGP compliant signatures.
PGPSignature implements the ``__str__`` method, the output of which will be the signature object in
OpenPGP-compliant ASCII-armored format.
PGPSignature implements the ``__bytes__`` method, the output of which will be the signature object in
OpenPGP-compliant binary format.
"""
super(PGPSignature, self).__init__()
self._signature = None
def __bytearray__(self):
return self._signature.__bytearray__()
def __repr__(self):
return "<PGPSignature [{:s}] object at 0x{:02x}>".format(self.type.name, id(self))
def __lt__(self, other):
return self.created < other.created
def __or__(self, other):
if isinstance(other, Signature):
if self._signature is None:
self._signature = other
return self
##TODO: this is not a great way to do this
if other.__class__.__name__ == 'EmbeddedSignature':
self._signature = other
return self
raise TypeError
def __copy__(self):
# because the default shallow copy isn't actually all that useful,
# and deepcopy does too much work
sig = super(PGPSignature, self).__copy__()
# sig = PGPSignature()
# sig.ascii_headers = self.ascii_headers.copy()
sig |= copy.copy(self._signature)
return sig
def hashdata(self, subject):
_data = bytearray()
if isinstance(subject, six.string_types):
subject = subject.encode('charmap')
"""
All signatures are formed by producing a hash over the signature
data, and then using the resulting hash in the signature algorithm.
"""
if self.type == SignatureType.BinaryDocument:
"""
For binary document signatures (type 0x00), the document data is
hashed directly.
"""
if isinstance(subject, (SKEData, IntegrityProtectedSKEData)):
_data += subject.__bytearray__()
else:
_data += bytearray(subject)
if self.type == SignatureType.CanonicalDocument:
"""
For text document signatures (type 0x01), the
document is canonicalized by converting line endings to <CR><LF>,
and the resulting data is hashed.
"""
_data += re.subn(br'\r?\n', b'\r\n', subject)[0]
if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert,
SignatureType.Positive_Cert, SignatureType.CertRevocation, SignatureType.Subkey_Binding,
SignatureType.PrimaryKey_Binding}:
"""
When a signature is made over a key, the hash data starts with the
octet 0x99, followed by a two-octet length of the key, and then body
of the key packet. (Note that this is an old-style packet header for
a key packet with two-octet length.) ...
Key revocation signatures (types 0x20 and 0x28)
hash only the key being revoked.
"""
_s = b''
if isinstance(subject, PGPUID):
_s = subject._parent.hashdata
elif isinstance(subject, PGPKey) and not subject.is_primary:
_s = subject._parent.hashdata
elif isinstance(subject, PGPKey) and subject.is_primary:
_s = subject.hashdata
if len(_s) > 0:
_data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
if self.type in {SignatureType.Subkey_Binding, SignatureType.PrimaryKey_Binding}:
"""
A subkey binding signature
(type 0x18) or primary key binding signature (type 0x19) then hashes
the subkey using the same format as the main key (also using 0x99 as
the first octet).
"""
if subject.is_primary:
_s = subject.subkeys[self.signer].hashdata
else:
_s = subject.hashdata
_data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
if self.type in {SignatureType.KeyRevocation, SignatureType.SubkeyRevocation, SignatureType.DirectlyOnKey}:
"""
The signature is calculated directly on the key being revoked. A
revoked key is not to be used. Only revocation signatures by the
key being revoked, or by an authorized revocation key, should be
considered valid revocation signatures.
Subkey revocation signature
The signature is calculated directly on the subkey being revoked.
A revoked subkey is not to be used. Only revocation signatures
by the top-level signature key that is bound to this subkey, or
by an authorized revocation key, should be considered valid
revocation signatures.
- clarification from draft-ietf-openpgp-rfc4880bis-02:
Primary key revocation signatures (type 0x20) hash
only the key being revoked. Subkey revocation signature (type 0x28)
hash first the primary key and then the subkey being revoked
Signature directly on a key
This signature is calculated directly on a key. It binds the
information in the Signature subpackets to the key, and is
appropriate to be used for subpackets that provide information
about the key, such as the Revocation Key subpacket. It is also
appropriate for statements that non-self certifiers want to make
about the key itself, rather than the binding between a key and a
name.
"""
if self.type == SignatureType.SubkeyRevocation:
# hash the primary key first if this is a Subkey Revocation signature
_s = subject.parent.hashdata
_data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
_s = subject.hashdata
_data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert,
SignatureType.Positive_Cert, SignatureType.CertRevocation}:
"""
A certification signature (type 0x10 through 0x13) hashes the User
ID being bound to the key into the hash context after the above
data. ... A V4 certification
hashes the constant 0xB4 for User ID certifications or the constant
0xD1 for User Attribute certifications, followed by a four-octet
number giving the length of the User ID or User Attribute data, and
then the User ID or User Attribute data.
...
The [certificate revocation] signature
is computed over the same data as the certificate that it
revokes, and should have a later creation date than that
certificate.
"""
_s = subject.hashdata
if subject.is_uid:
_data += b'\xb4'
else:
_data += b'\xd1'
_data += self.int_to_bytes(len(_s), 4) + _s
# if this is a new signature, do update_hlen
if 0 in list(self._signature.signature):
self._signature.update_hlen()
"""
Once the data body is hashed, then a trailer is hashed. (...)
A V4 signature hashes the packet body
starting from its first field, the version number, through the end
of the hashed subpacket data. Thus, the fields hashed are the
signature version, the signature type, the public-key algorithm, the
hash algorithm, the hashed subpacket length, and the hashed
subpacket body.
V4 signatures also hash in a final trailer of six octets: the
version of the Signature packet, i.e., 0x04; 0xFF; and a four-octet,
big-endian number that is the length of the hashed data from the
Signature packet (note that this number does not include these final
six octets).
"""
hcontext = bytearray()
hcontext.append(self._signature.header.version if not self.embedded else self._signature._sig.header.version)
hcontext.append(self.type)
hcontext.append(self.key_algorithm)
hcontext.append(self.hash_algorithm)
hcontext += self._signature.subpackets.__hashbytearray__()
hlen = len(hcontext)
_data += hcontext
_data += b'\x04\xff'
_data += self.int_to_bytes(hlen, 4)
return bytes(_data)
def make_onepass(self):
onepass = OnePassSignatureV3()
onepass.sigtype = self.type
onepass.halg = self.hash_algorithm
onepass.pubalg = self.key_algorithm
onepass.signer = self.signer
onepass.update_hlen()
return onepass
def parse(self, packet):
unarmored = self.ascii_unarmor(packet)
data = unarmored['body']
if unarmored['magic'] is not None and unarmored['magic'] != 'SIGNATURE':
raise ValueError('Expected: SIGNATURE. Got: {}'.format(str(unarmored['magic'])))
if unarmored['headers'] is not None:
self.ascii_headers = unarmored['headers']
# load *one* packet from data
pkt = Packet(data)
if pkt.header.tag == PacketTag.Signature and not isinstance(pkt, Opaque):
self._signature = pkt
else:
raise ValueError('Expected: Signature. Got: {:s}'.format(pkt.__class__.__name__))
class PGPUID(ParentRef):
@property
def __sig__(self):
return list(self._signatures)
@property
def name(self):
"""If this is a User ID, the stored name. If this is not a User ID, this will be an empty string."""
return self._uid.name if isinstance(self._uid, UserID) else ""
@property
def comment(self):
"""
If this is a User ID, this will be the stored comment. If this is not a User ID, or there is no stored comment,
this will be an empty string.,
"""
return self._uid.comment if isinstance(self._uid, UserID) else ""
@property
def email(self):
"""
If this is a User ID, this will be the stored email address. If this is not a User ID, or there is no stored
email address, this will be an empty string.
"""
return self._uid.email if isinstance(self._uid, UserID) else ""
@property
def image(self):
"""
If this is a User Attribute, this will be the stored image. If this is not a User Attribute, this will be ``None``.
"""
return self._uid.image.image if isinstance(self._uid, UserAttribute) else None
@property
def is_primary(self):
"""
If the most recent, valid self-signature specifies this as being primary, this will be True. Otherwise, Faqlse.
"""
return bool(next(iter(self.selfsig._signature.subpackets['h_PrimaryUserID']), False))
@property
def is_uid(self):
"""
``True`` if this is a User ID, otherwise False.
"""
return isinstance(self._uid, UserID)
@property
def is_ua(self):
"""
``True`` if this is a User Attribute, otherwise False.
"""
return isinstance(self._uid, UserAttribute)
@property
def selfsig(self):
"""
This will be the most recent, self-signature of this User ID or Attribute. If there isn't one, this will be ``None``.
"""
if self.parent is not None:
return next((sig for sig in reversed(self._signatures) if sig.signer == self.parent.fingerprint.keyid), None)
@property
def signers(self):
"""
This will be a set of all of the key ids which have signed this User ID or Attribute.
"""
return set(s.signer for s in self.__sig__)
@property
def hashdata(self):
if self.is_uid:
return self._uid.__bytearray__()[len(self._uid.header):]
if self.is_ua:
return self._uid.subpackets.__bytearray__()
@classmethod
def new(cls, pn, comment="", email=""):
"""
Create a new User ID or photo.
:param pn: User ID name, or photo. If this is a ``bytearray``, it will be loaded as a photo.
Otherwise, it will be used as the name field for a User ID.
:type pn: ``bytearray``, ``str``, ``unicode``
:param comment: The comment field for a User ID. Ignored if this is a photo.
:type comment: ``str``, ``unicode``
:param email: The email address field for a User ID. Ignored if this is a photo.
:type email: ``str``, ``unicode``
:returns: :py:obj:`PGPUID`
"""
uid = PGPUID()
if isinstance(pn, bytearray):
uid._uid = UserAttribute()
uid._uid.image.image = pn
uid._uid.image.iencoding = ImageEncoding.encodingof(pn)
uid._uid.update_hlen()
else:
uid._uid = UserID()
uid._uid.name = pn
uid._uid.comment = comment
uid._uid.email = email
uid._uid.update_hlen()
return uid
def __init__(self):
"""
PGPUID objects represent User IDs and User Attributes for keys.
PGPUID implements the ``__format__`` method for User IDs, returning a string in the format
'name (comment) <email>', leaving out any comment or email fields that are not present.
"""
super(PGPUID, self).__init__()
self._uid = None
self._signatures = SorteDeque()
def __repr__(self):
if self.selfsig is not None:
return "<PGPUID [{:s}][{}] at 0x{:02X}>".format(self._uid.__class__.__name__, self.selfsig.created, id(self))
return "<PGPUID [{:s}] at 0x{:02X}>".format(self._uid.__class__.__name__, id(self))
def __lt__(self, other): # pragma: no cover
if self.is_uid == other.is_uid:
if self.is_primary == other.is_primary:
return self.selfsig > other.selfsig
if self.is_primary:
return True
return False
if self.is_uid and other.is_ua:
return True
if self.is_ua and other.is_uid:
return False
def __or__(self, other):
if isinstance(other, PGPSignature):
self._signatures.insort(other)
if self.parent is not None and self in self.parent._uids:
self.parent._uids.resort(self)
return self
if isinstance(other, UserID) and self._uid is None:
self._uid = other
return self
if isinstance(other, UserAttribute) and self._uid is None:
self._uid = other
return self
raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'"
"".format(self.__class__.__name__, other.__class__.__name__))
def __copy__(self):
# because the default shallow copy isn't actually all that useful,
# and deepcopy does too much work
uid = PGPUID()
uid |= copy.copy(self._uid)
for sig in self._signatures:
uid |= copy.copy(sig)
return uid
def __format__(self, format_spec):
if self.is_uid:
comment = six.u("") if self.comment == "" else six.u(" ({:s})").format(self.comment)
email = six.u("") if self.email == "" else six.u(" <{:s}>").format(self.email)
return six.u("{:s}{:s}{:s}").format(self.name, comment, email)
raise NotImplementedError
class PGPMessage(Armorable, PGPObject):
@staticmethod
def dash_unescape(text):
return re.subn(r'^- -', '-', text, flags=re.MULTILINE)[0]
@staticmethod
def dash_escape(text):
return re.subn(r'^-', '- -', text, flags=re.MULTILINE)[0]
@property
def encrypters(self):
"""A ``set`` containing all key ids (if any) to which this message was encrypted."""
return set(m.encrypter for m in self._sessionkeys if isinstance(m, PKESessionKey))
@property
def filename(self):
"""If applicable, returns the original filename of the message. Otherwise, returns an empty string."""
if self.type == 'literal':
return self._message.filename
return ''
@property
def is_compressed(self):
"""``True`` if this message will be compressed when exported"""
return self._compression != CompressionAlgorithm.Uncompressed
@property
def is_encrypted(self):
"""``True`` if this message is encrypted; otherwise, ``False``"""
return isinstance(self._message, (SKEData, IntegrityProtectedSKEData))
@property
def is_sensitive(self):
"""``True`` if this message is marked sensitive; otherwise ``False``"""
return self.type == 'literal' and self._message.filename == '_CONSOLE'
@property
def is_signed(self):
"""
``True`` if this message is signed; otherwise, ``False``.
Should always be ``False`` if the message is encrypted.
"""
return len(self._signatures) > 0
@property
def issuers(self):
"""A ``set`` containing all key ids (if any) which have signed or encrypted this message."""
return self.encrypters | self.signers
@property
def magic(self):
if self.type == 'cleartext':
return "SIGNATURE"
return "MESSAGE"
@property
def message(self):
"""The message contents"""
if self.type == 'cleartext':
return self.bytes_to_text(self._message)
if self.type == 'literal':
return self._message.contents
if self.type == 'encrypted':
return self._message
@property
def signatures(self):
"""A ``set`` containing all key ids (if any) which have signed this message."""
return list(self._signatures)
@property
def signers(self):
"""A ``set`` containing all key ids (if any) which have signed this message."""
return set(m.signer for m in self._signatures)
@property
def type(self):
##TODO: it might be better to use an Enum for the output of this
if isinstance(self._message, (six.string_types, six.binary_type, bytearray)):
return 'cleartext'
if isinstance(self._message, LiteralData):
return 'literal'
if isinstance(self._message, (SKEData, IntegrityProtectedSKEData)):
return 'encrypted'
raise NotImplementedError
def __init__(self):
"""
PGPMessage objects represent OpenPGP message compositions.
PGPMessage implements the `__str__` method, the output of which will be the message composition in
OpenPGP-compliant ASCII-armored format.
PGPMessage implements the `__bytes__` method, the output of which will be the message composition in
OpenPGP-compliant binary format.
Any signatures within the PGPMessage that are marked as being non-exportable will not be included in the output
of either of those methods.
"""
super(PGPMessage, self).__init__()
self._compression = CompressionAlgorithm.Uncompressed
self._message = None
self._mdc = None
self._signatures = SorteDeque()
self._sessionkeys = []
def __bytearray__(self):
if self.is_compressed:
comp = CompressedData()
comp.calg = self._compression
comp.packets = [pkt for pkt in self]
comp.update_hlen()
return comp.__bytearray__()
_bytes = bytearray()
for pkt in self:
_bytes += pkt.__bytearray__()
return _bytes
def __str__(self):
if self.type == 'cleartext':
tmpl = u"-----BEGIN PGP SIGNED MESSAGE-----\n" \
u"{hhdr:s}\n" \
u"{cleartext:s}\n" \
u"{signature:s}"
# only add a Hash: header if we actually have at least one signature
hashes = set(s.hash_algorithm.name for s in self.signatures)
hhdr = 'Hash: {hashes:s}\n'.format(hashes=','.join(sorted(hashes))) if hashes else ''
return tmpl.format(hhdr=hhdr,
cleartext=self.dash_escape(self.bytes_to_text(self._message)),
signature=super(PGPMessage, self).__str__())
return super(PGPMessage, self).__str__()
def __iter__(self):
if self.type == 'cleartext':
for sig in self._signatures:
yield sig
elif self.is_encrypted:
for sig in self._signatures:
yield sig
for pkt in self._sessionkeys:
yield pkt
yield self.message
else:
##TODO: is it worth coming up with a way of disabling one-pass signing?
for sig in self._signatures:
ops = sig.make_onepass()
if sig is not self._signatures[-1]:
ops.nested = True
yield ops
yield self._message
if self._mdc is not None: # pragma: no cover
yield self._mdc
for sig in self._signatures:
yield sig
def __or__(self, other):
if isinstance(other, Marker):
return self
if isinstance(other, CompressedData):
self._compression = other.calg
for pkt in other.packets:
self |= pkt
return self
if isinstance(other, (six.string_types, six.binary_type, bytearray)):
if self._message is None:
self._message = self.text_to_bytes(other)
return self
if isinstance(other, (LiteralData, SKEData, IntegrityProtectedSKEData)):
if self._message is None:
self._message = other
return self
if isinstance(other, MDC):
if self._mdc is None:
self._mdc = other
return self
if isinstance(other, OnePassSignature):
# these are "generated" on the fly during composition
return self
if isinstance(other, Signature):
other = PGPSignature() | other
if isinstance(other, PGPSignature):
self._signatures.insort(other)
return self
if isinstance(other, (PKESessionKey, SKESessionKey)):
self._sessionkeys.append(other)
return self
if isinstance(other, PGPMessage):
self._message = other._message
self._mdc = other._mdc
self._compression = other._compression
self._sessionkeys += other._sessionkeys
self._signatures += other._signatures
return self
raise NotImplementedError(str(type(other)))
def __copy__(self):
msg = super(PGPMessage, self).__copy__()
msg._compression = self._compression
msg._message = copy.copy(self._message)
msg._mdc = copy.copy(self._mdc)
for sig in self._signatures:
msg |= copy.copy(sig)
for sk in self._sessionkeys:
msg |= copy.copy(sk)
return msg
@classmethod
def new(cls, message, **kwargs):
"""
Create a new PGPMessage object.
:param message: The message to be stored.
:type message: ``str``, ``unicode``, ``bytes``, ``bytearray``
:returns: :py:obj:`PGPMessage`
The following optional keyword arguments can be used with :py:meth:`PGPMessage.new`:
:keyword file: if True, ``message`` should be a path to a file. The contents of that file will be read and used
as the contents of the message.
:type file: ``bool``
:keyword cleartext: if True, the message will be cleartext with inline signatures.
:type cleartext: ``bool``
:keyword sensitive: if True, the filename will be set to '_CONSOLE' to signal other OpenPGP clients to treat
this message as being 'for your eyes only'. Ignored if cleartext is True.
:type sensitive: ``bool``
:keyword format: Set the message format identifier. Ignored if cleartext is True.
:type format: ``str``
:keyword compression: Set the compression algorithm for the new message.
Defaults to :py:obj:`CompressionAlgorithm.ZIP`. Ignored if cleartext is True.
:keyword encoding: Set the Charset header for the message.
:type encoding: ``str`` representing a valid codec in codecs
"""
# TODO: have 'codecs' above (in :type encoding:) link to python documentation page on codecs
cleartext = kwargs.pop('cleartext', False)
format = kwargs.pop('format', None)
sensitive = kwargs.pop('sensitive', False)
compression = kwargs.pop('compression', CompressionAlgorithm.ZIP)
file = kwargs.pop('file', False)
charset = kwargs.pop('encoding', None)
filename = ''
mtime = datetime.utcnow()
msg = PGPMessage()
if charset:
msg.charset = charset
# if format in 'tu' and isinstance(message, (six.binary_type, bytearray)):
# # if message format is text or unicode and we got binary data, we'll need to transcode it to UTF-8
# message =
if file and os.path.isfile(message):
filename = message
message = bytearray(os.path.getsize(filename))
mtime = datetime.utcfromtimestamp(os.path.getmtime(filename))
with open(filename, 'rb') as mf:
mf.readinto(message)
# if format is None, we can try to detect it
if format is None:
if isinstance(message, six.text_type):
# message is definitely UTF-8 already
format = 'u'
elif cls.is_ascii(message):
# message is probably text
format = 't'
else:
# message is probably binary
format = 'b'
# if message is a binary type and we're building a textual message, we need to transcode the bytes to UTF-8
if isinstance(message, (six.binary_type, bytearray)) and (cleartext or format in 'tu'):
message = message.decode(charset or 'utf-8')
if cleartext:
msg |= message
else:
# load literal data
lit = LiteralData()
lit._contents = bytearray(msg.text_to_bytes(message))
lit.filename = '_CONSOLE' if sensitive else os.path.basename(filename)
lit.mtime = mtime
lit.format = format
# if cls.is_ascii(message):
# lit.format = 't'
lit.update_hlen()
msg |= lit
msg._compression = compression
return msg
def encrypt(self, passphrase, sessionkey=None, **prefs):
"""
Encrypt the contents of this message using a passphrase.
:param passphrase: The passphrase to use for encrypting this message.
:type passphrase: ``str``, ``unicode``, ``bytes``
:optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``.
If ``None``, a session key of the appropriate length will be generated randomly.
.. warning::
Care should be taken when making use of this option! Session keys *absolutely need*
to be unpredictable! Use the ``gen_key()`` method on the desired
:py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key!
:type sessionkey: ``bytes``, ``str``
:raises: :py:exc:`~errors.PGPEncryptionError`
:returns: A new :py:obj:`PGPMessage` containing the encrypted contents of this message.
"""
cipher_algo = prefs.pop('cipher', SymmetricKeyAlgorithm.AES256)
hash_algo = prefs.pop('hash', HashAlgorithm.SHA256)
# set up a new SKESessionKeyV4
skesk = SKESessionKeyV4()
skesk.s2k.usage = 255
skesk.s2k.specifier = 3
skesk.s2k.halg = hash_algo
skesk.s2k.encalg = cipher_algo
skesk.s2k.count = skesk.s2k.halg.tuned_count
if sessionkey is None:
sessionkey = cipher_algo.gen_key()
skesk.encrypt_sk(passphrase, sessionkey)
del passphrase
msg = PGPMessage() | skesk
if not self.is_encrypted:
skedata = IntegrityProtectedSKEDataV1()
skedata.encrypt(sessionkey, cipher_algo, self.__bytes__())
msg |= skedata
else:
msg |= self
return msg
def decrypt(self, passphrase):
"""
Attempt to decrypt this message using a passphrase.
:param passphrase: The passphrase to use to attempt to decrypt this message.
:type passphrase: ``str``, ``unicode``, ``bytes``
:raises: :py:exc:`~errors.PGPDecryptionError` if decryption failed for any reason.
:returns: A new :py:obj:`PGPMessage` containing the decrypted contents of this message
"""
if not self.is_encrypted:
raise PGPError("This message is not encrypted!")
for skesk in iter(sk for sk in self._sessionkeys if isinstance(sk, SKESessionKey)):
try:
symalg, key = skesk.decrypt_sk(passphrase)
decmsg = PGPMessage()
decmsg.parse(self.message.decrypt(key, symalg))
except (TypeError, ValueError, NotImplementedError, PGPDecryptionError):
continue
else:
del passphrase
break
else:
raise PGPDecryptionError("Decryption failed")
return decmsg
def parse(self, packet):
unarmored = self.ascii_unarmor(packet)
data = unarmored['body']
if unarmored['magic'] is not None and unarmored['magic'] not in ['MESSAGE', 'SIGNATURE']:
raise ValueError('Expected: MESSAGE. Got: {}'.format(str(unarmored['magic'])))
if unarmored['headers'] is not None:
self.ascii_headers = unarmored['headers']
# cleartext signature
if unarmored['magic'] == 'SIGNATURE':
# the composition for this will be the 'cleartext' as a str,
# followed by one or more signatures (each one loaded into a PGPSignature)
self |= self.dash_unescape(unarmored['cleartext'])
while len(data) > 0:
pkt = Packet(data)
if not isinstance(pkt, Signature): # pragma: no cover
warnings.warn("Discarded unexpected packet: {:s}".format(pkt.__class__.__name__), stacklevel=2)
continue
self |= PGPSignature() | pkt
else:
while len(data) > 0:
self |= Packet(data)
class PGPKey(Armorable, ParentRef, PGPObject):
"""
11.1. Transferable Public Keys
OpenPGP users may transfer public keys. The essential elements of a
transferable public key are as follows:
- One Public-Key packet
- Zero or more revocation signatures
- One or more User ID packets
- After each User ID packet, zero or more Signature packets
(certifications)
- Zero or more User Attribute packets
- After each User Attribute packet, zero or more Signature packets
(certifications)
- Zero or more Subkey packets
- After each Subkey packet, one Signature packet, plus optionally a
revocation
The Public-Key packet occurs first. Each of the following User ID
packets provides the identity of the owner of this public key. If
there are multiple User ID packets, this corresponds to multiple
means of identifying the same unique individual user; for example, a
user may have more than one email address, and construct a User ID
for each one.
Immediately following each User ID packet, there are zero or more
Signature packets. Each Signature packet is calculated on the
immediately preceding User ID packet and the initial Public-Key
packet. The signature serves to certify the corresponding public key
and User ID. In effect, the signer is testifying to his or her
belief that this public key belongs to the user identified by this
User ID.
Within the same section as the User ID packets, there are zero or
more User Attribute packets. Like the User ID packets, a User
Attribute packet is followed by zero or more Signature packets
calculated on the immediately preceding User Attribute packet and the
initial Public-Key packet.
User Attribute packets and User ID packets may be freely intermixed
in this section, so long as the signatures that follow them are
maintained on the proper User Attribute or User ID packet.
After the User ID packet or Attribute packet, there may be zero or
more Subkey packets. In general, subkeys are provided in cases where
the top-level public key is a signature-only key. However, any V4
key may have subkeys, and the subkeys may be encryption-only keys,
signature-only keys, or general-purpose keys. V3 keys MUST NOT have
subkeys.
Each Subkey packet MUST be followed by one Signature packet, which
should be a subkey binding signature issued by the top-level key.
For subkeys that can issue signatures, the subkey binding signature
MUST contain an Embedded Signature subpacket with a primary key
binding signature (0x19) issued by the subkey on the top-level key.
Subkey and Key packets may each be followed by a revocation Signature
packet to indicate that the key is revoked. Revocation signatures
are only accepted if they are issued by the key itself, or by a key
that is authorized to issue revocations via a Revocation Key
subpacket in a self-signature by the top-level key.
Transferable public-key packet sequences may be concatenated to allow
transferring multiple public keys in one operation.
11.2. Transferable Secret Keys
OpenPGP users may transfer secret keys. The format of a transferable
secret key is the same as a transferable public key except that
secret-key and secret-subkey packets are used instead of the public
key and public-subkey packets. Implementations SHOULD include self-
signatures on any user IDs and subkeys, as this allows for a complete
public key to be automatically extracted from the transferable secret
key. Implementations MAY choose to omit the self-signatures,
especially if a transferable public key accompanies the transferable
secret key.
"""
@property
def __key__(self):
return self._key.keymaterial
@property
def __sig__(self):
return list(self._signatures)
@property
def created(self):
"""A :py:obj:`~datetime.datetime` object of the creation date and time of the key, in UTC."""
return self._key.created
@property
def expires_at(self):
"""A :py:obj:`~datetime.datetime` object of when this key is to be considered expired, if any. Otherwise, ``None``"""
try:
expires = min(sig.key_expiration for sig in itertools.chain(iter(uid.selfsig for uid in self.userids), self.self_signatures)
if sig.key_expiration is not None)
except ValueError:
return None
else:
return (self.created + expires)
@property
def fingerprint(self):
"""The fingerprint of this key, as a :py:obj:`~pgpy.types.Fingerprint` object."""
if self._key:
return self._key.fingerprint
@property
def hashdata(self):
# when signing a key, only the public portion of the keys is hashed
# if this is a private key, the private components of the key material need to be left out
pub = self._key if self.is_public else self._key.pubkey()
return pub.__bytearray__()[len(pub.header):]
@property
def is_expired(self):
"""``True`` if this key is expired, otherwise ``False``"""
expires = self.expires_at
if expires is not None:
return expires <= datetime.utcnow()
return False
@property
def is_primary(self):
"""``True`` if this is a primary key; ``False`` if this is a subkey"""
return isinstance(self._key, Primary) and not isinstance(self._key, Sub)
@property
def is_protected(self):
"""``True`` if this is a private key that is protected with a passphrase, otherwise ``False``"""
if self.is_public:
return False
return self._key.protected
@property
def is_public(self):
"""``True`` if this is a public key, otherwise ``False``"""
return isinstance(self._key, Public) and not isinstance(self._key, Private)
@property
def is_unlocked(self):
"""``False`` if this is a private key that is protected with a passphrase and has not yet been unlocked, otherwise ``True``"""
if self.is_public:
return True
if not self.is_protected:
return True
return self._key.unlocked
@property
def key_algorithm(self):
"""The :py:obj:`constants.PubKeyAlgorithm` pertaining to this key"""
return self._key.pkalg
@property
def key_size(self):
"""*new in 0.4.1*
The size pertaining to this key. ``int`` for non-EC key algorithms; :py:obj:`constants.EllipticCurveOID` for EC keys.
"""
if self.key_algorithm in {PubKeyAlgorithm.ECDSA, PubKeyAlgorithm.ECDH}:
return self._key.keymaterial.oid
return next(iter(self._key.keymaterial)).bit_length()
@property
def magic(self):
return '{:s} KEY BLOCK'.format('PUBLIC' if (isinstance(self._key, Public) and not isinstance(self._key, Private)) else
'PRIVATE' if isinstance(self._key, Private) else '')
@property
def pubkey(self):
"""If the :py:obj:`PGPKey` object is a private key, this method returns a corresponding public key object with
all the trimmings. Otherwise, returns ``None``
"""
if not self.is_public:
if self._sibling is None or isinstance(self._sibling, weakref.ref):
# create a new key shell
pub = PGPKey()
pub.ascii_headers = self.ascii_headers.copy()
# get the public half of the primary key
pub._key = self._key.pubkey()
# get the public half of each subkey
for skid, subkey in self.subkeys.items():
pub |= subkey.pubkey
# copy user ids and user attributes
for uid in self._uids:
pub |= copy.copy(uid)
# copy signatures that weren't copied with uids
for sig in self._signatures:
if sig.parent is None:
pub |= copy.copy(sig)
# keep connect the two halves using a weak reference
self._sibling = weakref.ref(pub)
pub._sibling = weakref.ref(self)
return self._sibling()
return None
@pubkey.setter
def pubkey(self, pubkey):
if self.is_public:
raise TypeError("cannot add public sibling to pubkey")
if not pubkey.is_public:
raise TypeError("sibling must be public")
if self._sibling is not None and self._sibling() is not None:
raise ValueError("public key reference already set")
if pubkey.fingerprint != self.fingerprint:
raise ValueError("key fingerprint mismatch")
# TODO: sync packets with sibling
self._sibling = weakref.ref(pubkey)
pubkey._sibling = weakref.ref(self)
@property
def self_signatures(self):
keyid, keytype = (self.fingerprint.keyid, SignatureType.DirectlyOnKey) if self.is_primary \
else (self.parent.fingerprint.keyid, SignatureType.Subkey_Binding)
##TODO: filter out revoked signatures as well
for sig in iter(sig for sig in self._signatures
if all([sig.type == keytype, sig.signer == keyid, not sig.is_expired])):
yield sig
@property
def signers(self):
"""A ``set`` of key ids of keys that were used to sign this key"""
return {sig.signer for sig in self.__sig__}
@property
def subkeys(self):
"""An :py:obj:`~collections.OrderedDict` of subkeys bound to this primary key, if applicable,
selected by 16-character keyid."""
return self._children
@property
def userids(self):
"""A ``list`` of :py:obj:`PGPUID` objects containing User ID information about this key"""
return [ u for u in self._uids if u.is_uid ]
@property
def userattributes(self):
"""A ``list`` of :py:obj:`PGPUID` objects containing one or more images associated with this key"""
return [u for u in self._uids if u.is_ua]
@classmethod
def new(cls, key_algorithm, key_size):
"""
Generate a new PGP key
:param key_algorithm: Key algorithm to use.
:type key_algorithm: A :py:obj:`~constants.PubKeyAlgorithm`
:param key_size: Key size in bits, unless `key_algorithm` is :py:obj:`~constants.PubKeyAlgorithm.ECDSA` or
:py:obj:`~constants.PubKeyAlgorithm.ECDH`, in which case it should be the Curve OID to use.
:type key_size: ``int`` or :py:obj:`~constants.EllipticCurveOID`
:return: A newly generated :py:obj:`PGPKey`
"""
# new private key shell first
key = PGPKey()
if key_algorithm in {PubKeyAlgorithm.RSAEncrypt, PubKeyAlgorithm.RSASign}: # pragma: no cover
warnings.warn('{:s} is deprecated - generating key using RSAEncryptOrSign'.format(key_algorithm.name))
key_algorithm = PubKeyAlgorithm.RSAEncryptOrSign
# generate some key data to match key_algorithm and key_size
key._key = PrivKeyV4.new(key_algorithm, key_size)
return key
def __init__(self):
"""
PGPKey objects represent OpenPGP compliant keys along with all of their associated data.
PGPKey implements the `__str__` method, the output of which will be the key composition in
OpenPGP-compliant ASCII-armored format.
PGPKey implements the `__bytes__` method, the output of which will be the key composition in
OpenPGP-compliant binary format.
Any signatures within the PGPKey that are marked as being non-exportable will not be included in the output
of either of those methods.
"""
super(PGPKey, self).__init__()
self._key = None
self._children = collections.OrderedDict()
self._signatures = SorteDeque()
self._uids = SorteDeque()
self._sibling = None
def __bytearray__(self):
_bytes = bytearray()
# us
_bytes += self._key.__bytearray__()
# our signatures; ignore embedded signatures
for sig in iter(s for s in self._signatures if not s.embedded and s.exportable):
_bytes += sig.__bytearray__()
# one or more User IDs, followed by their signatures
for uid in self._uids:
_bytes += uid._uid.__bytearray__()
for s in [s for s in uid._signatures if s.exportable]:
_bytes += s.__bytearray__()
# subkeys
for sk in self._children.values():
_bytes += sk.__bytearray__()
return _bytes
def __repr__(self):
if self._key is not None:
return "<PGPKey [{:s}][0x{:s}] at 0x{:02X}>" \
"".format(self._key.__class__.__name__, self.fingerprint.keyid, id(self))
return "<PGPKey [unknown] at 0x{:02X}>" \
"".format(id(self))
def __contains__(self, item):
if isinstance(item, PGPKey): # pragma: no cover
return item.fingerprint.keyid in self.subkeys
if isinstance(item, Fingerprint): # pragma: no cover
return item.keyid in self.subkeys
if isinstance(item, PGPUID):
return item in self._uids
if isinstance(item, PGPSignature):
return item in self._signatures
raise TypeError
def __or__(self, other, from_sib=False):
if isinstance(other, Key) and self._key is None:
self._key = other
elif isinstance(other, PGPKey) and not other.is_primary and other.is_public == self.is_public:
other._parent = self
self._children[other.fingerprint.keyid] = other
elif isinstance(other, PGPSignature):
self._signatures.insort(other)
# if this is a subkey binding signature that has embedded primary key binding signatures, add them to parent
if other.type == SignatureType.Subkey_Binding:
for es in iter(pkb for pkb in other._signature.subpackets['EmbeddedSignature']):
esig = PGPSignature() | es
esig._parent = other
self._signatures.insort(esig)
elif isinstance(other, PGPUID):
other._parent = weakref.ref(self)
self._uids.insort(other)
else:
raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'"
"".format(self.__class__.__name__, other.__class__.__name__))
if isinstance(self._sibling, weakref.ref) and not from_sib:
sib = self._sibling()
if sib is None:
self._sibling = None
else: # pragma: no cover
sib.__or__(copy.copy(other), True)
return self
def __copy__(self):
key = super(PGPKey, self).__copy__()
key._key = copy.copy(self._key)
for uid in self._uids:
key |= copy.copy(uid)
for id, subkey in self._children.items():
key |= copy.copy(subkey)
for sig in self._signatures:
if sig.embedded:
# embedded signatures don't need to be explicitly copied
continue
key |= copy.copy(sig)
return key
def protect(self, passphrase, enc_alg, hash_alg):
"""
Add a passphrase to a private key. If the key is already passphrase protected, it should be unlocked before
a new passphrase can be specified.
Has no effect on public keys.
:param passphrase: A passphrase to protect the key with
:type passphrase: ``str``, ``unicode``
:param enc_alg: Symmetric encryption algorithm to use to protect the key
:type enc_alg: :py:obj:`~constants.SymmetricKeyAlgorithm`
:param hash_alg: Hash algorithm to use in the String-to-Key specifier
:type hash_alg: :py:obj:`~constants.HashAlgorithm`
"""
##TODO: specify strong defaults for enc_alg and hash_alg
if self.is_public:
# we can't protect public keys because only private key material is ever protected
warnings.warn("Public keys cannot be passphrase-protected", stacklevel=2)
return
if self.is_protected and not self.is_unlocked:
# we can't protect a key that is already protected unless it is unlocked first
warnings.warn("This key is already protected with a passphrase - "
"please unlock it before attempting to specify a new passphrase", stacklevel=2)
return
for sk in itertools.chain([self], self.subkeys.values()):
sk._key.protect(passphrase, enc_alg, hash_alg)
del passphrase
@contextlib.contextmanager
def unlock(self, passphrase):
"""
Context manager method for unlocking passphrase-protected private keys. Has no effect if the key is not both
private and passphrase-protected.
When the context managed block is exited, the unprotected private key material is removed.
Example::
privkey = PGPKey()
privkey.parse(keytext)
assert privkey.is_protected
assert privkey.is_unlocked is False
# privkey.sign("some text") <- this would raise an exception
with privkey.unlock("TheCorrectPassphrase"):
# privkey is now unlocked
assert privkey.is_unlocked
# so you can do things with it
sig = privkey.sign("some text")
# privkey is no longer unlocked
assert privkey.is_unlocked is False
Emits a :py:obj:`~warnings.UserWarning` if the key is public or not passphrase protected.
:param str passphrase: The passphrase to be used to unlock this key.
:raises: :py:exc:`~pgpy.errors.PGPDecryptionError` if the passphrase is incorrect
"""
if self.is_public:
# we can't unprotect public keys because only private key material is ever protected
warnings.warn("Public keys cannot be passphrase-protected", stacklevel=3)
yield self
return
if not self.is_protected:
# we can't unprotect private keys that are not protected, because there is no ciphertext to decrypt
warnings.warn("This key is not protected with a passphrase", stacklevel=3)
yield self
return
try:
for sk in itertools.chain([self], self.subkeys.values()):
sk._key.unprotect(passphrase)
del passphrase
yield self
finally:
# clean up here by deleting the previously decrypted secret key material
for sk in itertools.chain([self], self.subkeys.values()):
sk._key.keymaterial.clear()
def add_uid(self, uid, selfsign=True, **prefs):
"""
Add a User ID to this key.
:param uid: The user id to add
:type uid: :py:obj:`~pgpy.PGPUID`
:param selfsign: Whether or not to self-sign the user id before adding it
:type selfsign: ``bool``
Valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPKey.certify`.
Any such keyword arguments are ignored if selfsign is ``False``
"""
uid._parent = self
if selfsign:
uid |= self.certify(uid, SignatureType.Positive_Cert, **prefs)
self |= uid
def get_uid(self, search):
"""
Find and return a User ID that matches the search string given.
:param search: A text string to match name, comment, or email address against
:type search: ``str``, ``unicode``
:return: The first matching :py:obj:`~pgpy.PGPUID`, or ``None`` if no matches were found.
"""
if self.is_primary:
return next((u for u in self._uids if search in filter(lambda a: a is not None, (u.name, u.comment, u.email))), None)
return self.parent.get_uid(search)
def del_uid(self, search):
"""
Find and remove a user id that matches the search string given. This method does not modify the corresponding
:py:obj:`~pgpy.PGPUID` object; it only removes it from the list of user ids on the key.
:param search: A text string to match name, comment, or email address against
:type search: ``str``, ``unicode``
"""
u = self.get_uid(search)
if u is None:
raise KeyError("uid '{:s}' not found".format(search))
u._parent = None
self._uids.remove(u)
def add_subkey(self, key, **prefs):
"""
Add a key as a subkey to this key.
:param key: A private :py:obj:`~pgpy.PGPKey` that does not have any subkeys of its own
:keyword usage: A ``set`` of key usage flags, as :py:obj:`~constants.KeyFlags` for the subkey to be added.
:type usage: ``set``
Other valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPKey.certify`
"""
if self.is_public:
raise PGPError("Cannot add a subkey to a public key. Add the subkey to the private component first!")
if key.is_public:
raise PGPError("Cannot add a public key as a subkey to this key")
if key.is_primary:
if len(key._children) > 0:
raise PGPError("Cannot add a key that already has subkeys as a subkey!")
# convert key into a subkey
npk = PrivSubKeyV4()
npk.pkalg = key._key.pkalg
npk.created = key._key.created
npk.keymaterial = key._key.keymaterial
key._key = npk
key._key.update_hlen()
self._children[key.fingerprint.keyid] = key
key._parent = self
##TODO: skip this step if the key already has a subkey binding signature
bsig = self.bind(key, **prefs)
key |= bsig
def _get_key_flags(self, user=None):
if self.is_primary:
if user is not None:
user = self.get_uid(user)
elif len(self._uids) == 0:
return {KeyFlags.Certify}
else:
user = next(iter(self.userids))
# RFC 4880 says that primary keys *must* be capable of certification
return {KeyFlags.Certify} | user.selfsig.key_flags
return next(self.self_signatures).key_flags
def _sign(self, subject, sig, **prefs):
"""
The actual signing magic happens here.
:param subject: The subject to sign
:param sig: The :py:obj:`PGPSignature` object the new signature is to be encapsulated within
:returns: ``sig``, after the signature is added to it.
"""
user = prefs.pop('user', None)
uid = None
if user is not None:
uid = self.get_uid(user)
else:
uid = next(iter(self.userids), None)
if uid is None and self.parent is not None:
uid = next(iter(self.parent.userids), None)
if sig.hash_algorithm is None:
sig._signature.halg = uid.selfsig.hashprefs[0]
if uid is not None and sig.hash_algorithm not in uid.selfsig.hashprefs:
warnings.warn("Selected hash algorithm not in key preferences", stacklevel=4)
# signature options that can be applied at any level
expires = prefs.pop('expires', None)
notation = prefs.pop('notation', None)
revocable = prefs.pop('revocable', True)
policy_uri = prefs.pop('policy_uri', None)
if expires is not None:
# expires should be a timedelta, so if it's a datetime, turn it into a timedelta
if isinstance(expires, datetime):
expires = expires - self.created
sig._signature.subpackets.addnew('SignatureExpirationTime', hashed=True, expires=expires)
if revocable is False:
sig._signature.subpackets.addnew('Revocable', hashed=True, bflag=revocable)
if notation is not None:
for name, value in notation.items():
# mark all notations as human readable unless value is a bytearray
flags = NotationDataFlags.HumanReadable
if isinstance(value, bytearray):
flags = 0x00
sig._signature.subpackets.addnew('NotationData', hashed=True, flags=flags, name=name, value=value)
if policy_uri is not None:
sig._signature.subpackets.addnew('Policy', hashed=True, uri=policy_uri)
if user is not None and uid is not None:
signers_uid = "{:s}".format(uid)
sig._signature.subpackets.addnew('SignersUserID', hashed=True, userid=signers_uid)
# handle an edge case for timestamp signatures vs standalone signatures
if sig.type == SignatureType.Timestamp and len(sig._signature.subpackets._hashed_sp) > 1:
sig._signature.sigtype = SignatureType.Standalone
sigdata = sig.hashdata(subject)
h2 = sig.hash_algorithm.hasher
h2.update(sigdata)
sig._signature.hash2 = bytearray(h2.digest()[:2])
_sig = self._key.sign(sigdata, getattr(hashes, sig.hash_algorithm.name)())
if _sig is NotImplemented:
raise NotImplementedError(self.key_algorithm)
sig._signature.signature.from_signer(_sig)
sig._signature.update_hlen()
return sig
@KeyAction(KeyFlags.Sign, is_unlocked=True, is_public=False)
def sign(self, subject, **prefs):
"""
Sign text, a message, or a timestamp using this key.
:param subject: The text to be signed
:type subject: ``str``, :py:obj:`~pgpy.PGPMessage`, ``None``
:raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
:raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
:returns: :py:obj:`PGPSignature`
The following optional keyword arguments can be used with :py:meth:`PGPKey.sign`, as well as
:py:meth:`PGPKey.certify`, :py:meth:`PGPKey.revoke`, and :py:meth:`PGPKey.bind`:
:keyword expires: Set an expiration date for this signature
:type expires: :py:obj:`~datetime.datetime`, :py:obj:`~datetime.timedelta`
:keyword notation: Add arbitrary notation data to this signature.
:type notation: ``dict``
:keyword policy_uri: Add a URI to the signature that should describe the policy under which the signature
was issued.
:type policy_uri: ``str``
:keyword revocable: If ``False``, this signature will be marked non-revocable
:type revocable: ``bool``
:keyword user: Specify which User ID to use when creating this signature. Also adds a "Signer's User ID"
to the signature.
:type user: ``str``
"""
sig_type = SignatureType.BinaryDocument
hash_algo = prefs.pop('hash', None)
if subject is None:
sig_type = SignatureType.Timestamp
if isinstance(subject, PGPMessage):
if subject.type == 'cleartext':
sig_type = SignatureType.CanonicalDocument
subject = subject.message
sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
return self._sign(subject, sig, **prefs)
@KeyAction(KeyFlags.Certify, is_unlocked=True, is_public=False)
def certify(self, subject, level=SignatureType.Generic_Cert, **prefs):
"""
Sign a key or a user id within a key.
:param subject: The user id or key to be certified.
:type subject: :py:obj:`PGPKey`, :py:obj:`PGPUID`
:param level: :py:obj:`~constants.SignatureType.Generic_Cert`, :py:obj:`~constants.SignatureType.Persona_Cert`,
:py:obj:`~constants.SignatureType.Casual_Cert`, or :py:obj:`~constants.SignatureType.Positive_Cert`.
Only used if subject is a :py:obj:`PGPUID`; otherwise, it is ignored.
:raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
:raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
:returns: :py:obj:`PGPSignature`
In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional
keyword arguments can be used with :py:meth:`PGPKey.certify`.
These optional keywords only make sense, and thus only have an effect, when self-signing a key or User ID:
:keyword usage: A ``set`` of key usage flags, as :py:obj:`~constants.KeyFlags`.
This keyword is ignored for non-self-certifications.
:type usage: ``set``
:keyword ciphers: A list of preferred symmetric ciphers, as :py:obj:`~constants.SymmetricKeyAlgorithm`.
This keyword is ignored for non-self-certifications.
:type ciphers: ``list``
:keyword hashes: A list of preferred hash algorithms, as :py:obj:`~constants.HashAlgorithm`.
This keyword is ignored for non-self-certifications.
:type hashes: ``list``
:keyword compression: A list of preferred compression algorithms, as :py:obj:`~constants.CompressionAlgorithm`.
This keyword is ignored for non-self-certifications.
:type compression: ``list``
:keyword key_expiration: Specify a key expiration date for when this key should expire, or a
:py:obj:`~datetime.timedelta` of how long after the key was created it should expire.
This keyword is ignored for non-self-certifications.
:type key_expiration: :py:obj:`datetime.datetime`, :py:obj:`datetime.timedelta`
:keyword keyserver: Specify the URI of the preferred key server of the user.
This keyword is ignored for non-self-certifications.
:type keyserver: ``str``, ``unicode``, ``bytes``
:keyword primary: Whether or not to consider the certified User ID as the primary one.
This keyword is ignored for non-self-certifications, and any certifications directly on keys.
:type primary: ``bool``
These optional keywords only make sense, and thus only have an effect, when signing another key or User ID:
:keyword trust: Specify the level and amount of trust to assert when certifying a public key. Should be a tuple
of two ``int`` s, specifying the trust level and trust amount. See
`RFC 4880 Section 5.2.3.13. Trust Signature <https://tools.ietf.org/html/rfc4880#section-5.2.3.13>`_
for more on what these values mean.
:type trust: ``tuple`` of two ``int`` s
:keyword regex: Specify a regular expression to constrain the specified trust signature in the resulting signature.
Symbolically signifies that the specified trust signature only applies to User IDs which match
this regular expression.
This is meaningless without also specifying trust level and amount.
:type regex: ``str``
"""
hash_algo = prefs.pop('hash', None)
sig_type = level
if isinstance(subject, PGPKey):
sig_type = SignatureType.DirectlyOnKey
sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
# signature options that only make sense in certifications
usage = prefs.pop('usage', None)
exportable = prefs.pop('exportable', None)
if usage is not None:
sig._signature.subpackets.addnew('KeyFlags', hashed=True, flags=usage)
if exportable is not None:
sig._signature.subpackets.addnew('ExportableCertification', hashed=True, bflag=exportable)
keyfp = self.fingerprint
if isinstance(subject, PGPKey):
keyfp = subject.fingerprint
if isinstance(subject, PGPUID) and subject._parent is not None:
keyfp = subject._parent.fingerprint
if keyfp == self.fingerprint:
# signature options that only make sense in self-certifications
cipher_prefs = prefs.pop('ciphers', None)
hash_prefs = prefs.pop('hashes', None)
compression_prefs = prefs.pop('compression', None)
key_expires = prefs.pop('key_expiration', None)
keyserver_flags = prefs.pop('keyserver_flags', None)
keyserver = prefs.pop('keyserver', None)
primary_uid = prefs.pop('primary', None)
if key_expires is not None:
# key expires should be a timedelta, so if it's a datetime, turn it into a timedelta
if isinstance(key_expires, datetime):
key_expires = key_expires - self.created
sig._signature.subpackets.addnew('KeyExpirationTime', hashed=True, expires=key_expires)
if cipher_prefs is not None:
sig._signature.subpackets.addnew('PreferredSymmetricAlgorithms', hashed=True, flags=cipher_prefs)
if hash_prefs is not None:
sig._signature.subpackets.addnew('PreferredHashAlgorithms', hashed=True, flags=hash_prefs)
if sig.hash_algorithm is None:
sig._signature.halg = hash_prefs[0]
if compression_prefs is not None:
sig._signature.subpackets.addnew('PreferredCompressionAlgorithms', hashed=True, flags=compression_prefs)
if keyserver_flags is not None:
sig._signature.subpackets.addnew('KeyServerPreferences', hashed=True, flags=keyserver_flags)
if keyserver is not None:
sig._signature.subpackets.addnew('PreferredKeyServer', hashed=True, uri=keyserver)
if primary_uid is not None:
sig._signature.subpackets.addnew('PrimaryUserID', hashed=True, primary=primary_uid)
# Features is always set on self-signatures
sig._signature.subpackets.addnew('Features', hashed=True, flags=Features.pgpy_features)
else:
# signature options that only make sense in non-self-certifications
trust = prefs.pop('trust', None)
regex = prefs.pop('regex', None)
if trust is not None:
sig._signature.subpackets.addnew('TrustSignature', hashed=True, level=trust[0], amount=trust[1])
if regex is not None:
sig._signature.subpackets.addnew('RegularExpression', hashed=True, regex=regex)
return self._sign(subject, sig, **prefs)
@KeyAction(KeyFlags.Certify, is_unlocked=True, is_public=False)
def revoke(self, target, **prefs):
"""
Revoke a key, a subkey, or all current certification signatures of a User ID that were generated by this key so far.
:param target: The key to revoke
:type target: :py:obj:`PGPKey`, :py:obj:`PGPUID`
:raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
:raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
:returns: :py:obj:`PGPSignature`
In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional
keyword arguments can be used with :py:meth:`PGPKey.revoke`.
:keyword reason: Defaults to :py:obj:`constants.RevocationReason.NotSpecified`
:type reason: One of :py:obj:`constants.RevocationReason`.
:keyword comment: Defaults to an empty string.
:type comment: ``str``
"""
hash_algo = prefs.pop('hash', None)
if isinstance(target, PGPUID):
sig_type = SignatureType.CertRevocation
elif isinstance(target, PGPKey):
##TODO: check to make sure that the key that is being revoked:
# - is this key
# - is one of this key's subkeys
# - specifies this key as its revocation key
if target.is_primary:
sig_type = SignatureType.KeyRevocation
else:
sig_type = SignatureType.SubkeyRevocation
else: # pragma: no cover
raise TypeError
sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
# signature options that only make sense when revoking
reason = prefs.pop('reason', RevocationReason.NotSpecified)
comment = prefs.pop('comment', "")
sig._signature.subpackets.addnew('ReasonForRevocation', hashed=True, code=reason, string=comment)
return self._sign(target, sig, **prefs)
@KeyAction(is_unlocked=True, is_public=False)
def revoker(self, revoker, **prefs):
"""
Generate a signature that specifies another key as being valid for revoking this key.
:param revoker: The :py:obj:`PGPKey` to specify as a valid revocation key.
:type revoker: :py:obj:`PGPKey`
:raises: :py:exc:`~pgpy.errors.PGPError` if the key is passphrase-protected and has not been unlocked
:raises: :py:exc:`~pgpy.errors.PGPError` if the key is public
:returns: :py:obj:`PGPSignature`
In addition to the optional keyword arguments accepted by :py:meth:`PGPKey.sign`, the following optional
keyword arguments can be used with :py:meth:`PGPKey.revoker`.
:keyword sensitive: If ``True``, this sets the sensitive flag on the RevocationKey subpacket. Currently,
this has no other effect.
:type sensitive: ``bool``
"""
hash_algo = prefs.pop('hash', None)
sig = PGPSignature.new(SignatureType.DirectlyOnKey, self.key_algorithm, hash_algo, self.fingerprint.keyid)
# signature options that only make sense when adding a revocation key
sensitive = prefs.pop('sensitive', False)
keyclass = RevocationKeyClass.Normal | (RevocationKeyClass.Sensitive if sensitive else 0x00)
sig._signature.subpackets.addnew('RevocationKey',
hashed=True,
algorithm=revoker.key_algorithm,
fingerprint=revoker.fingerprint,
keyclass=keyclass)
# revocation keys should really not be revocable themselves
prefs['revocable'] = False
return self._sign(self, sig, **prefs)
@KeyAction(is_unlocked=True, is_public=False)
def bind(self, key, **prefs):
"""
Bind a subkey to this key.
Valid optional keyword arguments are identical to those of self-signatures for :py:meth:`PGPkey.certify`
"""
hash_algo = prefs.pop('hash', None)
if self.is_primary and not key.is_primary:
sig_type = SignatureType.Subkey_Binding
elif key.is_primary and not self.is_primary:
sig_type = SignatureType.PrimaryKey_Binding
else: # pragma: no cover
raise PGPError
sig = PGPSignature.new(sig_type, self.key_algorithm, hash_algo, self.fingerprint.keyid)
if sig_type == SignatureType.Subkey_Binding:
# signature options that only make sense in subkey binding signatures
usage = prefs.pop('usage', None)
if usage is not None:
sig._signature.subpackets.addnew('KeyFlags', hashed=True, flags=usage)
# if possible, have the subkey create a primary key binding signature
if key.key_algorithm.can_sign:
subkeyid = key.fingerprint.keyid
esig = None
if not key.is_public:
esig = key.bind(self)
elif subkeyid in self.subkeys: # pragma: no cover
esig = self.subkeys[subkeyid].bind(self)
if esig is not None:
sig._signature.subpackets.addnew('EmbeddedSignature', hashed=False, _sig=esig._signature)
return self._sign(key, sig, **prefs)
def verify(self, subject, signature=None):
"""
Verify a subject with a signature using this key.
:param subject: The subject to verify
:type subject: ``str``, ``unicode``, ``None``, :py:obj:`PGPMessage`, :py:obj:`PGPKey`, :py:obj:`PGPUID`
:param signature: If the signature is detached, it should be specified here.
:type signature: :py:obj:`PGPSignature`
:returns: :py:obj:`~pgpy.types.SignatureVerification`
"""
sspairs = []
# some type checking
if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, six.string_types, bytes, bytearray)):
raise TypeError("Unexpected subject value: {:s}".format(str(type(subject))))
if not isinstance(signature, (type(None), PGPSignature)):
raise TypeError("Unexpected signature value: {:s}".format(str(type(signature))))
def _filter_sigs(sigs):
_ids = {self.fingerprint.keyid} | set(self.subkeys)
return [ sig for sig in sigs if sig.signer in _ids ]
# collect signature(s)
if signature is None:
if isinstance(subject, PGPMessage):
sspairs += [ (sig, subject.message) for sig in _filter_sigs(subject.signatures) ]
if isinstance(subject, (PGPUID, PGPKey)):
sspairs += [ (sig, subject) for sig in _filter_sigs(subject.__sig__) ]
if isinstance(subject, PGPKey):
# user ids
sspairs += [ (sig, uid) for uid in subject.userids for sig in _filter_sigs(uid.__sig__) ]
# user attributes
sspairs += [ (sig, ua) for ua in subject.userattributes for sig in _filter_sigs(ua.__sig__) ]
# subkey binding signatures
sspairs += [ (sig, subkey) for subkey in subject.subkeys.values() for sig in _filter_sigs(subkey.__sig__) ]
elif signature.signer in {self.fingerprint.keyid} | set(self.subkeys):
sspairs += [(signature, subject)]
if len(sspairs) == 0:
raise PGPError("No signatures to verify")
# finally, start verifying signatures
sigv = SignatureVerification()
for sig, subj in sspairs:
if self.fingerprint.keyid != sig.signer and sig.signer in self.subkeys:
warnings.warn("Signature was signed with this key's subkey: {:s}. "
"Verifying with subkey...".format(sig.signer),
stacklevel=2)
sigv &= self.subkeys[sig.signer].verify(subj, sig)
else:
verified = self._key.verify(sig.hashdata(subj), sig.__sig__, getattr(hashes, sig.hash_algorithm.name)())
if verified is NotImplemented:
raise NotImplementedError(sig.key_algorithm)
sigv.add_sigsubj(sig, self.fingerprint.keyid, subj, verified)
return sigv
@KeyAction(KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage, is_public=True)
def encrypt(self, message, sessionkey=None, **prefs):
"""
Encrypt a PGPMessage using this key.
:param message: The message to encrypt.
:type message: :py:obj:`PGPMessage`
:optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``.
If ``None``, a session key of the appropriate length will be generated randomly.
.. warning::
Care should be taken when making use of this option! Session keys *absolutely need*
to be unpredictable! Use the ``gen_key()`` method on the desired
:py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key!
:type sessionkey: ``bytes``, ``str``
:raises: :py:exc:`~errors.PGPEncryptionError` if encryption failed for any reason.
:returns: A new :py:obj:`PGPMessage` with the encrypted contents of ``message``
The following optional keyword arguments can be used with :py:meth:`PGPKey.encrypt`:
:keyword cipher: Specifies the symmetric block cipher to use when encrypting the message.
:type cipher: :py:obj:`~constants.SymmetricKeyAlgorithm`
:keyword user: Specifies the User ID to use as the recipient for this encryption operation, for the purposes of
preference defaults and selection validation.
:type user: ``str``, ``unicode``
"""
user = prefs.pop('user', None)
uid = None
if user is not None:
uid = self.get_uid(user)
else:
uid = next(iter(self.userids), None)
if uid is None and self.parent is not None:
uid = next(iter(self.parent.userids), None)
cipher_algo = prefs.pop('cipher', uid.selfsig.cipherprefs[0])
if cipher_algo not in uid.selfsig.cipherprefs:
warnings.warn("Selected symmetric algorithm not in key preferences", stacklevel=3)
if message.is_compressed and message._compression not in uid.selfsig.compprefs:
warnings.warn("Selected compression algorithm not in key preferences", stacklevel=3)
if sessionkey is None:
sessionkey = cipher_algo.gen_key()
# set up a new PKESessionKeyV3
pkesk = PKESessionKeyV3()
pkesk.encrypter = bytearray(binascii.unhexlify(self.fingerprint.keyid.encode('latin-1')))
pkesk.pkalg = self.key_algorithm
# pkesk.encrypt_sk(self.__key__, cipher_algo, sessionkey)
pkesk.encrypt_sk(self._key, cipher_algo, sessionkey)
if message.is_encrypted: # pragma: no cover
_m = message
else:
_m = PGPMessage()
skedata = IntegrityProtectedSKEDataV1()
skedata.encrypt(sessionkey, cipher_algo, message.__bytes__())
_m |= skedata
_m |= pkesk
return _m
@KeyAction(is_unlocked=True, is_public=False)
def decrypt(self, message):
"""
Decrypt a PGPMessage using this key.
:param message: An encrypted :py:obj:`PGPMessage`
:raises: :py:exc:`~errors.PGPError` if the key is not private, or protected but not unlocked.
:raises: :py:exc:`~errors.PGPDecryptionError` if decryption fails for any other reason.
:returns: A new :py:obj:`PGPMessage` with the decrypted contents of ``message``.
"""
if not message.is_encrypted:
warnings.warn("This message is not encrypted", stacklevel=3)
return message
if self.fingerprint.keyid not in message.encrypters:
sks = set(self.subkeys)
mis = set(message.encrypters)
if sks & mis:
skid = list(sks & mis)[0]
warnings.warn("Message was encrypted with this key's subkey: {:s}. "
"Decrypting with that...".format(skid),
stacklevel=2)
return self.subkeys[skid].decrypt(message)
raise PGPError("Cannot decrypt the provided message with this key")
pkesk = next(pk for pk in message._sessionkeys if pk.pkalg == self.key_algorithm and pk.encrypter == self.fingerprint.keyid)
alg, key = pkesk.decrypt_sk(self._key)
# now that we have the symmetric cipher used and the key, we can decrypt the actual message
decmsg = PGPMessage()
decmsg.parse(message.message.decrypt(key, alg))
return decmsg
def parse(self, data):
unarmored = self.ascii_unarmor(data)
data = unarmored['body']
if unarmored['magic'] is not None and 'KEY' not in unarmored['magic']:
raise ValueError('Expected: KEY. Got: {}'.format(str(unarmored['magic'])))
if unarmored['headers'] is not None:
self.ascii_headers = unarmored['headers']
# parse packets
# keys will hold other keys parsed here
keys = collections.OrderedDict()
# orphaned will hold all non-opaque orphaned packets
orphaned = []
# last holds the last non-signature thing processed
##TODO: see issue #141 and fix this better
getpkt = lambda d: Packet(d) if len(d) > 0 else None # flake8: noqa
# some packets are filtered out
getpkt = filter(lambda p: p.header.tag != PacketTag.Trust, iter(functools.partial(getpkt, data), None))
def pktgrouper():
class PktGrouper(object):
def __init__(self):
self.last = None
def __call__(self, pkt):
if pkt.header.tag != PacketTag.Signature:
self.last = '{:02X}_{:s}'.format(id(pkt), pkt.__class__.__name__)
return self.last
return PktGrouper()
while True:
# print(type(p) for p in getpkt)
for group in iter(group for _, group in itertools.groupby(getpkt, key=pktgrouper()) if not _.endswith('Opaque')):
pkt = next(group)
# deal with pkt first
if isinstance(pkt, Key):
pgpobj = (self if self._key is None else PGPKey()) | pkt
elif isinstance(pkt, (UserID, UserAttribute)):
pgpobj = PGPUID() | pkt
else: # pragma: no cover
break
# add signatures to whatever we got
[ operator.ior(pgpobj, PGPSignature() | sig) for sig in group if not isinstance(sig, Opaque) ]
# and file away pgpobj
if isinstance(pgpobj, PGPKey):
if pgpobj.is_primary:
keys[(pgpobj.fingerprint.keyid, pgpobj.is_public)] = pgpobj
else:
keys[next(reversed(keys))] |= pgpobj
elif isinstance(pgpobj, PGPUID):
# parent is likely the most recently parsed primary key
keys[next(reversed(keys))] |= pgpobj
else: # pragma: no cover
break
else:
# finished normally
break
# this will only be reached called if the inner loop hit a break
warnings.warn("Warning: Orphaned packet detected! {:s}".format(repr(pkt)), stacklevel=2) # pragma: no cover
orphaned.append(pkt) # pragma: no cover
for pkt in group: # pragma: no cover
orphaned.append(pkt)
# remove the reference to self from keys
[ keys.pop((getattr(self, 'fingerprint.keyid', '~'), None), t) for t in (True, False) ]
# return {'keys': keys, 'orphaned': orphaned}
return keys
class PGPKeyring(collections.Container, collections.Iterable, collections.Sized):
def __init__(self, *args):
"""
PGPKeyring objects represent in-memory keyrings that can contain any combination of supported private and public
keys. It can not currently be conveniently exported to a format that can be understood by GnuPG.
"""
super(PGPKeyring, self).__init__()
self._keys = {}
self._pubkeys = collections.deque()
self._privkeys = collections.deque()
self._aliases = collections.deque([{}])
self.load(*args)
def __contains__(self, alias):
aliases = set().union(*self._aliases)
if isinstance(alias, six.string_types):
return alias in aliases or alias.replace(' ', '') in aliases
return alias in aliases # pragma: no cover
def __len__(self):
return len(self._keys)
def __iter__(self): # pragma: no cover
for pgpkey in itertools.chain(self._pubkeys, self._privkeys):
yield pgpkey
def _get_key(self, alias):
for m in self._aliases:
if alias in m:
return self._keys[m[alias]]
if alias.replace(' ', '') in m:
return self._keys[m[alias.replace(' ', '')]]
raise KeyError(alias)
def _get_keys(self, alias):
return [self._keys[m[alias]] for m in self._aliases if alias in m]
def _sort_alias(self, alias):
# remove alias from all levels of _aliases, and sort by created time and key half
# so the order of _aliases from left to right:
# - newer keys come before older ones
# - private keys come before public ones
#
# this list is sorted in the opposite direction from that, because they will be placed into self._aliases
# from right to left.
pkids = sorted(list(set().union(m.pop(alias) for m in self._aliases if alias in m)),
key=lambda pkid: (self._keys[pkid].created, self._keys[pkid].is_public))
# drop the now-sorted aliases into place
for depth, pkid in enumerate(pkids):
self._aliases[depth][alias] = pkid
# finally, remove any empty dicts left over
while {} in self._aliases: # pragma: no cover
self._aliases.remove({})
def _add_alias(self, alias, pkid):
# brand new alias never seen before!
if alias not in self:
self._aliases[-1][alias] = pkid
# this is a duplicate alias->key link; ignore it
elif alias in self and pkid in set(m[alias] for m in self._aliases if alias in m):
pass # pragma: no cover
# this is an alias that already exists, but points to a key that is not already referenced by it
else:
adepth = len(self._aliases) - len([None for m in self._aliases if alias in m]) - 1
# all alias maps have this alias, so increase total depth by 1
if adepth == -1:
self._aliases.appendleft({})
adepth = 0
self._aliases[adepth][alias] = pkid
self._sort_alias(alias)
def _add_key(self, pgpkey):
pkid = id(pgpkey)
if pkid not in self._keys:
self._keys[pkid] = pgpkey
# add to _{pub,priv}keys if this is either a primary key, or a subkey without one
if pgpkey.parent is None:
if pgpkey.is_public:
self._pubkeys.append(pkid)
else:
self._privkeys.append(pkid)
# aliases
self._add_alias(pgpkey.fingerprint, pkid)
self._add_alias(pgpkey.fingerprint.keyid, pkid)
self._add_alias(pgpkey.fingerprint.shortid, pkid)
for uid in pgpkey.userids:
self._add_alias(uid.name, pkid)
if uid.comment:
self._add_alias(uid.comment, pkid)
if uid.email:
self._add_alias(uid.email, pkid)
# subkeys
for subkey in pgpkey.subkeys.values():
self._add_key(subkey)
def load(self, *args):
"""
Load all keys provided into this keyring object.
:param \*args: Each arg in ``args`` can be any of the formats supported by :py:meth:`PGPKey.from_path` and
:py:meth:`PGPKey.from_blob`, or a ``list`` or ``tuple`` of these.
:type \*args: ``list``, ``tuple``, ``str``, ``unicode``, ``bytes``, ``bytearray``
:returns: a ``set`` containing the unique fingerprints of all of the keys that were loaded during this operation.
"""
def _preiter(first, iterable):
yield first
for item in iterable:
yield item
loaded = set()
for key in iter(item for ilist in iter(ilist if isinstance(ilist, (tuple, list)) else [ilist] for ilist in args)
for item in ilist):
if os.path.isfile(key):
_key, keys = PGPKey.from_file(key)
else:
_key, keys = PGPKey.from_blob(key)
for ik in _preiter(_key, keys.values()):
self._add_key(ik)
loaded |= {ik.fingerprint} | {isk.fingerprint for isk in ik.subkeys.values()}
return list(loaded)
@contextlib.contextmanager
def key(self, identifier):
"""
A context-manager method. Yields the first :py:obj:`PGPKey` object that matches the provided identifier.
:param identifier: The identifier to use to select a loaded key.
:type identifier: :py:exc:`PGPMessage`, :py:exc:`PGPSignature`, ``str``
:raises: :py:exc:`KeyError` if there is no loaded key that satisfies the identifier.
"""
if isinstance(identifier, PGPMessage):
for issuer in identifier.issuers:
if issuer in self:
identifier = issuer
break
if isinstance(identifier, PGPSignature):
identifier = identifier.signer
yield self._get_key(identifier)
def fingerprints(self, keyhalf='any', keytype='any'):
"""
List loaded fingerprints with some optional filtering.
:param str keyhalf: Can be 'any', 'public', or 'private'. If 'public', or 'private', the fingerprints of keys of the
the other type will not be included in the results.
:param str keytype: Can be 'any', 'primary', or 'sub'. If 'primary' or 'sub', the fingerprints of keys of the
the other type will not be included in the results.
:returns: a ``set`` of fingerprints of keys matching the filters specified.
"""
return {pk.fingerprint for pk in self._keys.values()
if pk.is_primary in [True if keytype in ['primary', 'any'] else None,
False if keytype in ['sub', 'any'] else None]
if pk.is_public in [True if keyhalf in ['public', 'any'] else None,
False if keyhalf in ['private', 'any'] else None]}
def unload(self, key):
"""
Unload a loaded key and its subkeys.
The easiest way to do this is to select a key using :py:meth:`PGPKeyring.key` first::
with keyring.key("DSA von TestKey") as key:
keyring.unload(key)
:param key: The key to unload.
:type key: :py:obj:`PGPKey`
"""
assert isinstance(key, PGPKey)
pkid = id(key)
if pkid in self._keys:
# remove references
[ kd.remove(pkid) for kd in [self._pubkeys, self._privkeys] if pkid in kd ]
# remove the key
self._keys.pop(pkid)
# remove aliases
for m, a in [ (m, a) for m in self._aliases for a, p in m.items() if p == pkid ]:
m.pop(a)
# do a re-sort of this alias if it was not unique
if a in self:
self._sort_alias(a)
# if key is a primary key, unload its subkeys as well
if key.is_primary:
[ self.unload(sk) for sk in key.subkeys.values() ]
|