Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(306)

Side by Side Diff: third_party/google-endpoints/jwkest/jwe.py

Issue 2666783008: Add google-endpoints to third_party/. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # from future import standard_library
2 # standard_library.install_aliases()
3 try:
4 from builtins import object
5 except ImportError:
6 pass
7
8 import struct
9 import io
10 import logging
11 import zlib
12 import six
13
14 from Crypto import Random
15 from Crypto.Hash import SHA
16 from Crypto.Util.number import bytes_to_long
17 from Crypto.Util.number import long_to_bytes
18 from Crypto.Cipher import PKCS1_v1_5
19 from Crypto.Cipher import PKCS1_OAEP
20
21 from jwkest import b64d, as_bytes
22 from jwkest import b64e
23 from jwkest import JWKESTException
24 from jwkest import MissingKey
25 from jwkest.aes_gcm import AES_GCM
26 from jwkest.aes_key_wrap import aes_wrap_key
27 from jwkest.aes_key_wrap import aes_unwrap_key
28 from jwkest.ecc import NISTEllipticCurve
29 from jwkest.extra import aes_cbc_hmac_encrypt
30 from jwkest.extra import ecdh_derive_key
31 from jwkest.extra import aes_cbc_hmac_decrypt
32 from jwkest.jwk import intarr2str
33 from jwkest.jwk import ECKey
34 from jwkest.jws import JWx
35 from jwkest.jwt import JWT, b64encode_item
36
37 logger = logging.getLogger(__name__)
38
39 __author__ = 'rohe0002'
40
41 ENC = 1
42 DEC = 0
43
44
45 class JWEException(JWKESTException):
46 pass
47
48
49 class CannotDecode(JWEException):
50 pass
51
52
53 class NotSupportedAlgorithm(JWEException):
54 pass
55
56
57 class MethodNotSupported(JWEException):
58 pass
59
60
61 class ParameterError(JWEException):
62 pass
63
64
65 class NoSuitableEncryptionKey(JWEException):
66 pass
67
68
69 class NoSuitableDecryptionKey(JWEException):
70 pass
71
72
73 class DecryptionFailed(JWEException):
74 pass
75
76
77 class WrongEncryptionAlgorithm(JWEException):
78 pass
79
80
81 # ---------------------------------------------------------------------------
82 # Base class
83
84 KEYLEN = {
85 "A128GCM": 128,
86 "A192GCM": 192,
87 "A256GCM": 256,
88 "A128CBC-HS256": 256,
89 "A192CBC-HS384": 384,
90 "A256CBC-HS512": 512
91 }
92
93
94 class Encrypter(object):
95 """Abstract base class for encryption algorithms."""
96
97 def __init__(self, with_digest=False):
98 self.with_digest = with_digest
99
100 def encrypt(self, msg, key):
101 """Encrypt ``msg`` with ``key`` and return the encrypted message."""
102 raise NotImplementedError
103
104 def decrypt(self, msg, key):
105 """Return decrypted message."""
106 raise NotImplementedError
107
108
109 class RSAEncrypter(Encrypter):
110 def encrypt(self, msg, key, padding="pkcs1_padding"):
111 if padding == "pkcs1_padding":
112 cipher = PKCS1_v1_5.new(key)
113 if self.with_digest: # add a SHA digest to the message
114 h = SHA.new(msg)
115 msg += h.digest()
116 elif padding == "pkcs1_oaep_padding":
117 cipher = PKCS1_OAEP.new(key)
118 else:
119 raise Exception("Unsupported padding")
120 return cipher.encrypt(msg)
121
122 def decrypt(self, ciphertext, key, padding="pkcs1_padding"):
123 if padding == "pkcs1_padding":
124 cipher = PKCS1_v1_5.new(key)
125 if self.with_digest:
126 dsize = SHA.digest_size
127 else:
128 dsize = 0
129 sentinel = Random.new().read(32 + dsize)
130 text = cipher.decrypt(ciphertext, sentinel)
131 if dsize:
132 _digest = text[-dsize:]
133 _msg = text[:-dsize]
134 digest = SHA.new(_msg).digest()
135 if digest == _digest:
136 text = _msg
137 else:
138 raise DecryptionFailed()
139 else:
140 if text == sentinel:
141 raise DecryptionFailed()
142 elif padding == "pkcs1_oaep_padding":
143 cipher = PKCS1_OAEP.new(key)
144 text = cipher.decrypt(ciphertext)
145 else:
146 raise Exception("Unsupported padding")
147
148 return text
149
150
151 # ---------------------------------------------------------------------------
152
153
154 def int2bigendian(n):
155 return [ord(c) for c in struct.pack('>I', n)]
156
157
158 def party_value(pv):
159 if pv:
160 s = b64e(pv)
161 r = int2bigendian(len(s))
162 r.extend(s)
163 return r
164 else:
165 return [0, 0, 0, 0]
166
167
168 def _hash_input(cmk, enc, label, rond=1, length=128, hashsize=256,
169 epu="", epv=""):
170 r = [0, 0, 0, rond]
171 r.extend(cmk)
172 r.extend([0, 0, 0, length])
173 r.extend([ord(c) for c in enc])
174 r.extend(party_value(epu))
175 r.extend(party_value(epv))
176 r.extend(label)
177 return r
178
179
180 # ---------------------------------------------------------------------------
181
182 def cipher_filter(cipher, inf, outf):
183 while 1:
184 buf = inf.read()
185 if not buf:
186 break
187 outf.write(cipher.update(buf))
188 outf.write(cipher.final())
189 return outf.getvalue()
190
191
192 def aes_enc(key, txt):
193 pbuf = io.StringIO(txt)
194 cbuf = io.StringIO()
195 ciphertext = cipher_filter(key, pbuf, cbuf)
196 pbuf.close()
197 cbuf.close()
198 return ciphertext
199
200
201 def aes_dec(key, ciptxt):
202 pbuf = io.StringIO()
203 cbuf = io.StringIO(ciptxt)
204 plaintext = cipher_filter(key, cbuf, pbuf)
205 pbuf.close()
206 cbuf.close()
207 return plaintext
208
209
210 def keysize(spec):
211 if spec.startswith("HS"):
212 return int(spec[2:])
213 elif spec.startswith("CS"):
214 return int(spec[2:])
215 elif spec.startswith("A"):
216 return int(spec[1:4])
217 return 0
218
219
220 ENC2ALG = {"A128CBC": "aes_128_cbc", "A192CBC": "aes_192_cbc",
221 "A256CBC": "aes_256_cbc"}
222
223 SUPPORTED = {
224 "alg": ["RSA1_5", "RSA-OAEP", "A128KW", "A192KW", "A256KW",
225 "ECDH-ES", "ECDH-ES+A128KW", "ECDH-ES+A192KW", "ECDH-ES+A256KW"],
226 "enc": ["A128CBC-HS256", "A192CBC-HS384", "A256CBC-HS512",
227 # "A128GCM", "A192GCM",
228 "A256GCM"],
229 }
230
231
232 def alg2keytype(alg):
233 if alg.startswith("RSA"):
234 return "RSA"
235 elif alg.startswith("A"):
236 return "oct"
237 elif alg.startswith("ECDH"):
238 return "EC"
239 else:
240 return None
241
242
243 # =============================================================================
244
245 ENCALGLEN1 = {
246 "A128GCM": 16,
247 "A192GCM": 24,
248 "A256GCM": 32
249 }
250
251 ENCALGLEN2 = {
252 "A128CBC-HS256": 32,
253 "A192CBC-HS384": 48,
254 "A256CBC-HS512": 64,
255 }
256
257
258 class JWEnc(JWT):
259 def b64_protected_header(self):
260 return self.b64part[0]
261
262 def b64_encrypted_key(self):
263 return self.b64part[1]
264
265 def b64_initialization_vector(self):
266 return self.b64part[2]
267
268 def b64_ciphertext(self):
269 return self.b64part[3]
270
271 def b64_authentication_tag(self):
272 return self.b64part[4]
273
274 def protected_header(self):
275 return self.part[0]
276
277 def encrypted_key(self):
278 return self.part[1]
279
280 def initialization_vector(self):
281 return self.part[2]
282
283 def ciphertext(self):
284 return self.part[3]
285
286 def authentication_tag(self):
287 return self.part[4]
288
289 def b64_encode_header(self):
290 return b64encode_item(self.headers)
291
292 def is_jwe(self):
293 if "typ" in self.headers and self.headers["typ"].lower() == "jwe":
294 return True
295
296 try:
297 assert "alg" in self.headers and "enc" in self.headers
298 except AssertionError:
299 return False
300 else:
301 for typ in ["alg", "enc"]:
302 try:
303 assert self.headers[typ] in SUPPORTED[typ]
304 except AssertionError:
305 logger.debug("Not supported %s algorithm: %s" % (
306 typ, self.headers[typ]))
307 return False
308 return True
309
310
311 class JWe(JWx):
312 @staticmethod
313 def _generate_key_and_iv(encalg, cek="", iv=""):
314 if cek and iv:
315 return cek, iv
316
317 try:
318 _key = Random.get_random_bytes(ENCALGLEN1[encalg])
319 _iv = Random.get_random_bytes(12)
320 except KeyError:
321 try:
322 _key = Random.get_random_bytes(ENCALGLEN2[encalg])
323 _iv = Random.get_random_bytes(16)
324 except KeyError:
325 raise Exception("Unsupported encryption algorithm %s" % encalg)
326 if cek:
327 _key = cek
328 if iv:
329 _iv = iv
330
331 return _key, _iv
332
333 def alg2keytype(self, alg):
334 return alg2keytype(alg)
335
336 def enc_setup(self, enc_alg, msg, auth_data, key=None, iv=""):
337 """ Encrypt JWE content.
338
339 :param enc_alg: The JWE "enc" value specifying the encryption algorithm
340 :param msg: The plain text message
341 :param auth_data: Additional authenticated data
342 :param key: Key (CEK)
343 :return: Tuple (ciphertext, tag), both as bytes
344 """
345
346 key, iv = self._generate_key_and_iv(enc_alg, key, iv)
347
348 if enc_alg == "A256GCM":
349 gcm = AES_GCM(bytes_to_long(key))
350 ctxt, tag = gcm.encrypt(bytes_to_long(iv), msg, auth_data)
351 tag = long_to_bytes(tag)
352 elif enc_alg in ["A128CBC-HS256", "A192CBC-HS384", "A256CBC-HS512"]:
353 assert enc_alg in SUPPORTED["enc"]
354 ctxt, tag = aes_cbc_hmac_encrypt(key, iv, auth_data, msg)
355 else:
356 raise NotSupportedAlgorithm(enc_alg)
357
358 return ctxt, tag, key
359
360 @staticmethod
361 def _decrypt(enc, key, ctxt, auth_data, iv, tag):
362 """ Decrypt JWE content.
363
364 :param enc: The JWE "enc" value specifying the encryption algorithm
365 :param key: Key (CEK)
366 :param iv : Initialization vector
367 :param auth_data: Additional authenticated data (AAD)
368 :param ctxt : Ciphertext
369 :param tag: Authentication tag
370 :return: plain text message or None if decryption failed
371 """
372 if enc in ["A128GCM", "A192GCM", "A256GCM"]:
373 gcm = AES_GCM(bytes_to_long(key))
374 try:
375 text = gcm.decrypt(bytes_to_long(iv), ctxt, bytes_to_long(tag),
376 auth_data)
377 return text, True
378 except DecryptionFailed:
379 return None, False
380 elif enc in ["A128CBC-HS256", "A192CBC-HS384", "A256CBC-HS512"]:
381 return aes_cbc_hmac_decrypt(key, iv, auth_data, ctxt, tag)
382 else:
383 raise Exception("Unsupported encryption algorithm %s" % enc)
384
385
386 class JWE_SYM(JWe):
387 args = JWe.args[:]
388 args.append("enc")
389
390 def encrypt(self, key, iv="", cek="", **kwargs):
391 """
392
393 :param key: Shared symmetric key
394 :param iv: initialization vector
395 :param cek:
396 :param kwargs: Extra keyword arguments, just ignore for now.
397 :return:
398 """
399 _msg = self.msg
400
401 _args = self._dict
402 try:
403 _args["kid"] = kwargs["kid"]
404 except KeyError:
405 pass
406
407 jwe = JWEnc(**_args)
408
409 # If no iv and cek are given generate them
410 cek, iv = self._generate_key_and_iv(self["enc"], cek, iv)
411 if isinstance(key, six.binary_type):
412 kek = key
413 else:
414 kek = intarr2str(key)
415
416 # The iv for this function must be 64 bit
417 # Which is certainly different from the one above
418 jek = aes_wrap_key(kek, cek)
419
420 _enc = self["enc"]
421
422 ctxt, tag, cek = self.enc_setup(_enc, _msg.encode(),
423 jwe.b64_encode_header(),
424 cek, iv=iv)
425 return jwe.pack(parts=[jek, iv, ctxt, tag])
426
427 def decrypt(self, token, key=None, cek=None):
428 if not key and not cek:
429 raise MissingKey("On of key or cek must be specified")
430
431 jwe = JWEnc().unpack(token)
432
433 if not cek:
434 jek = jwe.encrypted_key()
435 # The iv for this function must be 64 bit
436 cek = aes_unwrap_key(key, jek)
437
438 msg = self._decrypt(
439 jwe.headers["enc"], cek, jwe.ciphertext(),
440 jwe.b64_protected_header(),
441 jwe.initialization_vector(), jwe.authentication_tag())
442
443 if "zip" in self and self["zip"] == "DEF":
444 msg = zlib.decompress(msg)
445
446 return msg
447
448
449 class JWE_RSA(JWe):
450 args = ["msg", "alg", "enc", "epk", "zip", "jku", "jwk", "x5u", "x5t",
451 "x5c", "kid", "typ", "cty", "apu", "crit"]
452
453 def encrypt(self, key, iv="", cek="", **kwargs):
454 """
455 Produces a JWE using RSA algorithms
456
457 :param key: RSA key
458 :param context:
459 :param iv:
460 :param cek:
461 :return: A jwe
462 """
463
464 _msg = as_bytes(self.msg)
465 if "zip" in self:
466 if self["zip"] == "DEF":
467 _msg = zlib.compress(_msg)
468 else:
469 raise ParameterError("Zip has unknown value: %s" % self["zip"])
470
471 _enc = self["enc"]
472 cek, iv = self._generate_key_and_iv(_enc, cek, iv)
473
474 logger.debug("cek: %s, iv: %s" % ([c for c in cek], [c for c in iv]))
475
476 _encrypt = RSAEncrypter(self.with_digest).encrypt
477
478 _alg = self["alg"]
479 if _alg == "RSA-OAEP":
480 jwe_enc_key = _encrypt(cek, key, 'pkcs1_oaep_padding')
481 elif _alg == "RSA1_5":
482 jwe_enc_key = _encrypt(cek, key)
483 else:
484 raise NotSupportedAlgorithm(_alg)
485
486 jwe = JWEnc(**self.headers())
487
488 enc_header = jwe.b64_encode_header()
489
490 ctxt, tag, key = self.enc_setup(_enc, _msg, enc_header, cek, iv)
491 return jwe.pack(parts=[jwe_enc_key, iv, ctxt, tag])
492
493 def decrypt(self, token, key):
494 """ Decrypts a JWT
495
496 :param token: The JWT
497 :param key: A key to use for decrypting
498 :return: The decrypted message
499 """
500 jwe = JWEnc().unpack(token)
501 self.jwt = jwe.encrypted_key()
502 jek = jwe.encrypted_key()
503
504 _decrypt = RSAEncrypter(self.with_digest).decrypt
505
506 _alg = jwe.headers["alg"]
507 if _alg == "RSA-OAEP":
508 cek = _decrypt(jek, key, 'pkcs1_oaep_padding')
509 elif _alg == "RSA1_5":
510 cek = _decrypt(jek, key)
511 else:
512 raise NotSupportedAlgorithm(_alg)
513
514 enc = jwe.headers["enc"]
515 try:
516 assert enc in SUPPORTED["enc"]
517 except AssertionError:
518 raise NotSupportedAlgorithm(enc)
519
520 msg, flag = self._decrypt(enc, cek, jwe.ciphertext(),
521 jwe.b64_protected_header(),
522 jwe.initialization_vector(),
523 jwe.authentication_tag())
524 if flag is False:
525 raise DecryptionFailed()
526
527 if "zip" in jwe.headers and jwe.headers["zip"] == "DEF":
528 msg = zlib.decompress(msg)
529
530 return msg
531
532
533 class JWE_EC(JWe):
534 def enc_setup(self, msg, auth_data, key=None, **kwargs):
535
536 encrypted_key = ""
537 # Generate the input parameters
538 try:
539 apu = b64d(kwargs["apu"])
540 except KeyError:
541 apu = b64d(Random.get_random_bytes(16))
542 try:
543 apv = b64d(kwargs["apv"])
544 except KeyError:
545 apv = b64d(Random.get_random_bytes(16))
546
547 # Generate an ephemeral key pair
548 curve = NISTEllipticCurve.by_name(key.crv)
549 if "epk" in kwargs:
550 eprivk = ECKey(kwargs["epk"])
551 else:
552 (eprivk, epk) = curve.key_pair()
553 params = {
554 "apu": b64e(apu),
555 "apv": b64e(apv),
556 }
557
558 cek, iv = self._generate_key_and_iv(self.enc)
559 if self.alg == "ECDH-ES":
560 try:
561 dk_len = KEYLEN[self.enc]
562 except KeyError:
563 raise Exception(
564 "Unknown key length for algorithm %s" % self.enc)
565
566 cek = ecdh_derive_key(curve, eprivk, key, apu, apv, self.enc,
567 dk_len)
568 elif self.alg in ["ECDH-ES+A128KW", "ECDH-ES+A192KW", "ECDH-ES+A256KW"]:
569 _pre, _post = self.alg.split("+")
570 klen = int(_post[1:4])
571 kek = ecdh_derive_key(curve, eprivk, key, apu, apv, _post, klen)
572 encrypted_key = aes_wrap_key(kek, cek)
573 else:
574 raise Exception("Unsupported algorithm %s" % self.alg)
575
576 return cek, encrypted_key, iv, params
577
578
579 class JWE(JWx):
580 args = ["alg", "enc", "epk", "zip", "jku", "jwk", "x5u", "x5t",
581 "x5c", "kid", "typ", "cty", "apu", "crit"]
582
583 """
584 :param msg: The message
585 :param alg: Algorithm
586 :param enc: Encryption Method
587 :param epk: Ephemeral Public Key
588 :param zip: Compression Algorithm
589 :param jku: a URI that refers to a resource for a set of JSON-encoded
590 public keys, one of which corresponds to the key used to digitally
591 sign the JWS
592 :param jwk: A JSON Web Key that corresponds to the key used to
593 digitally sign the JWS
594 :param x5u: a URI that refers to a resource for the X.509 public key
595 certificate or certificate chain [RFC5280] corresponding to the key
596 used to digitally sign the JWS.
597 :param x5t: a base64url encoded SHA-1 thumbprint (a.k.a. digest) of the
598 DER encoding of the X.509 certificate [RFC5280] corresponding to
599 the key used to digitally sign the JWS.
600 :param x5c: the X.509 public key certificate or certificate chain
601 corresponding to the key used to digitally sign the JWS.
602 :param kid: Key ID a hint indicating which key was used to secure the
603 JWS.
604 :param typ: the type of this object. 'JWS' == JWS Compact Serialization
605 'JWS+JSON' == JWS JSON Serialization
606 :param cty: Content Type
607 :param apu: Agreement PartyUInfo
608 :param crit: indicates which extensions that are being used and MUST
609 be understood and processed.
610 :return: A class instance
611 """
612
613 def encrypt(self, keys=None, cek="", iv="", **kwargs):
614 """
615
616 :param keys: A set of possibly usable keys
617 :param context: If the other party's public or my private key should be
618 used for encryption
619 :param cek: Content master key
620 :param iv: Initialization vector
621 :param kwargs: Extra key word arguments
622 :return: Encrypted message
623 """
624 _alg = self["alg"]
625 if _alg.startswith("RSA") and _alg in ["RSA-OAEP", "RSA1_5"]:
626 encrypter = JWE_RSA(self.msg, **self._dict)
627 elif _alg.startswith("A") and _alg.endswith("KW"):
628 encrypter = JWE_SYM(self.msg, **self._dict)
629 else:
630 logger.error("'{}' is not a supported algorithm".format(_alg))
631 raise NotSupportedAlgorithm
632
633 if keys:
634 keys = self._pick_keys(keys, use="enc")
635 else:
636 keys = self._pick_keys(self._get_keys(), use="enc")
637
638 if not keys:
639 logger.error(
640 "Could not find any suitable encryption key for alg='{"
641 "}'".format(_alg))
642 raise NoSuitableEncryptionKey(_alg)
643
644 if cek:
645 kwargs["cek"] = cek
646 if iv:
647 kwargs["iv"] = iv
648
649 for key in keys:
650 _key = key.encryption_key(alg=_alg, private=True)
651
652 if key.kid:
653 encrypter["kid"] = key.kid
654
655 try:
656 token = encrypter.encrypt(_key, **kwargs)
657 except TypeError as err:
658 raise err
659 else:
660 logger.debug(
661 "Encrypted message using key with kid={}".format(key.kid))
662 return token
663
664 logger.error("Could not find any suitable encryption key")
665 raise NoSuitableEncryptionKey()
666
667 def decrypt(self, token, keys=None, alg=None):
668 jwe = JWEnc().unpack(token)
669 # header, ek, eiv, ctxt, tag = token.split(b".")
670 # self.parse_header(header)
671
672 _alg = jwe.headers["alg"]
673 if alg and alg != _alg:
674 raise WrongEncryptionAlgorithm()
675
676 if _alg in ["RSA-OAEP", "RSA1_5"]:
677 decrypter = JWE_RSA(**self._dict)
678 elif _alg.startswith("A") and _alg.endswith("KW"):
679 decrypter = JWE_SYM(self.msg, **self._dict)
680 else:
681 raise NotSupportedAlgorithm
682
683 if keys:
684 keys = self._pick_keys(keys, use="enc", alg=_alg)
685 else:
686 keys = self._pick_keys(self._get_keys(), use="enc", alg=_alg)
687
688 if not keys:
689 raise NoSuitableDecryptionKey(_alg)
690
691 for key in keys:
692 _key = key.encryption_key(alg=_alg, private=False)
693 try:
694 msg = decrypter.decrypt(as_bytes(token), _key)
695 except (KeyError, DecryptionFailed):
696 pass
697 else:
698 logger.debug(
699 "Decrypted message using key with kid=%s" % key.kid)
700 return msg
701
702 raise DecryptionFailed(
703 "No available key that could decrypt the message")
704
705
706 def factory(token):
707 _jwt = JWEnc().unpack(token)
708 if _jwt.is_jwe():
709 _jwe = JWE()
710 _jwe.jwt = _jwt
711 return _jwe
712 else:
713 return None
OLDNEW
« no previous file with comments | « third_party/google-endpoints/jwkest/extra.py ('k') | third_party/google-endpoints/jwkest/jwk.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698