/usr/lib/python3/dist-packages/Crypto/SelfTest/Random/test__UserFriendlyRNG.py is in python3-crypto 2.6.1-8ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
# Self-tests for the user-friendly Crypto.Random interface
#
# Written in 2013 by Dwayne C. Litzenberger <dlitz@dlitz.net>
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
"""Self-test suite for generic Crypto.Random stuff """
__revision__ = "$Id$"
import binascii
import pprint
import unittest
import os
import time
import sys
if sys.version_info[0] == 2 and sys.version_info[1] == 1:
from Crypto.Util.py21compat import *
from Crypto.Util.py3compat import *
try:
import multiprocessing
# multiprocessing.Pool uses classes from multiprocessing.synchronize, so we
# need to check if multiprocessing.semaphore will work. Otherwise creating a
# multiprocessing.Pool instance will fail with an ImportError. See Python
# bug #3770 for details.
import multiprocessing.synchronize
except ImportError:
multiprocessing = None
import Crypto.Random._UserFriendlyRNG
import Crypto.Random.random
class RNGForkTest(unittest.TestCase):
    """Regression test for CVE-2013-1445 built directly on ``os.fork()``.

    The parent reseeds its RNG, forks several children, and each child
    reports 16 random bytes back through a pipe. The test fails if any two
    children report the same bytes, or if a child exits abnormally.
    """

    def _get_reseed_count(self):
        """
        Get `FortunaAccumulator.reseed_count`, the global count of the
        number of times that the PRNG has been reseeded.
        """
        rng_singleton = Crypto.Random._UserFriendlyRNG._get_singleton()
        # The count is read while holding the singleton's lock.
        rng_singleton._lock.acquire()
        try:
            return rng_singleton._fa.reseed_count
        finally:
            rng_singleton._lock.release()

    def runTest(self):
        """Fork 10 children and require that their RNG output is distinct.

        Raises:
            AssertionError: if the parent did not reseed before forking, if
                a child exited with a non-zero status, or if two children
                produced identical output.
        """
        # Regression test for CVE-2013-1445.  We had a bug where, under the
        # right conditions, two processes might see the same random sequence.

        if sys.platform.startswith('win'):  # windows can't fork
            assert not hasattr(os, 'fork')  # ... right?
            return

        # Wait 150 ms so that we don't trigger the rate-limit prematurely.
        time.sleep(0.15)

        reseed_count_before = self._get_reseed_count()

        # One or both of these calls together should trigger a reseed right here.
        Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
        Crypto.Random.get_random_bytes(1)

        reseed_count_after = self._get_reseed_count()
        # sanity check: test should reseed parent before forking
        self.assertNotEqual(reseed_count_before, reseed_count_after)

        rfiles = []
        child_pids = []
        child_statuses = []
        results = []
        results_dict = {}
        try:
            for i in range(10):
                rfd, wfd = os.pipe()
                pid = os.fork()
                if pid == 0:
                    # child
                    # Guarantee that the child leaves through os._exit() even
                    # when something below raises; otherwise the exception
                    # would propagate into the test runner inside the child.
                    exit_code = 1
                    try:
                        os.close(rfd)
                        f = os.fdopen(wfd, "wb")

                        Crypto.Random.atfork()

                        data = Crypto.Random.get_random_bytes(16)

                        f.write(data)
                        f.close()
                        exit_code = 0
                    finally:
                        os._exit(exit_code)
                # parent
                child_pids.append(pid)
                os.close(wfd)
                rfiles.append(os.fdopen(rfd, "rb"))

            for f in rfiles:
                data = binascii.hexlify(f.read())
                results.append(data)
                results_dict[data] = 1
        finally:
            # Release the read ends and reap every child so that no zombie
            # processes are left behind, whether or not the test succeeded.
            for f in rfiles:
                f.close()
            for pid in child_pids:
                try:
                    child_statuses.append(os.waitpid(pid, 0))
                except OSError:
                    # The child was already reaped elsewhere; nothing to do.
                    pass

        failed_children = []
        for pid, status in child_statuses:
            if status != 0:
                failed_children.append((pid, status))
        if failed_children:
            raise AssertionError("Forked child exited abnormally (pid, status):\n%s" %
                                 (pprint.pformat(failed_children),))

        if len(results) != len(results_dict):
            raise AssertionError("RNG output duplicated across fork():\n%s" %
                                 (pprint.pformat(results)))
