OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # |
| 3 # SelfTest/Hash/common.py: Common code for Crypto.SelfTest.Hash |
| 4 # |
| 5 # Written in 2008 by Dwayne C. Litzenberger <dlitz@dlitz.net> |
| 6 # |
| 7 # =================================================================== |
| 8 # The contents of this file are dedicated to the public domain. To |
| 9 # the extent that dedication to the public domain is not available, |
| 10 # everyone is granted a worldwide, perpetual, royalty-free, |
| 11 # non-exclusive license to exercise all rights associated with the |
| 12 # contents of this file for any purpose whatsoever. |
| 13 # No rights are reserved. |
| 14 # |
| 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
| 16 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
| 17 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
| 18 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS |
| 19 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN |
| 20 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
| 21 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
| 22 # SOFTWARE. |
| 23 # =================================================================== |
| 24 |
| 25 """Self-testing for PyCrypto hash modules""" |
| 26 |
| 27 __revision__ = "$Id$" |
| 28 |
| 29 import sys |
| 30 import unittest |
| 31 from binascii import a2b_hex, b2a_hex |
| 32 from Crypto.Util.py3compat import * |
| 33 |
| 34 # For compatibility with Python 2.1 and Python 2.2 |
| 35 if sys.hexversion < 0x02030000: |
| 36 # Python 2.1 doesn't have a dict() function |
| 37 # Python 2.2 dict() function raises TypeError if you do dict(MD5='blah') |
| 38 def dict(**kwargs): |
| 39 return kwargs.copy() |
| 40 else: |
| 41 dict = dict |
| 42 |
| 43 class _NoDefault: pass # sentinel object |
| 44 def _extract(d, k, default=_NoDefault): |
| 45 """Get an item from a dictionary, and remove it from the dictionary.""" |
| 46 try: |
| 47 retval = d[k] |
| 48 except KeyError: |
| 49 if default is _NoDefault: |
| 50 raise |
| 51 return default |
| 52 del d[k] |
| 53 return retval |
| 54 |
| 55 # Generic cipher test case |
| 56 class CipherSelfTest(unittest.TestCase): |
| 57 |
| 58 def __init__(self, module, params): |
| 59 unittest.TestCase.__init__(self) |
| 60 self.module = module |
| 61 |
| 62 # Extract the parameters |
| 63 params = params.copy() |
| 64 self.description = _extract(params, 'description') |
| 65 self.key = b(_extract(params, 'key')) |
| 66 self.plaintext = b(_extract(params, 'plaintext')) |
| 67 self.ciphertext = b(_extract(params, 'ciphertext')) |
| 68 self.module_name = _extract(params, 'module_name', None) |
| 69 |
| 70 mode = _extract(params, 'mode', None) |
| 71 self.mode_name = str(mode) |
| 72 if mode is not None: |
| 73 # Block cipher |
| 74 self.mode = getattr(self.module, "MODE_" + mode) |
| 75 self.iv = _extract(params, 'iv', None) |
| 76 if self.iv is not None: self.iv = b(self.iv) |
| 77 |
| 78 # Only relevant for OPENPGP mode |
| 79 self.encrypted_iv = _extract(params, 'encrypted_iv', None) |
| 80 if self.encrypted_iv is not None: |
| 81 self.encrypted_iv = b(self.encrypted_iv) |
| 82 else: |
| 83 # Stream cipher |
| 84 self.mode = None |
| 85 self.iv = None |
| 86 |
| 87 self.extra_params = params |
| 88 |
| 89 def shortDescription(self): |
| 90 return self.description |
| 91 |
| 92 def _new(self, do_decryption=0): |
| 93 params = self.extra_params.copy() |
| 94 |
| 95 # Handle CTR mode parameters. By default, we use Counter.new(self.modul
e.block_size) |
| 96 if hasattr(self.module, "MODE_CTR") and self.mode == self.module.MODE_CT
R: |
| 97 from Crypto.Util import Counter |
| 98 ctr_class = _extract(params, 'ctr_class', Counter.new) |
| 99 ctr_params = _extract(params, 'ctr_params', {}).copy() |
| 100 if ctr_params.has_key('prefix'): ctr_params['prefix'] = a2b_hex(b(ct
r_params['prefix'])) |
| 101 if ctr_params.has_key('suffix'): ctr_params['suffix'] = a2b_hex(b(ct
r_params['suffix'])) |
| 102 if not ctr_params.has_key('nbits'): |
| 103 ctr_params['nbits'] = 8*(self.module.block_size - len(ctr_params
.get('prefix', '')) - len(ctr_params.get('suffix', ''))) |
| 104 params['counter'] = ctr_class(**ctr_params) |
| 105 |
| 106 if self.mode is None: |
| 107 # Stream cipher |
| 108 return self.module.new(a2b_hex(self.key), **params) |
| 109 elif self.iv is None: |
| 110 # Block cipher without iv |
| 111 return self.module.new(a2b_hex(self.key), self.mode, **params) |
| 112 else: |
| 113 # Block cipher with iv |
| 114 if do_decryption and self.mode == self.module.MODE_OPENPGP: |
| 115 # In PGP mode, the IV to feed for decryption is the *encrypted*
one |
| 116 return self.module.new(a2b_hex(self.key), self.mode, a2b_hex(sel
f.encrypted_iv), **params) |
| 117 else: |
| 118 return self.module.new(a2b_hex(self.key), self.mode, a2b_hex(sel
f.iv), **params) |
| 119 |
| 120 def runTest(self): |
| 121 plaintext = a2b_hex(self.plaintext) |
| 122 ciphertext = a2b_hex(self.ciphertext) |
| 123 |
| 124 ct1 = b2a_hex(self._new().encrypt(plaintext)) |
| 125 pt1 = b2a_hex(self._new(1).decrypt(ciphertext)) |
| 126 ct2 = b2a_hex(self._new().encrypt(plaintext)) |
| 127 pt2 = b2a_hex(self._new(1).decrypt(ciphertext)) |
| 128 |
| 129 if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MOD
E_OPENPGP: |
| 130 # In PGP mode, data returned by the first encrypt() |
| 131 # is prefixed with the encrypted IV. |
| 132 # Here we check it and then remove it from the ciphertexts. |
| 133 eilen = len(self.encrypted_iv) |
| 134 self.assertEqual(self.encrypted_iv, ct1[:eilen]) |
| 135 self.assertEqual(self.encrypted_iv, ct2[:eilen]) |
| 136 ct1 = ct1[eilen:] |
| 137 ct2 = ct2[eilen:] |
| 138 |
| 139 self.assertEqual(self.ciphertext, ct1) # encrypt |
| 140 self.assertEqual(self.ciphertext, ct2) # encrypt (second time) |
| 141 self.assertEqual(self.plaintext, pt1) # decrypt |
| 142 self.assertEqual(self.plaintext, pt2) # decrypt (second time) |
| 143 |
| 144 class CipherStreamingSelfTest(CipherSelfTest): |
| 145 |
| 146 def shortDescription(self): |
| 147 desc = self.module_name |
| 148 if self.mode is not None: |
| 149 desc += " in %s mode" % (self.mode_name,) |
| 150 return "%s should behave like a stream cipher" % (desc,) |
| 151 |
| 152 def runTest(self): |
| 153 plaintext = a2b_hex(self.plaintext) |
| 154 ciphertext = a2b_hex(self.ciphertext) |
| 155 |
| 156 # The cipher should work like a stream cipher |
| 157 |
| 158 # Test counter mode encryption, 3 bytes at a time |
| 159 ct3 = [] |
| 160 cipher = self._new() |
| 161 for i in range(0, len(plaintext), 3): |
| 162 ct3.append(cipher.encrypt(plaintext[i:i+3])) |
| 163 ct3 = b2a_hex(b("").join(ct3)) |
| 164 self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time) |
| 165 |
| 166 # Test counter mode decryption, 3 bytes at a time |
| 167 pt3 = [] |
| 168 cipher = self._new() |
| 169 for i in range(0, len(ciphertext), 3): |
| 170 pt3.append(cipher.encrypt(ciphertext[i:i+3])) |
| 171 # PY3K: This is meant to be text, do not change to bytes (data) |
| 172 pt3 = b2a_hex(b("").join(pt3)) |
| 173 self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time) |
| 174 |
| 175 class CTRSegfaultTest(unittest.TestCase): |
| 176 |
| 177 def __init__(self, module, params): |
| 178 unittest.TestCase.__init__(self) |
| 179 self.module = module |
| 180 self.key = b(params['key']) |
| 181 self.module_name = params.get('module_name', None) |
| 182 |
| 183 def shortDescription(self): |
| 184 return """Regression test: %s.new(key, %s.MODE_CTR) should raise TypeErr
or, not segfault""" % (self.module_name, self.module_name) |
| 185 |
| 186 def runTest(self): |
| 187 self.assertRaises(TypeError, self.module.new, a2b_hex(self.key), self.mo
dule.MODE_CTR) |
| 188 |
| 189 class CTRWraparoundTest(unittest.TestCase): |
| 190 |
| 191 def __init__(self, module, params): |
| 192 unittest.TestCase.__init__(self) |
| 193 self.module = module |
| 194 self.key = b(params['key']) |
| 195 self.module_name = params.get('module_name', None) |
| 196 |
| 197 def shortDescription(self): |
| 198 return """Regression test: %s with MODE_CTR should raise OverflowError o
n wraparound when shortcut used""" % (self.module_name,) |
| 199 |
| 200 def runTest(self): |
| 201 from Crypto.Util import Counter |
| 202 |
| 203 for disable_shortcut in (0, 1): # (False, True) Test CTR-mode shortcut a
nd PyObject_CallObject code paths |
| 204 for little_endian in (0, 1): # (False, True) Test both endiannesses |
| 205 ctr = Counter.new(8*self.module.block_size, initial_value=2L**(8
*self.module.block_size)-1, little_endian=little_endian, disable_shortcut=disabl
e_shortcut) |
| 206 cipher = self.module.new(a2b_hex(self.key), self.module.MODE_CTR
, counter=ctr) |
| 207 block = b("\x00") * self.module.block_size |
| 208 cipher.encrypt(block) |
| 209 self.assertRaises(OverflowError, cipher.encrypt, block) |
| 210 |
| 211 class CFBSegmentSizeTest(unittest.TestCase): |
| 212 |
| 213 def __init__(self, module, params): |
| 214 unittest.TestCase.__init__(self) |
| 215 self.module = module |
| 216 self.key = b(params['key']) |
| 217 self.description = params['description'] |
| 218 |
| 219 def shortDescription(self): |
| 220 return self.description |
| 221 |
| 222 def runTest(self): |
| 223 """Regression test: m.new(key, m.MODE_CFB, segment_size=N) should requir
e segment_size to be a multiple of 8 bits""" |
| 224 for i in range(1, 8): |
| 225 self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), se
lf.module.MODE_CFB, segment_size=i) |
| 226 self.module.new(a2b_hex(self.key), self.module.MODE_CFB, "\0"*self.modul
e.block_size, segment_size=8) # should succeed |
| 227 |
| 228 class RoundtripTest(unittest.TestCase): |
| 229 def __init__(self, module, params): |
| 230 from Crypto import Random |
| 231 unittest.TestCase.__init__(self) |
| 232 self.module = module |
| 233 self.iv = Random.get_random_bytes(module.block_size) |
| 234 self.key = b(params['key']) |
| 235 self.plaintext = 100 * b(params['plaintext']) |
| 236 self.module_name = params.get('module_name', None) |
| 237 |
| 238 def shortDescription(self): |
| 239 return """%s .decrypt() output of .encrypt() should not be garbled""" %
(self.module_name,) |
| 240 |
| 241 def runTest(self): |
| 242 for mode in (self.module.MODE_ECB, self.module.MODE_CBC, self.module.MOD
E_CFB, self.module.MODE_OFB, self.module.MODE_OPENPGP): |
| 243 encryption_cipher = self.module.new(a2b_hex(self.key), mode, self.iv
) |
| 244 ciphertext = encryption_cipher.encrypt(self.plaintext) |
| 245 |
| 246 if mode != self.module.MODE_OPENPGP: |
| 247 decryption_cipher = self.module.new(a2b_hex(self.key), mode, sel
f.iv) |
| 248 else: |
| 249 eiv = ciphertext[:self.module.block_size+2] |
| 250 ciphertext = ciphertext[self.module.block_size+2:] |
| 251 decryption_cipher = self.module.new(a2b_hex(self.key), mode, eiv
) |
| 252 decrypted_plaintext = decryption_cipher.decrypt(ciphertext) |
| 253 self.assertEqual(self.plaintext, decrypted_plaintext) |
| 254 |
| 255 class PGPTest(unittest.TestCase): |
| 256 def __init__(self, module, params): |
| 257 unittest.TestCase.__init__(self) |
| 258 self.module = module |
| 259 self.key = b(params['key']) |
| 260 |
| 261 def shortDescription(self): |
| 262 return "MODE_PGP was implemented incorrectly and insecurely. It's comple
tely banished now." |
| 263 |
| 264 def runTest(self): |
| 265 self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), |
| 266 self.module.MODE_PGP) |
| 267 |
| 268 class IVLengthTest(unittest.TestCase): |
| 269 def __init__(self, module, params): |
| 270 unittest.TestCase.__init__(self) |
| 271 self.module = module |
| 272 self.key = b(params['key']) |
| 273 |
| 274 def shortDescription(self): |
| 275 return "Check that all modes except MODE_ECB and MODE_CTR require an IV
of the proper length" |
| 276 |
| 277 def runTest(self): |
| 278 self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), |
| 279 self.module.MODE_CBC, "") |
| 280 self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), |
| 281 self.module.MODE_CFB, "") |
| 282 self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), |
| 283 self.module.MODE_OFB, "") |
| 284 self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), |
| 285 self.module.MODE_OPENPGP, "") |
| 286 self.module.new(a2b_hex(self.key), self.module.MODE_ECB, "") |
| 287 self.module.new(a2b_hex(self.key), self.module.MODE_CTR, "", counter=sel
f._dummy_counter) |
| 288 |
| 289 def _dummy_counter(self): |
| 290 return "\0" * self.module.block_size |
| 291 |
| 292 def make_block_tests(module, module_name, test_data): |
| 293 tests = [] |
| 294 extra_tests_added = 0 |
| 295 for i in range(len(test_data)): |
| 296 row = test_data[i] |
| 297 |
| 298 # Build the "params" dictionary |
| 299 params = {'mode': 'ECB'} |
| 300 if len(row) == 3: |
| 301 (params['plaintext'], params['ciphertext'], params['key']) = row |
| 302 elif len(row) == 4: |
| 303 (params['plaintext'], params['ciphertext'], params['key'], params['d
escription']) = row |
| 304 elif len(row) == 5: |
| 305 (params['plaintext'], params['ciphertext'], params['key'], params['d
escription'], extra_params) = row |
| 306 params.update(extra_params) |
| 307 else: |
| 308 raise AssertionError("Unsupported tuple size %d" % (len(row),)) |
| 309 |
| 310 # Build the display-name for the test |
| 311 p2 = params.copy() |
| 312 p_key = _extract(p2, 'key') |
| 313 p_plaintext = _extract(p2, 'plaintext') |
| 314 p_ciphertext = _extract(p2, 'ciphertext') |
| 315 p_description = _extract(p2, 'description', None) |
| 316 p_mode = p2.get('mode', 'ECB') |
| 317 if p_mode == 'ECB': |
| 318 _extract(p2, 'mode', 'ECB') |
| 319 |
| 320 if p_description is not None: |
| 321 description = p_description |
| 322 elif p_mode == 'ECB' and not p2: |
| 323 description = "p=%s, k=%s" % (p_plaintext, p_key) |
| 324 else: |
| 325 description = "p=%s, k=%s, %r" % (p_plaintext, p_key, p2) |
| 326 name = "%s #%d: %s" % (module_name, i+1, description) |
| 327 params['description'] = name |
| 328 params['module_name'] = module_name |
| 329 |
| 330 # Add extra test(s) to the test suite before the current test |
| 331 if not extra_tests_added: |
| 332 tests += [ |
| 333 CTRSegfaultTest(module, params), |
| 334 CTRWraparoundTest(module, params), |
| 335 CFBSegmentSizeTest(module, params), |
| 336 RoundtripTest(module, params), |
| 337 PGPTest(module, params), |
| 338 IVLengthTest(module, params), |
| 339 ] |
| 340 extra_tests_added = 1 |
| 341 |
| 342 # Add the current test to the test suite |
| 343 tests.append(CipherSelfTest(module, params)) |
| 344 |
| 345 # When using CTR mode, test that the interface behaves like a stream cip
her |
| 346 if p_mode == 'CTR': |
| 347 tests.append(CipherStreamingSelfTest(module, params)) |
| 348 |
| 349 # When using CTR mode, test the non-shortcut code path. |
| 350 if p_mode == 'CTR' and not params.has_key('ctr_class'): |
| 351 params2 = params.copy() |
| 352 params2['description'] += " (shortcut disabled)" |
| 353 ctr_params2 = params.get('ctr_params', {}).copy() |
| 354 params2['ctr_params'] = ctr_params2 |
| 355 if not params2['ctr_params'].has_key('disable_shortcut'): |
| 356 params2['ctr_params']['disable_shortcut'] = 1 |
| 357 tests.append(CipherSelfTest(module, params2)) |
| 358 return tests |
| 359 |
| 360 def make_stream_tests(module, module_name, test_data): |
| 361 tests = [] |
| 362 for i in range(len(test_data)): |
| 363 row = test_data[i] |
| 364 |
| 365 # Build the "params" dictionary |
| 366 params = {} |
| 367 if len(row) == 3: |
| 368 (params['plaintext'], params['ciphertext'], params['key']) = row |
| 369 elif len(row) == 4: |
| 370 (params['plaintext'], params['ciphertext'], params['key'], params['d
escription']) = row |
| 371 elif len(row) == 5: |
| 372 (params['plaintext'], params['ciphertext'], params['key'], params['d
escription'], extra_params) = row |
| 373 params.update(extra_params) |
| 374 else: |
| 375 raise AssertionError("Unsupported tuple size %d" % (len(row),)) |
| 376 |
| 377 # Build the display-name for the test |
| 378 p2 = params.copy() |
| 379 p_key = _extract(p2, 'key') |
| 380 p_plaintext = _extract(p2, 'plaintext') |
| 381 p_ciphertext = _extract(p2, 'ciphertext') |
| 382 p_description = _extract(p2, 'description', None) |
| 383 |
| 384 if p_description is not None: |
| 385 description = p_description |
| 386 elif not p2: |
| 387 description = "p=%s, k=%s" % (p_plaintext, p_key) |
| 388 else: |
| 389 description = "p=%s, k=%s, %r" % (p_plaintext, p_key, p2) |
| 390 name = "%s #%d: %s" % (module_name, i+1, description) |
| 391 params['description'] = name |
| 392 params['module_name'] = module_name |
| 393 |
| 394 # Add the test to the test suite |
| 395 tests.append(CipherSelfTest(module, params)) |
| 396 tests.append(CipherStreamingSelfTest(module, params)) |
| 397 return tests |
| 398 |
| 399 # vim:set ts=4 sw=4 sts=4 expandtab: |
OLD | NEW |