| OLD | NEW |
| (Empty) | |
| 1 # Copyright 2016 Google Inc. All rights reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. |
| 14 |
| 15 """Pure Python crypto-related routines for oauth2client. |
| 16 |
| 17 Uses the ``rsa``, ``pyasn1`` and ``pyasn1_modules`` packages |
| 18 to parse PEM files storing PKCS#1 or PKCS#8 keys as well as |
| 19 certificates. |
| 20 """ |
| 21 |
| 22 from pyasn1.codec.der import decoder |
| 23 from pyasn1_modules import pem |
| 24 from pyasn1_modules.rfc2459 import Certificate |
| 25 from pyasn1_modules.rfc5208 import PrivateKeyInfo |
| 26 import rsa |
| 27 import six |
| 28 |
| 29 from oauth2client._helpers import _from_bytes |
| 30 from oauth2client._helpers import _to_bytes |
| 31 |
| 32 |
| 33 _PKCS12_ERROR = r"""\ |
| 34 PKCS12 format is not supported by the RSA library. |
| 35 Either install PyOpenSSL, or please convert .p12 format |
| 36 to .pem format: |
| 37 $ cat key.p12 | \ |
| 38 > openssl pkcs12 -nodes -nocerts -passin pass:notasecret | \ |
| 39 > openssl rsa > key.pem |
| 40 """ |
| 41 |
| 42 _POW2 = (128, 64, 32, 16, 8, 4, 2, 1) |
| 43 _PKCS1_MARKER = ('-----BEGIN RSA PRIVATE KEY-----', |
| 44 '-----END RSA PRIVATE KEY-----') |
| 45 _PKCS8_MARKER = ('-----BEGIN PRIVATE KEY-----', |
| 46 '-----END PRIVATE KEY-----') |
| 47 _PKCS8_SPEC = PrivateKeyInfo() |
| 48 |
| 49 |
| 50 def _bit_list_to_bytes(bit_list): |
| 51 """Converts an iterable of 1's and 0's to bytes. |
| 52 |
| 53 Combines the list 8 at a time, treating each group of 8 bits |
| 54 as a single byte. |
| 55 """ |
| 56 num_bits = len(bit_list) |
| 57 byte_vals = bytearray() |
| 58 for start in six.moves.xrange(0, num_bits, 8): |
| 59 curr_bits = bit_list[start:start + 8] |
| 60 char_val = sum(val * digit |
| 61 for val, digit in zip(_POW2, curr_bits)) |
| 62 byte_vals.append(char_val) |
| 63 return bytes(byte_vals) |
| 64 |
| 65 |
| 66 class RsaVerifier(object): |
| 67 """Verifies the signature on a message. |
| 68 |
| 69 Args: |
| 70 pubkey: rsa.key.PublicKey (or equiv), The public key to verify with. |
| 71 """ |
| 72 |
| 73 def __init__(self, pubkey): |
| 74 self._pubkey = pubkey |
| 75 |
| 76 def verify(self, message, signature): |
| 77 """Verifies a message against a signature. |
| 78 |
| 79 Args: |
| 80 message: string or bytes, The message to verify. If string, will be |
| 81 encoded to bytes as utf-8. |
| 82 signature: string or bytes, The signature on the message. If |
| 83 string, will be encoded to bytes as utf-8. |
| 84 |
| 85 Returns: |
| 86 True if message was signed by the private key associated with the |
| 87 public key that this object was constructed with. |
| 88 """ |
| 89 message = _to_bytes(message, encoding='utf-8') |
| 90 try: |
| 91 return rsa.pkcs1.verify(message, signature, self._pubkey) |
| 92 except (ValueError, rsa.pkcs1.VerificationError): |
| 93 return False |
| 94 |
| 95 @classmethod |
| 96 def from_string(cls, key_pem, is_x509_cert): |
| 97 """Construct an RsaVerifier instance from a string. |
| 98 |
| 99 Args: |
| 100 key_pem: string, public key in PEM format. |
| 101 is_x509_cert: bool, True if key_pem is an X509 cert, otherwise it |
| 102 is expected to be an RSA key in PEM format. |
| 103 |
| 104 Returns: |
| 105 RsaVerifier instance. |
| 106 |
| 107 Raises: |
| 108 ValueError: if the key_pem can't be parsed. In either case, error |
| 109 will begin with 'No PEM start marker'. If |
| 110 ``is_x509_cert`` is True, will fail to find the |
| 111 "-----BEGIN CERTIFICATE-----" error, otherwise fails |
| 112 to find "-----BEGIN RSA PUBLIC KEY-----". |
| 113 """ |
| 114 key_pem = _to_bytes(key_pem) |
| 115 if is_x509_cert: |
| 116 der = rsa.pem.load_pem(key_pem, 'CERTIFICATE') |
| 117 asn1_cert, remaining = decoder.decode(der, asn1Spec=Certificate()) |
| 118 if remaining != b'': |
| 119 raise ValueError('Unused bytes', remaining) |
| 120 |
| 121 cert_info = asn1_cert['tbsCertificate']['subjectPublicKeyInfo'] |
| 122 key_bytes = _bit_list_to_bytes(cert_info['subjectPublicKey']) |
| 123 pubkey = rsa.PublicKey.load_pkcs1(key_bytes, 'DER') |
| 124 else: |
| 125 pubkey = rsa.PublicKey.load_pkcs1(key_pem, 'PEM') |
| 126 return cls(pubkey) |
| 127 |
| 128 |
| 129 class RsaSigner(object): |
| 130 """Signs messages with a private key. |
| 131 |
| 132 Args: |
| 133 pkey: rsa.key.PrivateKey (or equiv), The private key to sign with. |
| 134 """ |
| 135 |
| 136 def __init__(self, pkey): |
| 137 self._key = pkey |
| 138 |
| 139 def sign(self, message): |
| 140 """Signs a message. |
| 141 |
| 142 Args: |
| 143 message: bytes, Message to be signed. |
| 144 |
| 145 Returns: |
| 146 string, The signature of the message for the given key. |
| 147 """ |
| 148 message = _to_bytes(message, encoding='utf-8') |
| 149 return rsa.pkcs1.sign(message, self._key, 'SHA-256') |
| 150 |
| 151 @classmethod |
| 152 def from_string(cls, key, password='notasecret'): |
| 153 """Construct an RsaSigner instance from a string. |
| 154 |
| 155 Args: |
| 156 key: string, private key in PEM format. |
| 157 password: string, password for private key file. Unused for PEM |
| 158 files. |
| 159 |
| 160 Returns: |
| 161 RsaSigner instance. |
| 162 |
| 163 Raises: |
| 164 ValueError if the key cannot be parsed as PKCS#1 or PKCS#8 in |
| 165 PEM format. |
| 166 """ |
| 167 key = _from_bytes(key) # pem expects str in Py3 |
| 168 marker_id, key_bytes = pem.readPemBlocksFromFile( |
| 169 six.StringIO(key), _PKCS1_MARKER, _PKCS8_MARKER) |
| 170 |
| 171 if marker_id == 0: |
| 172 pkey = rsa.key.PrivateKey.load_pkcs1(key_bytes, |
| 173 format='DER') |
| 174 elif marker_id == 1: |
| 175 key_info, remaining = decoder.decode( |
| 176 key_bytes, asn1Spec=_PKCS8_SPEC) |
| 177 if remaining != b'': |
| 178 raise ValueError('Unused bytes', remaining) |
| 179 pkey_info = key_info.getComponentByName('privateKey') |
| 180 pkey = rsa.key.PrivateKey.load_pkcs1(pkey_info.asOctets(), |
| 181 format='DER') |
| 182 else: |
| 183 raise ValueError('No key could be detected.') |
| 184 |
| 185 return cls(pkey) |
| OLD | NEW |