Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(49)

Side by Side Diff: third_party/google-endpoints/Crypto/SelfTest/Random/test__UserFriendlyRNG.py

Issue 2666783008: Add google-endpoints to third_party/. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # -*- coding: utf-8 -*-
2 # Self-tests for the user-friendly Crypto.Random interface
3 #
4 # Written in 2013 by Dwayne C. Litzenberger <dlitz@dlitz.net>
5 #
6 # ===================================================================
7 # The contents of this file are dedicated to the public domain. To
8 # the extent that dedication to the public domain is not available,
9 # everyone is granted a worldwide, perpetual, royalty-free,
10 # non-exclusive license to exercise all rights associated with the
11 # contents of this file for any purpose whatsoever.
12 # No rights are reserved.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
18 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
19 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
20 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 # SOFTWARE.
22 # ===================================================================
23
24 """Self-test suite for generic Crypto.Random stuff """
25
26 from __future__ import nested_scopes
27
28 __revision__ = "$Id$"
29
30 import binascii
31 import pprint
32 import unittest
33 import os
34 import time
35 import sys
36 if sys.version_info[0] == 2 and sys.version_info[1] == 1:
37 from Crypto.Util.py21compat import *
38 from Crypto.Util.py3compat import *
39
40 try:
41 import multiprocessing
42 except ImportError:
43 multiprocessing = None
44
45 import Crypto.Random._UserFriendlyRNG
46 import Crypto.Random.random
47
48 class RNGForkTest(unittest.TestCase):
49
50 def _get_reseed_count(self):
51 """
52 Get `FortunaAccumulator.reseed_count`, the global count of the
53 number of times that the PRNG has been reseeded.
54 """
55 rng_singleton = Crypto.Random._UserFriendlyRNG._get_singleton()
56 rng_singleton._lock.acquire()
57 try:
58 return rng_singleton._fa.reseed_count
59 finally:
60 rng_singleton._lock.release()
61
62 def runTest(self):
63 # Regression test for CVE-2013-1445. We had a bug where, under the
64 # right conditions, two processes might see the same random sequence.
65
66 if sys.platform.startswith('win'): # windows can't fork
67 assert not hasattr(os, 'fork') # ... right?
68 return
69
70 # Wait 150 ms so that we don't trigger the rate-limit prematurely.
71 time.sleep(0.15)
72
73 reseed_count_before = self._get_reseed_count()
74
75 # One or both of these calls together should trigger a reseed right here .
76 Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
77 Crypto.Random.get_random_bytes(1)
78
79 reseed_count_after = self._get_reseed_count()
80 self.assertNotEqual(reseed_count_before, reseed_count_after) # sanity c heck: test should reseed parent before forking
81
82 rfiles = []
83 for i in range(10):
84 rfd, wfd = os.pipe()
85 if os.fork() == 0:
86 # child
87 os.close(rfd)
88 f = os.fdopen(wfd, "wb")
89
90 Crypto.Random.atfork()
91
92 data = Crypto.Random.get_random_bytes(16)
93
94 f.write(data)
95 f.close()
96 os._exit(0)
97 # parent
98 os.close(wfd)
99 rfiles.append(os.fdopen(rfd, "rb"))
100
101 results = []
102 results_dict = {}
103 for f in rfiles:
104 data = binascii.hexlify(f.read())
105 results.append(data)
106 results_dict[data] = 1
107 f.close()
108
109 if len(results) != len(results_dict.keys()):
110 raise AssertionError("RNG output duplicated across fork():\n%s" %
111 (pprint.pformat(results)))
112
113
114 # For RNGMultiprocessingForkTest
115 def _task_main(q):
116 a = Crypto.Random.get_random_bytes(16)
117 time.sleep(0.1) # wait 100 ms
118 b = Crypto.Random.get_random_bytes(16)
119 q.put(binascii.b2a_hex(a))
120 q.put(binascii.b2a_hex(b))
121 q.put(None) # Wait for acknowledgment
122
123
124 class RNGMultiprocessingForkTest(unittest.TestCase):
125
126 def runTest(self):
127 # Another regression test for CVE-2013-1445. This is basically the
128 # same as RNGForkTest, but less compatible with old versions of Python,
129 # and a little easier to read.
130
131 n_procs = 5
132 manager = multiprocessing.Manager()
133 queues = [manager.Queue(1) for i in range(n_procs)]
134
135 # Reseed the pool
136 time.sleep(0.15)
137 Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
138 Crypto.Random.get_random_bytes(1)
139
140 # Start the child processes
141 pool = multiprocessing.Pool(processes=n_procs, initializer=Crypto.Random .atfork)
142 map_result = pool.map_async(_task_main, queues)
143
144 # Get the results, ensuring that no pool processes are reused.
145 aa = [queues[i].get(30) for i in range(n_procs)]
146 bb = [queues[i].get(30) for i in range(n_procs)]
147 res = list(zip(aa, bb))
148
149 # Shut down the pool
150 map_result.get(30)
151 pool.close()
152 pool.join()
153
154 # Check that the results are unique
155 if len(set(aa)) != len(aa) or len(set(res)) != len(res):
156 raise AssertionError("RNG output duplicated across fork():\n%s" %
157 (pprint.pformat(res),))
158
159
160 def get_tests(config={}):
161 tests = []
162 tests += [RNGForkTest()]
163 if multiprocessing is not None:
164 tests += [RNGMultiprocessingForkTest()]
165 return tests
166
167 if __name__ == '__main__':
168 suite = lambda: unittest.TestSuite(get_tests())
169 unittest.main(defaultTest='suite')
170
171 # vim:set ts=4 sw=4 sts=4 expandtab:
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698