OLD | NEW |
(Empty) | |
| 1 import json |
| 2 import six |
| 3 from jwkest import b64d, as_unicode |
| 4 from jwkest import b64e |
| 5 from jwkest import BadSyntax |
| 6 |
| 7 __author__ = 'roland' |
| 8 |
| 9 |
| 10 def split_token(token): |
| 11 if not token.count(b"."): |
| 12 raise BadSyntax(token, |
| 13 "expected token to contain at least one dot") |
| 14 return tuple(token.split(b".")) |
| 15 |
| 16 |
| 17 def b2s_conv(item): |
| 18 if isinstance(item, bytes): |
| 19 return item.decode("utf-8") |
| 20 elif isinstance(item, (six.string_types, int, bool)): |
| 21 return item |
| 22 elif isinstance(item, list): |
| 23 return [b2s_conv(i) for i in item] |
| 24 elif isinstance(item, dict): |
| 25 return dict([(k, b2s_conv(v)) for k, v in item.items()]) |
| 26 |
| 27 |
| 28 def b64encode_item(item): |
| 29 if isinstance(item, bytes): |
| 30 return b64e(item) |
| 31 elif isinstance(item, str): |
| 32 return b64e(item.encode("utf-8")) |
| 33 elif isinstance(item, int): |
| 34 return b64e(item) |
| 35 else: |
| 36 return b64e(json.dumps(b2s_conv(item), |
| 37 separators=(",", ":")).encode("utf-8")) |
| 38 |
| 39 |
| 40 class JWT(object): |
| 41 def __init__(self, **headers): |
| 42 if not headers.get("alg"): |
| 43 headers["alg"] = None |
| 44 self.headers = headers |
| 45 self.b64part = [b64encode_item(headers)] |
| 46 self.part = [b64d(self.b64part[0])] |
| 47 |
| 48 def unpack(self, token): |
| 49 """ |
| 50 Unpacks a JWT into its parts and base64 decodes the parts |
| 51 individually |
| 52 |
| 53 :param token: The JWT |
| 54 """ |
| 55 if isinstance(token, six.string_types): |
| 56 try: |
| 57 token = token.encode("utf-8") |
| 58 except UnicodeDecodeError: |
| 59 pass |
| 60 |
| 61 part = split_token(token) |
| 62 self.b64part = part |
| 63 self.part = [b64d(p) for p in part] |
| 64 self.headers = json.loads(self.part[0].decode()) |
| 65 return self |
| 66 |
| 67 def pack(self, parts, headers=None): |
| 68 """ |
| 69 Packs components into a JWT |
| 70 |
| 71 :param returns: The string representation of a JWT |
| 72 """ |
| 73 if not headers: |
| 74 if self.headers: |
| 75 headers = self.headers |
| 76 else: |
| 77 headers = {'alg': 'none'} |
| 78 |
| 79 self.part = [self.part[0]] + parts |
| 80 _all = self.b64part = [self.b64part[0]] |
| 81 _all.extend([b64encode_item(p) for p in parts]) |
| 82 |
| 83 return ".".join([a.decode() for a in _all]) |
| 84 |
| 85 def payload(self): |
| 86 _msg = as_unicode(self.part[1]) |
| 87 |
| 88 # If not JSON web token assume JSON |
| 89 if "cty" in self.headers and self.headers["cty"].lower() != "jwt": |
| 90 pass |
| 91 else: |
| 92 try: |
| 93 _msg = json.loads(_msg) |
| 94 except ValueError: |
| 95 pass |
| 96 |
| 97 return _msg |
OLD | NEW |