OLD | NEW |
(Empty) | |
| 1 from __future__ import division |
| 2 try: |
| 3 from builtins import bytes |
| 4 except ImportError: |
| 5 pass |
| 6 #from past.utils import old_div |
| 7 from math import ceil |
| 8 from struct import pack, unpack |
| 9 from Crypto.Cipher import AES |
| 10 from Crypto.Hash import SHA256 |
| 11 from Crypto.Hash import SHA384 |
| 12 from Crypto.Hash import SHA512 |
| 13 from Crypto.Hash import HMAC |
| 14 |
| 15 |
| 16 LENMET = { |
| 17 32: (16, SHA256), |
| 18 48: (24, SHA384), |
| 19 64: (32, SHA512) |
| 20 } |
| 21 |
| 22 |
| 23 # PKCS#5 padding, since it's not in PyCrypto |
| 24 def pkcs5pad(x): |
| 25 """ |
| 26 Add PKCS#5 padding to an octet string |
| 27 |
| 28 :type x: bytes |
| 29 :rtype: bytes |
| 30 """ |
| 31 n = 16 - len(x) % 16 |
| 32 if n == 0: |
| 33 n = 16 |
| 34 ns = pack('B', n) |
| 35 return x + (ns * n) |
| 36 |
| 37 |
| 38 def pkcs5trim(x): |
| 39 """ |
| 40 Trim PKCS#5 padding from an octet string |
| 41 |
| 42 :type x: bytes |
| 43 :rtype: bytes |
| 44 """ |
| 45 n = unpack('B', x[-1:])[0] |
| 46 # Should never have more than 16 bytes of padding |
| 47 # ... since we're only using this with AES |
| 48 if n > 16: |
| 49 raise Exception("Mal-formed PKCS#5 padding") |
| 50 return x[:-n] |
| 51 |
| 52 |
| 53 def get_keys_seclen_dgst(key, iv): |
| 54 # Validate input |
| 55 if len(iv) != 16: |
| 56 raise Exception("IV for AES-CBC must be 16 octets long") |
| 57 |
| 58 # Select the digest to use based on key length |
| 59 try: |
| 60 seclen, dgst = LENMET[len(key)] |
| 61 except KeyError: |
| 62 raise Exception("Invalid CBC+HMAC key length: %s bytes" % len(key)) |
| 63 |
| 64 # Split the key |
| 65 ka = key[:seclen] |
| 66 ke = key[seclen:] |
| 67 |
| 68 return ka, ke, seclen, dgst |
| 69 |
| 70 |
| 71 def aes_cbc_hmac_encrypt(key, iv, aad, pt): |
| 72 """ |
| 73 Perform authenticated encryption with the combined AES-CBC |
| 74 and HMAC algorithm. |
| 75 |
| 76 :param key: key; length MUST be 32, 48, or 64 octets |
| 77 :param iv: Initialization vector; length MUST be 16 octets |
| 78 :param aad: Additional authenticated data |
| 79 :param pt: Plaintext |
| 80 :return: (ciphertext, tag) tuple, with each as bytes |
| 81 """ |
| 82 |
| 83 ka, ke, seclen, dgst = get_keys_seclen_dgst(key, iv) |
| 84 |
| 85 # Encrypt |
| 86 cipher = AES.new(ke, AES.MODE_CBC, iv) |
| 87 ct = cipher.encrypt(pkcs5pad(pt)) |
| 88 |
| 89 # MAC A || IV || E || AL |
| 90 al = pack("!Q", 8*len(aad)) |
| 91 mac_input = aad + iv + ct + al |
| 92 h = HMAC.new(ka, digestmod=dgst) |
| 93 h.update(mac_input) |
| 94 tag = h.digest()[:seclen] |
| 95 return ct, tag |
| 96 |
| 97 |
| 98 def aes_cbc_hmac_decrypt(key, iv, aad, ct, tag): |
| 99 """ |
| 100 Perform authenticated decryption with the combined AES-CBC |
| 101 and HMAC algorithm. |
| 102 |
| 103 :param key : Key; length MUST be 32, 48, or 64 octets |
| 104 :param iv : Initialization vector; length MUST be 16 octets |
| 105 :param aad: Additional authenticated data |
| 106 :param ct : Plaintext |
| 107 :param tag: Authentication tag |
| 108 :return: (plaintext, result) tuple, with plaintext as bytes |
| 109 and result as boolean |
| 110 """ |
| 111 |
| 112 ka, ke, seclen, dgst = get_keys_seclen_dgst(key, iv) |
| 113 |
| 114 # Verify A || IV || E || AL |
| 115 al = pack("!Q", 8*len(aad)) |
| 116 if isinstance(aad, str): |
| 117 aad = aad.encode("utf-8") |
| 118 mac_input = aad + iv + ct + al |
| 119 h = HMAC.new(ka, digestmod=dgst) |
| 120 h.update(mac_input) |
| 121 candidate = h.digest()[:seclen] |
| 122 |
| 123 # Decrypt if verified |
| 124 if candidate == tag: |
| 125 cipher = AES.new(ke, AES.MODE_CBC, iv) |
| 126 pt = pkcs5trim(cipher.decrypt(ct)) |
| 127 return pt, True |
| 128 else: |
| 129 return None, False |
| 130 |
| 131 |
| 132 def concat_sha256(secret, dk_len, other_info): |
| 133 """ |
| 134 The Concat KDF, using SHA256 as the hash function. |
| 135 |
| 136 Note: Does not validate that otherInfo meets the requirements of |
| 137 SP800-56A. |
| 138 |
| 139 :param secret: The shared secret value |
| 140 :param dk_len: Length of key to be derived, in bits |
| 141 :param other_info: Other info to be incorporated (see SP800-56A) |
| 142 :return: The derived key |
| 143 """ |
| 144 dkm = b'' |
| 145 dk_bytes = int(ceil(dk_len / 8.0)) |
| 146 counter = 0 |
| 147 while len(dkm) < dk_bytes: |
| 148 counter += 1 |
| 149 counter_bytes = pack("!I", counter) |
| 150 dkm += SHA256.new(counter_bytes + secret + other_info ).digest() |
| 151 return dkm[:dk_bytes] |
| 152 |
| 153 |
| 154 def ecdh_derive_key(curve, key, epk, apu, apv, alg, dk_len): |
| 155 """ |
| 156 ECDH key derivation, as defined by JWA |
| 157 |
| 158 :param curve: Curve to be used for EC computations |
| 159 :param key : Elliptic curve private key |
| 160 :param epk : Elliptic curve public key (long, long) |
| 161 :param apu : PartyUInfo |
| 162 :param apv : PartyVInfo |
| 163 :param alg : Algorithm identifier |
| 164 :param dk_len: Length of key to be derived, in bits |
| 165 :return: The derived key |
| 166 """ |
| 167 # Compute shared secret |
| 168 Z = curve.dh_z(key, epk) |
| 169 # Derive the key |
| 170 # AlgorithmID || PartyUInfo || PartyVInfo || SuppPubInfo |
| 171 otherInfo = bytes(alg) + \ |
| 172 pack("!I", len(apu)) + apu + \ |
| 173 pack("!I", len(apv)) + apv + \ |
| 174 pack("!I", dk_len) |
| 175 return concat_sha256(Z, dk_len, otherInfo) |
OLD | NEW |