# For RNGMultiprocessingForkTest
def _task_main(q):
    """Pool worker: draw two 16-byte random samples and report them via *q*.

    The samples are taken 100 ms apart and are put on the queue hex-encoded,
    first sample first. A trailing ``None`` is put afterwards as the
    "wait for acknowledgment" step; presumably this blocks until the parent
    has drained the queue, since the caller creates it with capacity 1.
    """
    first_sample = Crypto.Random.get_random_bytes(16)
    time.sleep(0.1)     # wait 100 ms
    second_sample = Crypto.Random.get_random_bytes(16)

    outgoing = (
        binascii.b2a_hex(first_sample),
        binascii.b2a_hex(second_sample),
        None,           # Wait for acknowledgment
    )
    for item in outgoing:
        q.put(item)
class RNGMultiprocessingForkTest(unittest.TestCase):
    """Regression test for CVE-2013-1445 built on ``multiprocessing.Pool``.

    Each pool worker runs ``_task_main`` and reports two random samples
    through its own manager queue. The test fails if samples are duplicated
    across workers.
    """

    def runTest(self):
        """Run 5 pool workers and require that their RNG output is distinct.

        Raises:
            AssertionError: if two workers produced identical output.
            Exception: whatever the queue or pool raises if a worker does
                not deliver its results within 30 seconds.
        """
        # Another regression test for CVE-2013-1445.  This is basically the
        # same as RNGForkTest, but less compatible with old versions of Python,
        # and a little easier to read.

        n_procs = 5
        manager = multiprocessing.Manager()
        try:
            queues = [manager.Queue(1) for i in range(n_procs)]

            # Reseed the pool
            time.sleep(0.15)
            Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
            Crypto.Random.get_random_bytes(1)

            # Start the child processes
            pool = multiprocessing.Pool(processes=n_procs, initializer=Crypto.Random.atfork)
            completed = False
            try:
                map_result = pool.map_async(_task_main, queues)

                # Get the results, ensuring that no pool processes are reused.
                # Queue.get() takes (block, timeout): the timeout has to be
                # the second argument, otherwise 30 is interpreted as a truthy
                # `block` flag and the call waits forever.
                aa = [q.get(True, 30) for q in queues]
                bb = [q.get(True, 30) for q in queues]
                res = list(zip(aa, bb))

                # Shut down the pool
                map_result.get(30)
                pool.close()
                completed = True
            finally:
                # On failure the workers may still be blocked on their queues,
                # so a plain join() would hang; terminate them first.
                if not completed:
                    pool.terminate()
                pool.join()
        finally:
            # Stop the manager's server process instead of leaving it to
            # interpreter shutdown.
            manager.shutdown()

        # Check that the results are unique
        if len(set(aa)) != len(aa) or len(set(res)) != len(res):
            raise AssertionError("RNG output duplicated across fork():\n%s" %
                                 (pprint.pformat(res),))
def get_tests(config={}):
    """Return the list of test-case instances defined in this module.

    ``RNGForkTest`` is always included; ``RNGMultiprocessingForkTest`` is
    added only when the module-level ``multiprocessing`` name is not None.
    The *config* argument is accepted but not used here.
    """
    collected = [RNGForkTest()]
    if multiprocessing is not None:
        collected.append(RNGMultiprocessingForkTest())
    return collected
if __name__ == '__main__':
    def suite():
        """Build the suite that unittest.main() looks up by the name 'suite'."""
        return unittest.TestSuite(get_tests())

    unittest.main(defaultTest='suite')
# vim:set ts=4 sw=4 sts=4 expandtab:
|