OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Self-tests for the user-friendly Crypto.Random interface |
| 3 # |
| 4 # Written in 2013 by Dwayne C. Litzenberger <dlitz@dlitz.net> |
| 5 # |
| 6 # =================================================================== |
| 7 # The contents of this file are dedicated to the public domain. To |
| 8 # the extent that dedication to the public domain is not available, |
| 9 # everyone is granted a worldwide, perpetual, royalty-free, |
| 10 # non-exclusive license to exercise all rights associated with the |
| 11 # contents of this file for any purpose whatsoever. |
| 12 # No rights are reserved. |
| 13 # |
| 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
| 15 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
| 16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
| 17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS |
| 18 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN |
| 19 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
| 20 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
| 21 # SOFTWARE. |
| 22 # =================================================================== |
| 23 |
| 24 """Self-test suite for generic Crypto.Random stuff """ |
| 25 |
| 26 from __future__ import nested_scopes |
| 27 |
| 28 __revision__ = "$Id$" |
| 29 |
| 30 import binascii |
| 31 import pprint |
| 32 import unittest |
| 33 import os |
| 34 import time |
| 35 import sys |
| 36 if sys.version_info[0] == 2 and sys.version_info[1] == 1: |
| 37 from Crypto.Util.py21compat import * |
| 38 from Crypto.Util.py3compat import * |
| 39 |
| 40 try: |
| 41 import multiprocessing |
| 42 except ImportError: |
| 43 multiprocessing = None |
| 44 |
| 45 import Crypto.Random._UserFriendlyRNG |
| 46 import Crypto.Random.random |
| 47 |
| 48 class RNGForkTest(unittest.TestCase): |
| 49 |
| 50 def _get_reseed_count(self): |
| 51 """ |
| 52 Get `FortunaAccumulator.reseed_count`, the global count of the |
| 53 number of times that the PRNG has been reseeded. |
| 54 """ |
| 55 rng_singleton = Crypto.Random._UserFriendlyRNG._get_singleton() |
| 56 rng_singleton._lock.acquire() |
| 57 try: |
| 58 return rng_singleton._fa.reseed_count |
| 59 finally: |
| 60 rng_singleton._lock.release() |
| 61 |
| 62 def runTest(self): |
| 63 # Regression test for CVE-2013-1445. We had a bug where, under the |
| 64 # right conditions, two processes might see the same random sequence. |
| 65 |
| 66 if sys.platform.startswith('win'): # windows can't fork |
| 67 assert not hasattr(os, 'fork') # ... right? |
| 68 return |
| 69 |
| 70 # Wait 150 ms so that we don't trigger the rate-limit prematurely. |
| 71 time.sleep(0.15) |
| 72 |
| 73 reseed_count_before = self._get_reseed_count() |
| 74 |
| 75 # One or both of these calls together should trigger a reseed right here
. |
| 76 Crypto.Random._UserFriendlyRNG._get_singleton().reinit() |
| 77 Crypto.Random.get_random_bytes(1) |
| 78 |
| 79 reseed_count_after = self._get_reseed_count() |
| 80 self.assertNotEqual(reseed_count_before, reseed_count_after) # sanity c
heck: test should reseed parent before forking |
| 81 |
| 82 rfiles = [] |
| 83 for i in range(10): |
| 84 rfd, wfd = os.pipe() |
| 85 if os.fork() == 0: |
| 86 # child |
| 87 os.close(rfd) |
| 88 f = os.fdopen(wfd, "wb") |
| 89 |
| 90 Crypto.Random.atfork() |
| 91 |
| 92 data = Crypto.Random.get_random_bytes(16) |
| 93 |
| 94 f.write(data) |
| 95 f.close() |
| 96 os._exit(0) |
| 97 # parent |
| 98 os.close(wfd) |
| 99 rfiles.append(os.fdopen(rfd, "rb")) |
| 100 |
| 101 results = [] |
| 102 results_dict = {} |
| 103 for f in rfiles: |
| 104 data = binascii.hexlify(f.read()) |
| 105 results.append(data) |
| 106 results_dict[data] = 1 |
| 107 f.close() |
| 108 |
| 109 if len(results) != len(results_dict.keys()): |
| 110 raise AssertionError("RNG output duplicated across fork():\n%s" % |
| 111 (pprint.pformat(results))) |
| 112 |
| 113 |
| 114 # For RNGMultiprocessingForkTest |
| 115 def _task_main(q): |
| 116 a = Crypto.Random.get_random_bytes(16) |
| 117 time.sleep(0.1) # wait 100 ms |
| 118 b = Crypto.Random.get_random_bytes(16) |
| 119 q.put(binascii.b2a_hex(a)) |
| 120 q.put(binascii.b2a_hex(b)) |
| 121 q.put(None) # Wait for acknowledgment |
| 122 |
| 123 |
| 124 class RNGMultiprocessingForkTest(unittest.TestCase): |
| 125 |
| 126 def runTest(self): |
| 127 # Another regression test for CVE-2013-1445. This is basically the |
| 128 # same as RNGForkTest, but less compatible with old versions of Python, |
| 129 # and a little easier to read. |
| 130 |
| 131 n_procs = 5 |
| 132 manager = multiprocessing.Manager() |
| 133 queues = [manager.Queue(1) for i in range(n_procs)] |
| 134 |
| 135 # Reseed the pool |
| 136 time.sleep(0.15) |
| 137 Crypto.Random._UserFriendlyRNG._get_singleton().reinit() |
| 138 Crypto.Random.get_random_bytes(1) |
| 139 |
| 140 # Start the child processes |
| 141 pool = multiprocessing.Pool(processes=n_procs, initializer=Crypto.Random
.atfork) |
| 142 map_result = pool.map_async(_task_main, queues) |
| 143 |
| 144 # Get the results, ensuring that no pool processes are reused. |
| 145 aa = [queues[i].get(30) for i in range(n_procs)] |
| 146 bb = [queues[i].get(30) for i in range(n_procs)] |
| 147 res = list(zip(aa, bb)) |
| 148 |
| 149 # Shut down the pool |
| 150 map_result.get(30) |
| 151 pool.close() |
| 152 pool.join() |
| 153 |
| 154 # Check that the results are unique |
| 155 if len(set(aa)) != len(aa) or len(set(res)) != len(res): |
| 156 raise AssertionError("RNG output duplicated across fork():\n%s" % |
| 157 (pprint.pformat(res),)) |
| 158 |
| 159 |
| 160 def get_tests(config={}): |
| 161 tests = [] |
| 162 tests += [RNGForkTest()] |
| 163 if multiprocessing is not None: |
| 164 tests += [RNGMultiprocessingForkTest()] |
| 165 return tests |
| 166 |
| 167 if __name__ == '__main__': |
| 168 suite = lambda: unittest.TestSuite(get_tests()) |
| 169 unittest.main(defaultTest='suite') |
| 170 |
| 171 # vim:set ts=4 sw=4 sts=4 expandtab: |
OLD | NEW |