Index: third_party/google-endpoints/jwkest/jwe.py |
diff --git a/third_party/google-endpoints/jwkest/jwe.py b/third_party/google-endpoints/jwkest/jwe.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..3d6425612e9e724230938aa17d7d2630ca43766c |
--- /dev/null |
+++ b/third_party/google-endpoints/jwkest/jwe.py |
@@ -0,0 +1,713 @@ |
+# from future import standard_library |
+# standard_library.install_aliases() |
+try: |
+ from builtins import object |
+except ImportError: |
+ pass |
+ |
+import struct |
+import io |
+import logging |
+import zlib |
+import six |
+ |
+from Crypto import Random |
+from Crypto.Hash import SHA |
+from Crypto.Util.number import bytes_to_long |
+from Crypto.Util.number import long_to_bytes |
+from Crypto.Cipher import PKCS1_v1_5 |
+from Crypto.Cipher import PKCS1_OAEP |
+ |
+from jwkest import b64d, as_bytes |
+from jwkest import b64e |
+from jwkest import JWKESTException |
+from jwkest import MissingKey |
+from jwkest.aes_gcm import AES_GCM |
+from jwkest.aes_key_wrap import aes_wrap_key |
+from jwkest.aes_key_wrap import aes_unwrap_key |
+from jwkest.ecc import NISTEllipticCurve |
+from jwkest.extra import aes_cbc_hmac_encrypt |
+from jwkest.extra import ecdh_derive_key |
+from jwkest.extra import aes_cbc_hmac_decrypt |
+from jwkest.jwk import intarr2str |
+from jwkest.jwk import ECKey |
+from jwkest.jws import JWx |
+from jwkest.jwt import JWT, b64encode_item |
+ |
+logger = logging.getLogger(__name__) |
+ |
+__author__ = 'rohe0002' |
+ |
+ENC = 1 |
+DEC = 0 |
+ |
+ |
+class JWEException(JWKESTException): |
+ pass |
+ |
+ |
+class CannotDecode(JWEException): |
+ pass |
+ |
+ |
+class NotSupportedAlgorithm(JWEException): |
+ pass |
+ |
+ |
+class MethodNotSupported(JWEException): |
+ pass |
+ |
+ |
+class ParameterError(JWEException): |
+ pass |
+ |
+ |
+class NoSuitableEncryptionKey(JWEException): |
+ pass |
+ |
+ |
+class NoSuitableDecryptionKey(JWEException): |
+ pass |
+ |
+ |
+class DecryptionFailed(JWEException): |
+ pass |
+ |
+ |
+class WrongEncryptionAlgorithm(JWEException): |
+ pass |
+ |
+ |
+# --------------------------------------------------------------------------- |
+# Base class |
+ |
+KEYLEN = { |
+ "A128GCM": 128, |
+ "A192GCM": 192, |
+ "A256GCM": 256, |
+ "A128CBC-HS256": 256, |
+ "A192CBC-HS384": 384, |
+ "A256CBC-HS512": 512 |
+} |
+ |
+ |
+class Encrypter(object): |
+ """Abstract base class for encryption algorithms.""" |
+ |
+ def __init__(self, with_digest=False): |
+ self.with_digest = with_digest |
+ |
+ def encrypt(self, msg, key): |
+ """Encrypt ``msg`` with ``key`` and return the encrypted message.""" |
+ raise NotImplementedError |
+ |
+ def decrypt(self, msg, key): |
+ """Return decrypted message.""" |
+ raise NotImplementedError |
+ |
+ |
+class RSAEncrypter(Encrypter): |
+ def encrypt(self, msg, key, padding="pkcs1_padding"): |
+ if padding == "pkcs1_padding": |
+ cipher = PKCS1_v1_5.new(key) |
+ if self.with_digest: # add a SHA digest to the message |
+ h = SHA.new(msg) |
+ msg += h.digest() |
+ elif padding == "pkcs1_oaep_padding": |
+ cipher = PKCS1_OAEP.new(key) |
+ else: |
+ raise Exception("Unsupported padding") |
+ return cipher.encrypt(msg) |
+ |
+ def decrypt(self, ciphertext, key, padding="pkcs1_padding"): |
+ if padding == "pkcs1_padding": |
+ cipher = PKCS1_v1_5.new(key) |
+ if self.with_digest: |
+ dsize = SHA.digest_size |
+ else: |
+ dsize = 0 |
+ sentinel = Random.new().read(32 + dsize) |
+ text = cipher.decrypt(ciphertext, sentinel) |
+ if dsize: |
+ _digest = text[-dsize:] |
+ _msg = text[:-dsize] |
+ digest = SHA.new(_msg).digest() |
+ if digest == _digest: |
+ text = _msg |
+ else: |
+ raise DecryptionFailed() |
+ else: |
+ if text == sentinel: |
+ raise DecryptionFailed() |
+ elif padding == "pkcs1_oaep_padding": |
+ cipher = PKCS1_OAEP.new(key) |
+ text = cipher.decrypt(ciphertext) |
+ else: |
+ raise Exception("Unsupported padding") |
+ |
+ return text |
+ |
+ |
+# --------------------------------------------------------------------------- |
+ |
+ |
+def int2bigendian(n): |
+ return [ord(c) for c in struct.pack('>I', n)] |
+ |
+ |
+def party_value(pv): |
+ if pv: |
+ s = b64e(pv) |
+ r = int2bigendian(len(s)) |
+ r.extend(s) |
+ return r |
+ else: |
+ return [0, 0, 0, 0] |
+ |
+ |
+def _hash_input(cmk, enc, label, rond=1, length=128, hashsize=256, |
+ epu="", epv=""): |
+ r = [0, 0, 0, rond] |
+ r.extend(cmk) |
+ r.extend([0, 0, 0, length]) |
+ r.extend([ord(c) for c in enc]) |
+ r.extend(party_value(epu)) |
+ r.extend(party_value(epv)) |
+ r.extend(label) |
+ return r |
+ |
+ |
+# --------------------------------------------------------------------------- |
+ |
+def cipher_filter(cipher, inf, outf): |
+ while 1: |
+ buf = inf.read() |
+ if not buf: |
+ break |
+ outf.write(cipher.update(buf)) |
+ outf.write(cipher.final()) |
+ return outf.getvalue() |
+ |
+ |
+def aes_enc(key, txt): |
+ pbuf = io.StringIO(txt) |
+ cbuf = io.StringIO() |
+ ciphertext = cipher_filter(key, pbuf, cbuf) |
+ pbuf.close() |
+ cbuf.close() |
+ return ciphertext |
+ |
+ |
+def aes_dec(key, ciptxt): |
+ pbuf = io.StringIO() |
+ cbuf = io.StringIO(ciptxt) |
+ plaintext = cipher_filter(key, cbuf, pbuf) |
+ pbuf.close() |
+ cbuf.close() |
+ return plaintext |
+ |
+ |
+def keysize(spec): |
+ if spec.startswith("HS"): |
+ return int(spec[2:]) |
+ elif spec.startswith("CS"): |
+ return int(spec[2:]) |
+ elif spec.startswith("A"): |
+ return int(spec[1:4]) |
+ return 0 |
+ |
+ |
+ENC2ALG = {"A128CBC": "aes_128_cbc", "A192CBC": "aes_192_cbc", |
+ "A256CBC": "aes_256_cbc"} |
+ |
+SUPPORTED = { |
+ "alg": ["RSA1_5", "RSA-OAEP", "A128KW", "A192KW", "A256KW", |
+ "ECDH-ES", "ECDH-ES+A128KW", "ECDH-ES+A192KW", "ECDH-ES+A256KW"], |
+ "enc": ["A128CBC-HS256", "A192CBC-HS384", "A256CBC-HS512", |
+ # "A128GCM", "A192GCM", |
+ "A256GCM"], |
+} |
+ |
+ |
+def alg2keytype(alg): |
+ if alg.startswith("RSA"): |
+ return "RSA" |
+ elif alg.startswith("A"): |
+ return "oct" |
+ elif alg.startswith("ECDH"): |
+ return "EC" |
+ else: |
+ return None |
+ |
+ |
+# ============================================================================= |
+ |
+ENCALGLEN1 = { |
+ "A128GCM": 16, |
+ "A192GCM": 24, |
+ "A256GCM": 32 |
+} |
+ |
+ENCALGLEN2 = { |
+ "A128CBC-HS256": 32, |
+ "A192CBC-HS384": 48, |
+ "A256CBC-HS512": 64, |
+} |
+ |
+ |
+class JWEnc(JWT): |
+ def b64_protected_header(self): |
+ return self.b64part[0] |
+ |
+ def b64_encrypted_key(self): |
+ return self.b64part[1] |
+ |
+ def b64_initialization_vector(self): |
+ return self.b64part[2] |
+ |
+ def b64_ciphertext(self): |
+ return self.b64part[3] |
+ |
+ def b64_authentication_tag(self): |
+ return self.b64part[4] |
+ |
+ def protected_header(self): |
+ return self.part[0] |
+ |
+ def encrypted_key(self): |
+ return self.part[1] |
+ |
+ def initialization_vector(self): |
+ return self.part[2] |
+ |
+ def ciphertext(self): |
+ return self.part[3] |
+ |
+ def authentication_tag(self): |
+ return self.part[4] |
+ |
+ def b64_encode_header(self): |
+ return b64encode_item(self.headers) |
+ |
+ def is_jwe(self): |
+ if "typ" in self.headers and self.headers["typ"].lower() == "jwe": |
+ return True |
+ |
+ try: |
+ assert "alg" in self.headers and "enc" in self.headers |
+ except AssertionError: |
+ return False |
+ else: |
+ for typ in ["alg", "enc"]: |
+ try: |
+ assert self.headers[typ] in SUPPORTED[typ] |
+ except AssertionError: |
+ logger.debug("Not supported %s algorithm: %s" % ( |
+ typ, self.headers[typ])) |
+ return False |
+ return True |
+ |
+ |
+class JWe(JWx): |
+ @staticmethod |
+ def _generate_key_and_iv(encalg, cek="", iv=""): |
+ if cek and iv: |
+ return cek, iv |
+ |
+ try: |
+ _key = Random.get_random_bytes(ENCALGLEN1[encalg]) |
+ _iv = Random.get_random_bytes(12) |
+ except KeyError: |
+ try: |
+ _key = Random.get_random_bytes(ENCALGLEN2[encalg]) |
+ _iv = Random.get_random_bytes(16) |
+ except KeyError: |
+ raise Exception("Unsupported encryption algorithm %s" % encalg) |
+ if cek: |
+ _key = cek |
+ if iv: |
+ _iv = iv |
+ |
+ return _key, _iv |
+ |
+ def alg2keytype(self, alg): |
+ return alg2keytype(alg) |
+ |
+ def enc_setup(self, enc_alg, msg, auth_data, key=None, iv=""): |
+ """ Encrypt JWE content. |
+ |
+ :param enc_alg: The JWE "enc" value specifying the encryption algorithm |
+ :param msg: The plain text message |
+ :param auth_data: Additional authenticated data |
+ :param key: Key (CEK) |
+ :return: Tuple (ciphertext, tag), both as bytes |
+ """ |
+ |
+ key, iv = self._generate_key_and_iv(enc_alg, key, iv) |
+ |
+ if enc_alg == "A256GCM": |
+ gcm = AES_GCM(bytes_to_long(key)) |
+ ctxt, tag = gcm.encrypt(bytes_to_long(iv), msg, auth_data) |
+ tag = long_to_bytes(tag) |
+ elif enc_alg in ["A128CBC-HS256", "A192CBC-HS384", "A256CBC-HS512"]: |
+ assert enc_alg in SUPPORTED["enc"] |
+ ctxt, tag = aes_cbc_hmac_encrypt(key, iv, auth_data, msg) |
+ else: |
+ raise NotSupportedAlgorithm(enc_alg) |
+ |
+ return ctxt, tag, key |
+ |
+ @staticmethod |
+ def _decrypt(enc, key, ctxt, auth_data, iv, tag): |
+ """ Decrypt JWE content. |
+ |
+ :param enc: The JWE "enc" value specifying the encryption algorithm |
+ :param key: Key (CEK) |
+ :param iv : Initialization vector |
+ :param auth_data: Additional authenticated data (AAD) |
+ :param ctxt : Ciphertext |
+ :param tag: Authentication tag |
+ :return: plain text message or None if decryption failed |
+ """ |
+ if enc in ["A128GCM", "A192GCM", "A256GCM"]: |
+ gcm = AES_GCM(bytes_to_long(key)) |
+ try: |
+ text = gcm.decrypt(bytes_to_long(iv), ctxt, bytes_to_long(tag), |
+ auth_data) |
+ return text, True |
+ except DecryptionFailed: |
+ return None, False |
+ elif enc in ["A128CBC-HS256", "A192CBC-HS384", "A256CBC-HS512"]: |
+ return aes_cbc_hmac_decrypt(key, iv, auth_data, ctxt, tag) |
+ else: |
+ raise Exception("Unsupported encryption algorithm %s" % enc) |
+ |
+ |
+class JWE_SYM(JWe): |
+ args = JWe.args[:] |
+ args.append("enc") |
+ |
+ def encrypt(self, key, iv="", cek="", **kwargs): |
+ """ |
+ |
+ :param key: Shared symmetric key |
+ :param iv: initialization vector |
+ :param cek: |
+ :param kwargs: Extra keyword arguments, just ignore for now. |
+ :return: |
+ """ |
+ _msg = self.msg |
+ |
+ _args = self._dict |
+ try: |
+ _args["kid"] = kwargs["kid"] |
+ except KeyError: |
+ pass |
+ |
+ jwe = JWEnc(**_args) |
+ |
+ # If no iv and cek are given generate them |
+ cek, iv = self._generate_key_and_iv(self["enc"], cek, iv) |
+ if isinstance(key, six.binary_type): |
+ kek = key |
+ else: |
+ kek = intarr2str(key) |
+ |
+ # The iv for this function must be 64 bit |
+ # Which is certainly different from the one above |
+ jek = aes_wrap_key(kek, cek) |
+ |
+ _enc = self["enc"] |
+ |
+ ctxt, tag, cek = self.enc_setup(_enc, _msg.encode(), |
+ jwe.b64_encode_header(), |
+ cek, iv=iv) |
+ return jwe.pack(parts=[jek, iv, ctxt, tag]) |
+ |
+ def decrypt(self, token, key=None, cek=None): |
+ if not key and not cek: |
+ raise MissingKey("On of key or cek must be specified") |
+ |
+ jwe = JWEnc().unpack(token) |
+ |
+ if not cek: |
+ jek = jwe.encrypted_key() |
+ # The iv for this function must be 64 bit |
+ cek = aes_unwrap_key(key, jek) |
+ |
+ msg = self._decrypt( |
+ jwe.headers["enc"], cek, jwe.ciphertext(), |
+ jwe.b64_protected_header(), |
+ jwe.initialization_vector(), jwe.authentication_tag()) |
+ |
+ if "zip" in self and self["zip"] == "DEF": |
+ msg = zlib.decompress(msg) |
+ |
+ return msg |
+ |
+ |
+class JWE_RSA(JWe): |
+ args = ["msg", "alg", "enc", "epk", "zip", "jku", "jwk", "x5u", "x5t", |
+ "x5c", "kid", "typ", "cty", "apu", "crit"] |
+ |
+ def encrypt(self, key, iv="", cek="", **kwargs): |
+ """ |
+ Produces a JWE using RSA algorithms |
+ |
+ :param key: RSA key |
+ :param context: |
+ :param iv: |
+ :param cek: |
+ :return: A jwe |
+ """ |
+ |
+ _msg = as_bytes(self.msg) |
+ if "zip" in self: |
+ if self["zip"] == "DEF": |
+ _msg = zlib.compress(_msg) |
+ else: |
+ raise ParameterError("Zip has unknown value: %s" % self["zip"]) |
+ |
+ _enc = self["enc"] |
+ cek, iv = self._generate_key_and_iv(_enc, cek, iv) |
+ |
+ logger.debug("cek: %s, iv: %s" % ([c for c in cek], [c for c in iv])) |
+ |
+ _encrypt = RSAEncrypter(self.with_digest).encrypt |
+ |
+ _alg = self["alg"] |
+ if _alg == "RSA-OAEP": |
+ jwe_enc_key = _encrypt(cek, key, 'pkcs1_oaep_padding') |
+ elif _alg == "RSA1_5": |
+ jwe_enc_key = _encrypt(cek, key) |
+ else: |
+ raise NotSupportedAlgorithm(_alg) |
+ |
+ jwe = JWEnc(**self.headers()) |
+ |
+ enc_header = jwe.b64_encode_header() |
+ |
+ ctxt, tag, key = self.enc_setup(_enc, _msg, enc_header, cek, iv) |
+ return jwe.pack(parts=[jwe_enc_key, iv, ctxt, tag]) |
+ |
+ def decrypt(self, token, key): |
+ """ Decrypts a JWT |
+ |
+ :param token: The JWT |
+ :param key: A key to use for decrypting |
+ :return: The decrypted message |
+ """ |
+ jwe = JWEnc().unpack(token) |
+ self.jwt = jwe.encrypted_key() |
+ jek = jwe.encrypted_key() |
+ |
+ _decrypt = RSAEncrypter(self.with_digest).decrypt |
+ |
+ _alg = jwe.headers["alg"] |
+ if _alg == "RSA-OAEP": |
+ cek = _decrypt(jek, key, 'pkcs1_oaep_padding') |
+ elif _alg == "RSA1_5": |
+ cek = _decrypt(jek, key) |
+ else: |
+ raise NotSupportedAlgorithm(_alg) |
+ |
+ enc = jwe.headers["enc"] |
+ try: |
+ assert enc in SUPPORTED["enc"] |
+ except AssertionError: |
+ raise NotSupportedAlgorithm(enc) |
+ |
+ msg, flag = self._decrypt(enc, cek, jwe.ciphertext(), |
+ jwe.b64_protected_header(), |
+ jwe.initialization_vector(), |
+ jwe.authentication_tag()) |
+ if flag is False: |
+ raise DecryptionFailed() |
+ |
+ if "zip" in jwe.headers and jwe.headers["zip"] == "DEF": |
+ msg = zlib.decompress(msg) |
+ |
+ return msg |
+ |
+ |
+class JWE_EC(JWe): |
+ def enc_setup(self, msg, auth_data, key=None, **kwargs): |
+ |
+ encrypted_key = "" |
+ # Generate the input parameters |
+ try: |
+ apu = b64d(kwargs["apu"]) |
+ except KeyError: |
+ apu = b64d(Random.get_random_bytes(16)) |
+ try: |
+ apv = b64d(kwargs["apv"]) |
+ except KeyError: |
+ apv = b64d(Random.get_random_bytes(16)) |
+ |
+ # Generate an ephemeral key pair |
+ curve = NISTEllipticCurve.by_name(key.crv) |
+ if "epk" in kwargs: |
+ eprivk = ECKey(kwargs["epk"]) |
+ else: |
+ (eprivk, epk) = curve.key_pair() |
+ params = { |
+ "apu": b64e(apu), |
+ "apv": b64e(apv), |
+ } |
+ |
+ cek, iv = self._generate_key_and_iv(self.enc) |
+ if self.alg == "ECDH-ES": |
+ try: |
+ dk_len = KEYLEN[self.enc] |
+ except KeyError: |
+ raise Exception( |
+ "Unknown key length for algorithm %s" % self.enc) |
+ |
+ cek = ecdh_derive_key(curve, eprivk, key, apu, apv, self.enc, |
+ dk_len) |
+ elif self.alg in ["ECDH-ES+A128KW", "ECDH-ES+A192KW", "ECDH-ES+A256KW"]: |
+ _pre, _post = self.alg.split("+") |
+ klen = int(_post[1:4]) |
+ kek = ecdh_derive_key(curve, eprivk, key, apu, apv, _post, klen) |
+ encrypted_key = aes_wrap_key(kek, cek) |
+ else: |
+ raise Exception("Unsupported algorithm %s" % self.alg) |
+ |
+ return cek, encrypted_key, iv, params |
+ |
+ |
+class JWE(JWx): |
+ args = ["alg", "enc", "epk", "zip", "jku", "jwk", "x5u", "x5t", |
+ "x5c", "kid", "typ", "cty", "apu", "crit"] |
+ |
+ """ |
+ :param msg: The message |
+ :param alg: Algorithm |
+ :param enc: Encryption Method |
+ :param epk: Ephemeral Public Key |
+ :param zip: Compression Algorithm |
+ :param jku: a URI that refers to a resource for a set of JSON-encoded |
+ public keys, one of which corresponds to the key used to digitally |
+ sign the JWS |
+ :param jwk: A JSON Web Key that corresponds to the key used to |
+ digitally sign the JWS |
+ :param x5u: a URI that refers to a resource for the X.509 public key |
+ certificate or certificate chain [RFC5280] corresponding to the key |
+ used to digitally sign the JWS. |
+ :param x5t: a base64url encoded SHA-1 thumbprint (a.k.a. digest) of the |
+ DER encoding of the X.509 certificate [RFC5280] corresponding to |
+ the key used to digitally sign the JWS. |
+ :param x5c: the X.509 public key certificate or certificate chain |
+ corresponding to the key used to digitally sign the JWS. |
+ :param kid: Key ID a hint indicating which key was used to secure the |
+ JWS. |
+ :param typ: the type of this object. 'JWS' == JWS Compact Serialization |
+ 'JWS+JSON' == JWS JSON Serialization |
+ :param cty: Content Type |
+ :param apu: Agreement PartyUInfo |
+ :param crit: indicates which extensions that are being used and MUST |
+ be understood and processed. |
+ :return: A class instance |
+ """ |
+ |
+ def encrypt(self, keys=None, cek="", iv="", **kwargs): |
+ """ |
+ |
+ :param keys: A set of possibly usable keys |
+ :param context: If the other party's public or my private key should be |
+ used for encryption |
+ :param cek: Content master key |
+ :param iv: Initialization vector |
+ :param kwargs: Extra key word arguments |
+ :return: Encrypted message |
+ """ |
+ _alg = self["alg"] |
+ if _alg.startswith("RSA") and _alg in ["RSA-OAEP", "RSA1_5"]: |
+ encrypter = JWE_RSA(self.msg, **self._dict) |
+ elif _alg.startswith("A") and _alg.endswith("KW"): |
+ encrypter = JWE_SYM(self.msg, **self._dict) |
+ else: |
+ logger.error("'{}' is not a supported algorithm".format(_alg)) |
+ raise NotSupportedAlgorithm |
+ |
+ if keys: |
+ keys = self._pick_keys(keys, use="enc") |
+ else: |
+ keys = self._pick_keys(self._get_keys(), use="enc") |
+ |
+ if not keys: |
+ logger.error( |
+ "Could not find any suitable encryption key for alg='{" |
+ "}'".format(_alg)) |
+ raise NoSuitableEncryptionKey(_alg) |
+ |
+ if cek: |
+ kwargs["cek"] = cek |
+ if iv: |
+ kwargs["iv"] = iv |
+ |
+ for key in keys: |
+ _key = key.encryption_key(alg=_alg, private=True) |
+ |
+ if key.kid: |
+ encrypter["kid"] = key.kid |
+ |
+ try: |
+ token = encrypter.encrypt(_key, **kwargs) |
+ except TypeError as err: |
+ raise err |
+ else: |
+ logger.debug( |
+ "Encrypted message using key with kid={}".format(key.kid)) |
+ return token |
+ |
+ logger.error("Could not find any suitable encryption key") |
+ raise NoSuitableEncryptionKey() |
+ |
+ def decrypt(self, token, keys=None, alg=None): |
+ jwe = JWEnc().unpack(token) |
+ # header, ek, eiv, ctxt, tag = token.split(b".") |
+ # self.parse_header(header) |
+ |
+ _alg = jwe.headers["alg"] |
+ if alg and alg != _alg: |
+ raise WrongEncryptionAlgorithm() |
+ |
+ if _alg in ["RSA-OAEP", "RSA1_5"]: |
+ decrypter = JWE_RSA(**self._dict) |
+ elif _alg.startswith("A") and _alg.endswith("KW"): |
+ decrypter = JWE_SYM(self.msg, **self._dict) |
+ else: |
+ raise NotSupportedAlgorithm |
+ |
+ if keys: |
+ keys = self._pick_keys(keys, use="enc", alg=_alg) |
+ else: |
+ keys = self._pick_keys(self._get_keys(), use="enc", alg=_alg) |
+ |
+ if not keys: |
+ raise NoSuitableDecryptionKey(_alg) |
+ |
+ for key in keys: |
+ _key = key.encryption_key(alg=_alg, private=False) |
+ try: |
+ msg = decrypter.decrypt(as_bytes(token), _key) |
+ except (KeyError, DecryptionFailed): |
+ pass |
+ else: |
+ logger.debug( |
+ "Decrypted message using key with kid=%s" % key.kid) |
+ return msg |
+ |
+ raise DecryptionFailed( |
+ "No available key that could decrypt the message") |
+ |
+ |
+def factory(token): |
+ _jwt = JWEnc().unpack(token) |
+ if _jwt.is_jwe(): |
+ _jwe = JWE() |
+ _jwe.jwt = _jwt |
+ return _jwe |
+ else: |
+ return None |