Index: third_party/google-endpoints/jwkest/jwt.py |
diff --git a/third_party/google-endpoints/jwkest/jwt.py b/third_party/google-endpoints/jwkest/jwt.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..7a54ac05d93e62292919136c0f9e437ed09e282d |
--- /dev/null |
+++ b/third_party/google-endpoints/jwkest/jwt.py |
@@ -0,0 +1,97 @@ |
+import json |
+import six |
+from jwkest import b64d, as_unicode |
+from jwkest import b64e |
+from jwkest import BadSyntax |
+ |
+__author__ = 'roland' |
+ |
+ |
+def split_token(token): |
+ if not token.count(b"."): |
+ raise BadSyntax(token, |
+ "expected token to contain at least one dot") |
+ return tuple(token.split(b".")) |
+ |
+ |
+def b2s_conv(item): |
+ if isinstance(item, bytes): |
+ return item.decode("utf-8") |
+ elif isinstance(item, (six.string_types, int, bool)): |
+ return item |
+ elif isinstance(item, list): |
+ return [b2s_conv(i) for i in item] |
+ elif isinstance(item, dict): |
+ return dict([(k, b2s_conv(v)) for k, v in item.items()]) |
+ |
+ |
+def b64encode_item(item): |
+ if isinstance(item, bytes): |
+ return b64e(item) |
+ elif isinstance(item, str): |
+ return b64e(item.encode("utf-8")) |
+ elif isinstance(item, int): |
+ return b64e(item) |
+ else: |
+ return b64e(json.dumps(b2s_conv(item), |
+ separators=(",", ":")).encode("utf-8")) |
+ |
+ |
+class JWT(object): |
+ def __init__(self, **headers): |
+ if not headers.get("alg"): |
+ headers["alg"] = None |
+ self.headers = headers |
+ self.b64part = [b64encode_item(headers)] |
+ self.part = [b64d(self.b64part[0])] |
+ |
+ def unpack(self, token): |
+ """ |
+ Unpacks a JWT into its parts and base64 decodes the parts |
+ individually |
+ |
+ :param token: The JWT |
+ """ |
+ if isinstance(token, six.string_types): |
+ try: |
+ token = token.encode("utf-8") |
+ except UnicodeDecodeError: |
+ pass |
+ |
+ part = split_token(token) |
+ self.b64part = part |
+ self.part = [b64d(p) for p in part] |
+ self.headers = json.loads(self.part[0].decode()) |
+ return self |
+ |
+ def pack(self, parts, headers=None): |
+ """ |
+ Packs components into a JWT |
+ |
+ :param returns: The string representation of a JWT |
+ """ |
+ if not headers: |
+ if self.headers: |
+ headers = self.headers |
+ else: |
+ headers = {'alg': 'none'} |
+ |
+ self.part = [self.part[0]] + parts |
+ _all = self.b64part = [self.b64part[0]] |
+ _all.extend([b64encode_item(p) for p in parts]) |
+ |
+ return ".".join([a.decode() for a in _all]) |
+ |
+ def payload(self): |
+ _msg = as_unicode(self.part[1]) |
+ |
+ # If not JSON web token assume JSON |
+ if "cty" in self.headers and self.headers["cty"].lower() != "jwt": |
+ pass |
+ else: |
+ try: |
+ _msg = json.loads(_msg) |
+ except ValueError: |
+ pass |
+ |
+ return _msg |