OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 |
| 3 """ |
| 4 Copyright (C) 2013 Bo Zhu http://about.bozhu.me |
| 5 |
| 6 Permission is hereby granted, free of charge, to any person obtaining a |
| 7 copy of this software and associated documentation files (the "Software"), |
| 8 to deal in the Software without restriction, including without limitation |
| 9 the rights to use, copy, modify, merge, publish, distribute, sublicense, |
| 10 and/or sell copies of the Software, and to permit persons to whom the |
| 11 Software is furnished to do so, subject to the following conditions: |
| 12 |
| 13 The above copyright notice and this permission notice shall be included in |
| 14 all copies or substantial portions of the Software. |
| 15 |
| 16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 17 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 18 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL |
| 19 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 20 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
| 21 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER |
| 22 DEALINGS IN THE SOFTWARE. |
| 23 """ |
| 24 from __future__ import print_function |
| 25 from __future__ import division |
| 26 try: |
| 27 from builtins import str |
| 28 from builtins import hex |
| 29 from builtins import range |
| 30 from builtins import object |
| 31 except ImportError: |
| 32 pass |
| 33 #from past.utils import old_div |
| 34 |
| 35 from Crypto.Cipher import AES |
| 36 from Crypto.Util import Counter |
| 37 from Crypto.Util.number import long_to_bytes, bytes_to_long |
| 38 |
| 39 |
| 40 # GF(2^128) defined by 1 + a + a^2 + a^7 + a^128 |
| 41 # Please note the MSB is x0 and LSB is x127 |
| 42 def gf_2_128_mul(x, y): |
| 43 assert x < (1 << 128) |
| 44 assert y < (1 << 128) |
| 45 res = 0 |
| 46 for i in range(127, -1, -1): |
| 47 res ^= x * ((y >> i) & 1) # branchless |
| 48 x = (x >> 1) ^ ((x & 1) * 0xE1000000000000000000000000000000) |
| 49 assert res < 1 << 128 |
| 50 return res |
| 51 |
| 52 |
| 53 class InvalidInputException(Exception): |
| 54 def __init__(self, msg): |
| 55 self.msg = msg |
| 56 |
| 57 def __str__(self): |
| 58 return str(self.msg) |
| 59 |
| 60 |
| 61 class InvalidTagException(Exception): |
| 62 def __str__(self): |
| 63 return 'The authenticaiton tag is invalid.' |
| 64 |
| 65 |
| 66 # Galois/Counter Mode with AES-128 and 96-bit IV |
| 67 class AES_GCM(object): |
| 68 def __init__(self, master_key): |
| 69 self.prev_init_value = None |
| 70 self._master_key = "" |
| 71 self._aes_ecb = None |
| 72 self._auth_key = 0 |
| 73 self._pre_table = None |
| 74 self.change_key(master_key) |
| 75 |
| 76 def change_key(self, master_key): |
| 77 #RLB: Need to allow 192-, 256-bit keys |
| 78 #if master_key >= (1 << 128): |
| 79 # raise InvalidInputException('Master key should be 128-bit') |
| 80 |
| 81 self._master_key = long_to_bytes(master_key, 16) |
| 82 self._aes_ecb = AES.new(self._master_key, AES.MODE_ECB) |
| 83 self._auth_key = bytes_to_long(self._aes_ecb.encrypt(b'\x00' * 16)) |
| 84 |
| 85 # precompute the table for multiplication in finite field |
| 86 table = [] # for 8-bit |
| 87 for i in range(16): |
| 88 row = [] |
| 89 for j in range(256): |
| 90 row.append(gf_2_128_mul(self._auth_key, j << (8 * i))) |
| 91 table.append(tuple(row)) |
| 92 self._pre_table = tuple(table) |
| 93 |
| 94 self.prev_init_value = None # reset |
| 95 |
| 96 def __times_auth_key(self, val): |
| 97 res = 0 |
| 98 for i in range(16): |
| 99 res ^= self._pre_table[i][val & 0xFF] |
| 100 val >>= 8 |
| 101 return res |
| 102 |
| 103 def __ghash(self, aad, txt): |
| 104 len_aad = len(aad) |
| 105 len_txt = len(txt) |
| 106 |
| 107 # padding |
| 108 if 0 == len_aad % 16: |
| 109 data = aad |
| 110 else: |
| 111 data = aad + b'\x00' * (16 - len_aad % 16) |
| 112 if 0 == len_txt % 16: |
| 113 data += txt |
| 114 else: |
| 115 data += txt + b'\x00' * (16 - len_txt % 16) |
| 116 |
| 117 tag = 0 |
| 118 assert len(data) % 16 == 0 |
| 119 for i in range(int(len(data) / 16)): |
| 120 tag ^= bytes_to_long(data[i * 16: (i + 1) * 16]) |
| 121 tag = self.__times_auth_key(tag) |
| 122 # print 'X\t', hex(tag) |
| 123 tag ^= ((8 * len_aad) << 64) | (8 * len_txt) |
| 124 tag = self.__times_auth_key(tag) |
| 125 |
| 126 return tag |
| 127 |
| 128 def encrypt(self, init_value, plaintext, auth_data=b''): |
| 129 if init_value >= (1 << 96): |
| 130 raise InvalidInputException('IV should be 96-bit') |
| 131 # a naive checking for IV reuse |
| 132 if init_value == self.prev_init_value: |
| 133 raise InvalidInputException('IV must not be reused!') |
| 134 self.prev_init_value = init_value |
| 135 |
| 136 len_plaintext = len(plaintext) |
| 137 # len_auth_data = len(auth_data) |
| 138 |
| 139 if len_plaintext > 0: |
| 140 counter = Counter.new( |
| 141 nbits=32, |
| 142 prefix=long_to_bytes(init_value, 12), |
| 143 initial_value=2, # notice this |
| 144 allow_wraparound=True) |
| 145 aes_ctr = AES.new(self._master_key, AES.MODE_CTR, counter=counter) |
| 146 |
| 147 if 0 != len_plaintext % 16: |
| 148 padded_plaintext = plaintext + \ |
| 149 b'\x00' * (16 - len_plaintext % 16) |
| 150 else: |
| 151 padded_plaintext = plaintext |
| 152 ciphertext = aes_ctr.encrypt(padded_plaintext)[:len_plaintext] |
| 153 |
| 154 else: |
| 155 ciphertext = b'' |
| 156 |
| 157 auth_tag = self.__ghash(auth_data, ciphertext) |
| 158 # print 'GHASH\t', hex(auth_tag) |
| 159 auth_tag ^= bytes_to_long(self._aes_ecb.encrypt( |
| 160 long_to_bytes((init_value << 32) | 1, 16))) |
| 161 |
| 162 # assert len(ciphertext) == len(plaintext) |
| 163 assert auth_tag < (1 << 128) |
| 164 return ciphertext, auth_tag |
| 165 |
| 166 def decrypt(self, init_value, ciphertext, auth_tag, auth_data=b''): |
| 167 if init_value >= (1 << 96): |
| 168 raise InvalidInputException('IV should be 96-bit') |
| 169 if auth_tag >= (1 << 128): |
| 170 raise InvalidInputException('Tag should be 128-bit') |
| 171 |
| 172 if auth_tag != self.__ghash(auth_data, ciphertext) ^ \ |
| 173 bytes_to_long(self._aes_ecb.encrypt( |
| 174 long_to_bytes((init_value << 32) | 1, 16))): |
| 175 raise InvalidTagException |
| 176 |
| 177 len_ciphertext = len(ciphertext) |
| 178 if len_ciphertext > 0: |
| 179 counter = Counter.new( |
| 180 nbits=32, |
| 181 prefix=long_to_bytes(init_value, 12), |
| 182 initial_value=2, |
| 183 allow_wraparound=True) |
| 184 aes_ctr = AES.new(self._master_key, AES.MODE_CTR, counter=counter) |
| 185 |
| 186 if 0 != len_ciphertext % 16: |
| 187 padded_ciphertext = ciphertext + \ |
| 188 b'\x00' * (16 - len_ciphertext % 16) |
| 189 else: |
| 190 padded_ciphertext = ciphertext |
| 191 plaintext = aes_ctr.decrypt(padded_ciphertext)[:len_ciphertext] |
| 192 |
| 193 else: |
| 194 plaintext = b'' |
| 195 |
| 196 return plaintext |
| 197 |
| 198 |
| 199 if __name__ == '__main__': |
| 200 master_key = 0xfeffe9928665731c6d6a8f9467308308 |
| 201 plaintext = b'\xd9\x31\x32\x25\xf8\x84\x06\xe5' + \ |
| 202 b'\xa5\x59\x09\xc5\xaf\xf5\x26\x9a' + \ |
| 203 b'\x86\xa7\xa9\x53\x15\x34\xf7\xda' + \ |
| 204 b'\x2e\x4c\x30\x3d\x8a\x31\x8a\x72' + \ |
| 205 b'\x1c\x3c\x0c\x95\x95\x68\x09\x53' + \ |
| 206 b'\x2f\xcf\x0e\x24\x49\xa6\xb5\x25' + \ |
| 207 b'\xb1\x6a\xed\xf5\xaa\x0d\xe6\x57' + \ |
| 208 b'\xba\x63\x7b\x39' |
| 209 auth_data = b'\xfe\xed\xfa\xce\xde\xad\xbe\xef' + \ |
| 210 b'\xfe\xed\xfa\xce\xde\xad\xbe\xef' + \ |
| 211 b'\xab\xad\xda\xd2' |
| 212 init_value = 0xcafebabefacedbaddecaf888 |
| 213 ciphertext = b'\x42\x83\x1e\xc2\x21\x77\x74\x24' + \ |
| 214 b'\x4b\x72\x21\xb7\x84\xd0\xd4\x9c' + \ |
| 215 b'\xe3\xaa\x21\x2f\x2c\x02\xa4\xe0' + \ |
| 216 b'\x35\xc1\x7e\x23\x29\xac\xa1\x2e' + \ |
| 217 b'\x21\xd5\x14\xb2\x54\x66\x93\x1c' + \ |
| 218 b'\x7d\x8f\x6a\x5a\xac\x84\xaa\x05' + \ |
| 219 b'\x1b\xa3\x0b\x39\x6a\x0a\xac\x97' + \ |
| 220 b'\x3d\x58\xe0\x91' |
| 221 auth_tag = 0x5bc94fbc3221a5db94fae95ae7121a47 |
| 222 |
| 223 print('plaintext:', hex(bytes_to_long(plaintext))) |
| 224 |
| 225 my_gcm = AES_GCM(master_key) |
| 226 encrypted, new_tag = my_gcm.encrypt(init_value, plaintext, auth_data) |
| 227 print('encrypted:', hex(bytes_to_long(encrypted))) |
| 228 print('auth tag: ', hex(new_tag)) |
| 229 |
| 230 try: |
| 231 decrypted = my_gcm.decrypt(init_value, encrypted, new_tag + 1, |
| 232 auth_data) |
| 233 except InvalidTagException: |
| 234 decrypted = my_gcm.decrypt(init_value, encrypted, new_tag, auth_data) |
| 235 print('decrypted:', hex(bytes_to_long(decrypted))) |
OLD | NEW |