| Index: third_party/google-endpoints/Crypto/SelfTest/Random/test__UserFriendlyRNG.py
|
| diff --git a/third_party/google-endpoints/Crypto/SelfTest/Random/test__UserFriendlyRNG.py b/third_party/google-endpoints/Crypto/SelfTest/Random/test__UserFriendlyRNG.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..771a6635dfed8cc9991f876c95003252e5898b93
|
| --- /dev/null
|
| +++ b/third_party/google-endpoints/Crypto/SelfTest/Random/test__UserFriendlyRNG.py
|
| @@ -0,0 +1,171 @@
|
| +# -*- coding: utf-8 -*-
|
| +# Self-tests for the user-friendly Crypto.Random interface
|
| +#
|
| +# Written in 2013 by Dwayne C. Litzenberger <dlitz@dlitz.net>
|
| +#
|
| +# ===================================================================
|
| +# The contents of this file are dedicated to the public domain. To
|
| +# the extent that dedication to the public domain is not available,
|
| +# everyone is granted a worldwide, perpetual, royalty-free,
|
| +# non-exclusive license to exercise all rights associated with the
|
| +# contents of this file for any purpose whatsoever.
|
| +# No rights are reserved.
|
| +#
|
| +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
| +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
| +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
| +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
| +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
| +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
| +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
| +# SOFTWARE.
|
| +# ===================================================================
|
| +
|
| +"""Self-test suite for generic Crypto.Random stuff """
|
| +
|
| +from __future__ import nested_scopes
|
| +
|
| +__revision__ = "$Id$"
|
| +
|
| +import binascii
|
| +import pprint
|
| +import unittest
|
| +import os
|
| +import time
|
| +import sys
|
| +if sys.version_info[0] == 2 and sys.version_info[1] == 1:
|
| + from Crypto.Util.py21compat import *
|
| +from Crypto.Util.py3compat import *
|
| +
|
| +try:
|
| + import multiprocessing
|
| +except ImportError:
|
| + multiprocessing = None
|
| +
|
| +import Crypto.Random._UserFriendlyRNG
|
| +import Crypto.Random.random
|
| +
|
| +class RNGForkTest(unittest.TestCase):
|
| +
|
| + def _get_reseed_count(self):
|
| + """
|
| + Get `FortunaAccumulator.reseed_count`, the global count of the
|
| + number of times that the PRNG has been reseeded.
|
| + """
|
| + rng_singleton = Crypto.Random._UserFriendlyRNG._get_singleton()
|
| + rng_singleton._lock.acquire()
|
| + try:
|
| + return rng_singleton._fa.reseed_count
|
| + finally:
|
| + rng_singleton._lock.release()
|
| +
|
| + def runTest(self):
|
| + # Regression test for CVE-2013-1445. We had a bug where, under the
|
| + # right conditions, two processes might see the same random sequence.
|
| +
|
| + if sys.platform.startswith('win'): # windows can't fork
|
| + assert not hasattr(os, 'fork') # ... right?
|
| + return
|
| +
|
| + # Wait 150 ms so that we don't trigger the rate-limit prematurely.
|
| + time.sleep(0.15)
|
| +
|
| + reseed_count_before = self._get_reseed_count()
|
| +
|
| + # One or both of these calls together should trigger a reseed right here.
|
| + Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
|
| + Crypto.Random.get_random_bytes(1)
|
| +
|
| + reseed_count_after = self._get_reseed_count()
|
| + self.assertNotEqual(reseed_count_before, reseed_count_after) # sanity check: test should reseed parent before forking
|
| +
|
| + rfiles = []
|
| + for i in range(10):
|
| + rfd, wfd = os.pipe()
|
| + if os.fork() == 0:
|
| + # child
|
| + os.close(rfd)
|
| + f = os.fdopen(wfd, "wb")
|
| +
|
| + Crypto.Random.atfork()
|
| +
|
| + data = Crypto.Random.get_random_bytes(16)
|
| +
|
| + f.write(data)
|
| + f.close()
|
| + os._exit(0)
|
| + # parent
|
| + os.close(wfd)
|
| + rfiles.append(os.fdopen(rfd, "rb"))
|
| +
|
| + results = []
|
| + results_dict = {}
|
| + for f in rfiles:
|
| + data = binascii.hexlify(f.read())
|
| + results.append(data)
|
| + results_dict[data] = 1
|
| + f.close()
|
| +
|
| + if len(results) != len(results_dict.keys()):
|
| + raise AssertionError("RNG output duplicated across fork():\n%s" %
|
| + (pprint.pformat(results)))
|
| +
|
| +
|
| +# For RNGMultiprocessingForkTest
|
| +def _task_main(q):
|
| + a = Crypto.Random.get_random_bytes(16)
|
| + time.sleep(0.1) # wait 100 ms
|
| + b = Crypto.Random.get_random_bytes(16)
|
| + q.put(binascii.b2a_hex(a))
|
| + q.put(binascii.b2a_hex(b))
|
| + q.put(None) # Wait for acknowledgment
|
| +
|
| +
|
| +class RNGMultiprocessingForkTest(unittest.TestCase):
|
| +
|
| + def runTest(self):
|
| + # Another regression test for CVE-2013-1445. This is basically the
|
| + # same as RNGForkTest, but less compatible with old versions of Python,
|
| + # and a little easier to read.
|
| +
|
| + n_procs = 5
|
| + manager = multiprocessing.Manager()
|
| + queues = [manager.Queue(1) for i in range(n_procs)]
|
| +
|
| + # Reseed the pool
|
| + time.sleep(0.15)
|
| + Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
|
| + Crypto.Random.get_random_bytes(1)
|
| +
|
| + # Start the child processes
|
| + pool = multiprocessing.Pool(processes=n_procs, initializer=Crypto.Random.atfork)
|
| + map_result = pool.map_async(_task_main, queues)
|
| +
|
| + # Get the results, ensuring that no pool processes are reused.
|
| + aa = [queues[i].get(30) for i in range(n_procs)]
|
| + bb = [queues[i].get(30) for i in range(n_procs)]
|
| + res = list(zip(aa, bb))
|
| +
|
| + # Shut down the pool
|
| + map_result.get(30)
|
| + pool.close()
|
| + pool.join()
|
| +
|
| + # Check that the results are unique
|
| + if len(set(aa)) != len(aa) or len(set(res)) != len(res):
|
| + raise AssertionError("RNG output duplicated across fork():\n%s" %
|
| + (pprint.pformat(res),))
|
| +
|
| +
|
| +def get_tests(config={}):
|
| + tests = []
|
| + tests += [RNGForkTest()]
|
| + if multiprocessing is not None:
|
| + tests += [RNGMultiprocessingForkTest()]
|
| + return tests
|
| +
|
| +if __name__ == '__main__':
|
| + suite = lambda: unittest.TestSuite(get_tests())
|
| + unittest.main(defaultTest='suite')
|
| +
|
| +# vim:set ts=4 sw=4 sts=4 expandtab:
|
|
|