| OLD | NEW |
| (Empty) |
| 1 # Copyright 2014 Google Inc. All Rights Reserved. | |
| 2 # | |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 # you may not use this file except in compliance with the License. | |
| 5 # You may obtain a copy of the License at | |
| 6 # | |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 # | |
| 9 # Unless required by applicable law or agreed to in writing, software | |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 # See the License for the specific language governing permissions and | |
| 13 # limitations under the License. | |
| 14 | |
| 15 """Routines to generate root and server certificates. | |
| 16 | |
| 17 Certificate Naming Conventions: | |
| 18 ca_cert: crypto.X509 for the certificate authority (w/ both the pub & | |
| 19 priv keys) | |
| 20 cert: a crypto.X509 certificate (w/ just the pub key) | |
| 21 cert_str: a certificate string (w/ just the pub cert) | |
| 22 key: a private crypto.PKey (from ca or pem) | |
| 23 ca_cert_str: a certificae authority string (w/ both the pub & priv certs) | |
| 24 """ | |
| 25 | |
| 26 import logging | |
| 27 import os | |
| 28 import platform | |
| 29 import socket | |
| 30 import subprocess | |
| 31 import time | |
| 32 | |
| 33 openssl_import_error = None | |
| 34 | |
| 35 Error = None | |
| 36 SSL_METHOD = None | |
| 37 SysCallError = None | |
| 38 VERIFY_PEER = None | |
| 39 ZeroReturnError = None | |
| 40 FILETYPE_PEM = None | |
| 41 | |
| 42 try: | |
| 43 from OpenSSL import crypto, SSL | |
| 44 | |
| 45 Error = SSL.Error | |
| 46 SSL_METHOD = SSL.SSLv23_METHOD | |
| 47 SysCallError = SSL.SysCallError | |
| 48 VERIFY_PEER = SSL.VERIFY_PEER | |
| 49 ZeroReturnError = SSL.ZeroReturnError | |
| 50 FILETYPE_PEM = crypto.FILETYPE_PEM | |
| 51 except ImportError, e: | |
| 52 openssl_import_error = e | |
| 53 | |
| 54 | |
| 55 def get_ssl_context(method=SSL_METHOD): | |
| 56 # One of: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or TLSv1_METHOD | |
| 57 if openssl_import_error: | |
| 58 raise openssl_import_error # pylint: disable=raising-bad-type | |
| 59 return SSL.Context(method) | |
| 60 | |
| 61 | |
| 62 class WrappedConnection(object): | |
| 63 | |
| 64 def __init__(self, obj): | |
| 65 self._wrapped_obj = obj | |
| 66 | |
| 67 def __getattr__(self, attr): | |
| 68 if attr in self.__dict__: | |
| 69 return getattr(self, attr) | |
| 70 return getattr(self._wrapped_obj, attr) | |
| 71 | |
| 72 def recv(self, buflen=1024, flags=0): | |
| 73 try: | |
| 74 return self._wrapped_obj.recv(buflen, flags) | |
| 75 except SSL.SysCallError, e: | |
| 76 if e.args[1] == 'Unexpected EOF': | |
| 77 return '' | |
| 78 raise | |
| 79 except SSL.ZeroReturnError: | |
| 80 return '' | |
| 81 | |
| 82 | |
| 83 def get_ssl_connection(context, connection): | |
| 84 return WrappedConnection(SSL.Connection(context, connection)) | |
| 85 | |
| 86 | |
| 87 def load_privatekey(key, filetype=FILETYPE_PEM): | |
| 88 """Loads obj private key object from string.""" | |
| 89 return crypto.load_privatekey(filetype, key) | |
| 90 | |
| 91 | |
| 92 def load_cert(cert_str, filetype=FILETYPE_PEM): | |
| 93 """Loads obj cert object from string.""" | |
| 94 return crypto.load_certificate(filetype, cert_str) | |
| 95 | |
| 96 | |
| 97 def _dump_privatekey(key, filetype=FILETYPE_PEM): | |
| 98 """Dumps obj private key object to string.""" | |
| 99 return crypto.dump_privatekey(filetype, key) | |
| 100 | |
| 101 | |
| 102 def _dump_cert(cert, filetype=FILETYPE_PEM): | |
| 103 """Dumps obj cert object to string.""" | |
| 104 return crypto.dump_certificate(filetype, cert) | |
| 105 | |
| 106 | |
| 107 def generate_dummy_ca_cert(subject='_WebPageReplayCert'): | |
| 108 """Generates dummy certificate authority. | |
| 109 | |
| 110 Args: | |
| 111 subject: a string representing the desired root cert issuer | |
| 112 Returns: | |
| 113 A tuple of the public key and the private key strings for the root | |
| 114 certificate | |
| 115 """ | |
| 116 if openssl_import_error: | |
| 117 raise openssl_import_error # pylint: disable=raising-bad-type | |
| 118 | |
| 119 key = crypto.PKey() | |
| 120 key.generate_key(crypto.TYPE_RSA, 1024) | |
| 121 | |
| 122 ca_cert = crypto.X509() | |
| 123 ca_cert.set_serial_number(int(time.time()*10000)) | |
| 124 ca_cert.set_version(2) | |
| 125 ca_cert.get_subject().CN = subject | |
| 126 ca_cert.get_subject().O = subject | |
| 127 ca_cert.gmtime_adj_notBefore(-60 * 60 * 24 * 365 * 2) | |
| 128 ca_cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 2) | |
| 129 ca_cert.set_issuer(ca_cert.get_subject()) | |
| 130 ca_cert.set_pubkey(key) | |
| 131 ca_cert.add_extensions([ | |
| 132 crypto.X509Extension('basicConstraints', True, 'CA:TRUE'), | |
| 133 crypto.X509Extension('nsCertType', True, 'sslCA'), | |
| 134 crypto.X509Extension('extendedKeyUsage', True, | |
| 135 ('serverAuth,clientAuth,emailProtection,' | |
| 136 'timeStamping,msCodeInd,msCodeCom,msCTLSign,' | |
| 137 'msSGC,msEFS,nsSGC')), | |
| 138 crypto.X509Extension('keyUsage', False, 'keyCertSign, cRLSign'), | |
| 139 crypto.X509Extension('subjectKeyIdentifier', False, 'hash', | |
| 140 subject=ca_cert), | |
| 141 ]) | |
| 142 ca_cert.sign(key, 'sha256') | |
| 143 key_str = _dump_privatekey(key) | |
| 144 ca_cert_str = _dump_cert(ca_cert) | |
| 145 return ca_cert_str, key_str | |
| 146 | |
| 147 | |
| 148 def get_host_cert(host, port=443): | |
| 149 """Contacts the host and returns its certificate.""" | |
| 150 host_certs = [] | |
| 151 def verify_cb(conn, cert, errnum, depth, ok): | |
| 152 host_certs.append(cert) | |
| 153 # Return True to indicates that the certificate was ok. | |
| 154 return True | |
| 155 | |
| 156 context = SSL.Context(SSL.SSLv23_METHOD) | |
| 157 context.set_verify(SSL.VERIFY_PEER, verify_cb) # Demand a certificate | |
| 158 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| 159 connection = SSL.Connection(context, s) | |
| 160 try: | |
| 161 connection.connect((host, port)) | |
| 162 connection.send('') | |
| 163 except SSL.SysCallError: | |
| 164 pass | |
| 165 except socket.gaierror: | |
| 166 logging.debug('Host name is not valid') | |
| 167 finally: | |
| 168 connection.shutdown() | |
| 169 connection.close() | |
| 170 if not host_certs: | |
| 171 logging.warning('Unable to get host certificate from %s:%s', host, port) | |
| 172 return '' | |
| 173 return _dump_cert(host_certs[-1]) | |
| 174 | |
| 175 | |
| 176 def write_dummy_ca_cert(ca_cert_str, key_str, cert_path): | |
| 177 """Writes four certificate files. | |
| 178 | |
| 179 For example, if cert_path is "mycert.pem": | |
| 180 mycert.pem - CA plus private key | |
| 181 mycert-cert.pem - CA in PEM format | |
| 182 mycert-cert.cer - CA for Android | |
| 183 mycert-cert.p12 - CA in PKCS12 format for Windows devices | |
| 184 Args: | |
| 185 cert_path: path string such as "mycert.pem" | |
| 186 ca_cert_str: certificate string | |
| 187 key_str: private key string | |
| 188 """ | |
| 189 dirname = os.path.dirname(cert_path) | |
| 190 if dirname and not os.path.exists(dirname): | |
| 191 os.makedirs(dirname) | |
| 192 | |
| 193 root_path = os.path.splitext(cert_path)[0] | |
| 194 ca_cert_path = root_path + '-cert.pem' | |
| 195 android_cer_path = root_path + '-cert.cer' | |
| 196 windows_p12_path = root_path + '-cert.p12' | |
| 197 | |
| 198 # Dump the CA plus private key | |
| 199 with open(cert_path, 'w') as f: | |
| 200 f.write(key_str) | |
| 201 f.write(ca_cert_str) | |
| 202 | |
| 203 # Dump the certificate in PEM format | |
| 204 with open(ca_cert_path, 'w') as f: | |
| 205 f.write(ca_cert_str) | |
| 206 | |
| 207 # Create a .cer file with the same contents for Android | |
| 208 with open(android_cer_path, 'w') as f: | |
| 209 f.write(ca_cert_str) | |
| 210 | |
| 211 ca_cert = load_cert(ca_cert_str) | |
| 212 key = load_privatekey(key_str) | |
| 213 # Dump the certificate in PKCS12 format for Windows devices | |
| 214 with open(windows_p12_path, 'w') as f: | |
| 215 p12 = crypto.PKCS12() | |
| 216 p12.set_certificate(ca_cert) | |
| 217 p12.set_privatekey(key) | |
| 218 f.write(p12.export()) | |
| 219 | |
| 220 | |
| 221 def generate_cert(root_ca_cert_str, server_cert_str, server_host): | |
| 222 """Generates a cert_str with the sni field in server_cert_str signed by the | |
| 223 root_ca_cert_str. | |
| 224 | |
| 225 Args: | |
| 226 root_ca_cert_str: PEM formatted string representing the root cert | |
| 227 server_cert_str: PEM formatted string representing cert | |
| 228 server_host: host name to use if there is no server_cert_str | |
| 229 Returns: | |
| 230 a PEM formatted certificate string | |
| 231 """ | |
| 232 EXTENSION_WHITELIST = set(['subjectAltName']) | |
| 233 | |
| 234 if openssl_import_error: | |
| 235 raise openssl_import_error # pylint: disable=raising-bad-type | |
| 236 | |
| 237 common_name = server_host | |
| 238 reused_extensions = [] | |
| 239 if server_cert_str: | |
| 240 original_cert = load_cert(server_cert_str) | |
| 241 common_name = original_cert.get_subject().commonName | |
| 242 for i in xrange(original_cert.get_extension_count()): | |
| 243 original_cert_extension = original_cert.get_extension(i) | |
| 244 if original_cert_extension.get_short_name() in EXTENSION_WHITELIST: | |
| 245 reused_extensions.append(original_cert_extension) | |
| 246 | |
| 247 ca_cert = load_cert(root_ca_cert_str) | |
| 248 ca_key = load_privatekey(root_ca_cert_str) | |
| 249 | |
| 250 cert = crypto.X509() | |
| 251 cert.get_subject().CN = common_name | |
| 252 cert.gmtime_adj_notBefore(-60 * 60) | |
| 253 cert.gmtime_adj_notAfter(60 * 60 * 24 * 30) | |
| 254 cert.set_issuer(ca_cert.get_subject()) | |
| 255 cert.set_serial_number(int(time.time()*10000)) | |
| 256 cert.set_pubkey(ca_key) | |
| 257 cert.add_extensions(reused_extensions) | |
| 258 cert.sign(ca_key, 'sha256') | |
| 259 | |
| 260 return _dump_cert(cert) | |
| 261 | |
| 262 | |
| 263 def install_cert_in_nssdb(home_directory_path, certificate_path): | |
| 264 """Installs a certificate into the ~/.pki/nssdb database. | |
| 265 | |
| 266 Args: | |
| 267 home_directory_path: Path of the home directory where to install | |
| 268 certificate_path: Path of a CA in PEM format | |
| 269 """ | |
| 270 assert os.path.isdir(home_directory_path) | |
| 271 assert platform.system() == 'Linux', \ | |
| 272 'SSL certification authority has only been tested for linux.' | |
| 273 if (os.path.abspath(home_directory_path) == | |
| 274 os.path.abspath(os.environ['HOME'])): | |
| 275 raise Exception('Modifying $HOME/.pki/nssdb compromises your machine.') | |
| 276 | |
| 277 cert_database_path = os.path.join(home_directory_path, '.pki', 'nssdb') | |
| 278 def certutil(args): | |
| 279 cmd = ['certutil', '--empty-password', '-d', 'sql:' + cert_database_path] | |
| 280 cmd.extend(args) | |
| 281 logging.info(subprocess.list2cmdline(cmd)) | |
| 282 subprocess.check_call(cmd) | |
| 283 | |
| 284 if not os.path.isdir(cert_database_path): | |
| 285 os.makedirs(cert_database_path) | |
| 286 certutil(['-N']) | |
| 287 | |
| 288 certutil(['-A', '-t', 'PC,,', '-n', certificate_path, '-i', certificate_path]) | |
| OLD | NEW |