| Index: tools/telemetry/third_party/webpagereplay/certutils.py
|
| diff --git a/tools/telemetry/third_party/webpagereplay/certutils.py b/tools/telemetry/third_party/webpagereplay/certutils.py
|
| deleted file mode 100644
|
| index c64e4e0dc6bd566a027d21f259d204bf645601cf..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/third_party/webpagereplay/certutils.py
|
| +++ /dev/null
|
| @@ -1,256 +0,0 @@
|
| -# Copyright 2014 Google Inc. All Rights Reserved.
|
| -#
|
| -# Licensed under the Apache License, Version 2.0 (the "License");
|
| -# you may not use this file except in compliance with the License.
|
| -# You may obtain a copy of the License at
|
| -#
|
| -# http://www.apache.org/licenses/LICENSE-2.0
|
| -#
|
| -# Unless required by applicable law or agreed to in writing, software
|
| -# distributed under the License is distributed on an "AS IS" BASIS,
|
| -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| -# See the License for the specific language governing permissions and
|
| -# limitations under the License.
|
| -
|
| -"""Routines to generate root and server certificates.
|
| -
|
| -Certificate Naming Conventions:
|
| - ca_cert: crypto.X509 for the certificate authority (w/ both the pub &
|
| - priv keys)
|
| - cert: a crypto.X509 certificate (w/ just the pub key)
|
| - cert_str: a certificate string (w/ just the pub cert)
|
| - key: a private crypto.PKey (from ca or pem)
|
| - ca_cert_str: a certificae authority string (w/ both the pub & priv certs)
|
| -"""
|
| -
|
| -import logging
|
| -import os
|
| -import socket
|
| -import time
|
| -
|
| -openssl_import_error = None
|
| -
|
| -Error = None
|
| -SSL_METHOD = None
|
| -SysCallError = None
|
| -VERIFY_PEER = None
|
| -ZeroReturnError = None
|
| -FILETYPE_PEM = None
|
| -
|
| -try:
|
| - from OpenSSL import crypto, SSL
|
| -
|
| - Error = SSL.Error
|
| - SSL_METHOD = SSL.SSLv23_METHOD
|
| - SysCallError = SSL.SysCallError
|
| - VERIFY_PEER = SSL.VERIFY_PEER
|
| - ZeroReturnError = SSL.ZeroReturnError
|
| - FILETYPE_PEM = crypto.FILETYPE_PEM
|
| -except ImportError, e:
|
| - openssl_import_error = e
|
| -
|
| -
|
| -def get_ssl_context(method=SSL_METHOD):
|
| - # One of: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or TLSv1_METHOD
|
| - if openssl_import_error:
|
| - raise openssl_import_error # pylint: disable=raising-bad-type
|
| - return SSL.Context(method)
|
| -
|
| -
|
| -class WrappedConnection(object):
|
| -
|
| - def __init__(self, obj):
|
| - self._wrapped_obj = obj
|
| -
|
| - def __getattr__(self, attr):
|
| - if attr in self.__dict__:
|
| - return getattr(self, attr)
|
| - return getattr(self._wrapped_obj, attr)
|
| -
|
| - def recv(self, buflen=1024, flags=0):
|
| - try:
|
| - return self._wrapped_obj.recv(buflen, flags)
|
| - except SSL.SysCallError, e:
|
| - if e.args[1] == 'Unexpected EOF':
|
| - return ''
|
| - raise
|
| - except SSL.ZeroReturnError:
|
| - return ''
|
| -
|
| -
|
| -def get_ssl_connection(context, connection):
|
| - return WrappedConnection(SSL.Connection(context, connection))
|
| -
|
| -
|
| -def load_privatekey(key, filetype=FILETYPE_PEM):
|
| - """Loads obj private key object from string."""
|
| - return crypto.load_privatekey(filetype, key)
|
| -
|
| -
|
| -def load_cert(cert_str, filetype=FILETYPE_PEM):
|
| - """Loads obj cert object from string."""
|
| - return crypto.load_certificate(filetype, cert_str)
|
| -
|
| -
|
| -def _dump_privatekey(key, filetype=FILETYPE_PEM):
|
| - """Dumps obj private key object to string."""
|
| - return crypto.dump_privatekey(filetype, key)
|
| -
|
| -
|
| -def _dump_cert(cert, filetype=FILETYPE_PEM):
|
| - """Dumps obj cert object to string."""
|
| - return crypto.dump_certificate(filetype, cert)
|
| -
|
| -
|
| -def generate_dummy_ca_cert(subject='_WebPageReplayCert'):
|
| - """Generates dummy certificate authority.
|
| -
|
| - Args:
|
| - subject: a string representing the desired root cert issuer
|
| - Returns:
|
| - A tuple of the public key and the private key strings for the root
|
| - certificate
|
| - """
|
| - if openssl_import_error:
|
| - raise openssl_import_error # pylint: disable=raising-bad-type
|
| -
|
| - key = crypto.PKey()
|
| - key.generate_key(crypto.TYPE_RSA, 1024)
|
| -
|
| - ca_cert = crypto.X509()
|
| - ca_cert.set_serial_number(int(time.time()*10000))
|
| - ca_cert.set_version(2)
|
| - ca_cert.get_subject().CN = subject
|
| - ca_cert.get_subject().O = subject
|
| - ca_cert.gmtime_adj_notBefore(-60 * 60 * 24 * 365 * 2)
|
| - ca_cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 2)
|
| - ca_cert.set_issuer(ca_cert.get_subject())
|
| - ca_cert.set_pubkey(key)
|
| - ca_cert.add_extensions([
|
| - crypto.X509Extension('basicConstraints', True, 'CA:TRUE'),
|
| - crypto.X509Extension('nsCertType', True, 'sslCA'),
|
| - crypto.X509Extension('extendedKeyUsage', True,
|
| - ('serverAuth,clientAuth,emailProtection,'
|
| - 'timeStamping,msCodeInd,msCodeCom,msCTLSign,'
|
| - 'msSGC,msEFS,nsSGC')),
|
| - crypto.X509Extension('keyUsage', False, 'keyCertSign, cRLSign'),
|
| - crypto.X509Extension('subjectKeyIdentifier', False, 'hash',
|
| - subject=ca_cert),
|
| - ])
|
| - ca_cert.sign(key, 'sha256')
|
| - key_str = _dump_privatekey(key)
|
| - ca_cert_str = _dump_cert(ca_cert)
|
| - return ca_cert_str, key_str
|
| -
|
| -
|
| -def get_host_cert(host, port=443):
|
| - """Contacts the host and returns its certificate."""
|
| - host_certs = []
|
| - def verify_cb(conn, cert, errnum, depth, ok):
|
| - host_certs.append(cert)
|
| - # Return True to indicates that the certificate was ok.
|
| - return True
|
| -
|
| - context = SSL.Context(SSL.SSLv23_METHOD)
|
| - context.set_verify(SSL.VERIFY_PEER, verify_cb) # Demand a certificate
|
| - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| - connection = SSL.Connection(context, s)
|
| - try:
|
| - connection.connect((host, port))
|
| - connection.send('')
|
| - except SSL.SysCallError:
|
| - pass
|
| - except socket.gaierror:
|
| - logging.debug('Host name is not valid')
|
| - finally:
|
| - connection.shutdown()
|
| - connection.close()
|
| - if not host_certs:
|
| - logging.warning('Unable to get host certificate from %s:%s', host, port)
|
| - return ''
|
| - return _dump_cert(host_certs[-1])
|
| -
|
| -
|
| -def write_dummy_ca_cert(ca_cert_str, key_str, cert_path):
|
| - """Writes four certificate files.
|
| -
|
| - For example, if cert_path is "mycert.pem":
|
| - mycert.pem - CA plus private key
|
| - mycert-cert.pem - CA in PEM format
|
| - mycert-cert.cer - CA for Android
|
| - mycert-cert.p12 - CA in PKCS12 format for Windows devices
|
| - Args:
|
| - cert_path: path string such as "mycert.pem"
|
| - ca_cert_str: certificate string
|
| - key_str: private key string
|
| - """
|
| - dirname = os.path.dirname(cert_path)
|
| - if dirname and not os.path.exists(dirname):
|
| - os.makedirs(dirname)
|
| -
|
| - root_path = os.path.splitext(cert_path)[0]
|
| - ca_cert_path = root_path + '-cert.pem'
|
| - android_cer_path = root_path + '-cert.cer'
|
| - windows_p12_path = root_path + '-cert.p12'
|
| -
|
| - # Dump the CA plus private key
|
| - with open(cert_path, 'w') as f:
|
| - f.write(key_str)
|
| - f.write(ca_cert_str)
|
| -
|
| - # Dump the certificate in PEM format
|
| - with open(ca_cert_path, 'w') as f:
|
| - f.write(ca_cert_str)
|
| -
|
| - # Create a .cer file with the same contents for Android
|
| - with open(android_cer_path, 'w') as f:
|
| - f.write(ca_cert_str)
|
| -
|
| - ca_cert = load_cert(ca_cert_str)
|
| - key = load_privatekey(key_str)
|
| - # Dump the certificate in PKCS12 format for Windows devices
|
| - with open(windows_p12_path, 'w') as f:
|
| - p12 = crypto.PKCS12()
|
| - p12.set_certificate(ca_cert)
|
| - p12.set_privatekey(key)
|
| - f.write(p12.export())
|
| -
|
| -
|
| -def generate_cert(root_ca_cert_str, server_cert_str, server_host):
|
| - """Generates a cert_str with the sni field in server_cert_str signed by the
|
| - root_ca_cert_str.
|
| -
|
| - Args:
|
| - root_ca_cert_str: PEM formatted string representing the root cert
|
| - server_cert_str: PEM formatted string representing cert
|
| - server_host: host name to use if there is no server_cert_str
|
| - Returns:
|
| - a PEM formatted certificate string
|
| - """
|
| - if openssl_import_error:
|
| - raise openssl_import_error # pylint: disable=raising-bad-type
|
| -
|
| - common_name = server_host
|
| - if server_cert_str:
|
| - cert = load_cert(server_cert_str)
|
| - common_name = cert.get_subject().commonName
|
| - else:
|
| - cert = crypto.X509()
|
| -
|
| - ca_cert = load_cert(root_ca_cert_str)
|
| - key = load_privatekey(root_ca_cert_str)
|
| -
|
| - req = crypto.X509Req()
|
| - req.get_subject().CN = common_name
|
| - req.set_pubkey(ca_cert.get_pubkey())
|
| - req.sign(key, 'sha256')
|
| -
|
| - cert.gmtime_adj_notBefore(-60 * 60)
|
| - cert.gmtime_adj_notAfter(60 * 60 * 24 * 30)
|
| - cert.set_issuer(ca_cert.get_subject())
|
| - cert.set_subject(req.get_subject())
|
| - cert.set_serial_number(int(time.time()*10000))
|
| - cert.set_pubkey(req.get_pubkey())
|
| - cert.sign(key, 'sha256')
|
| -
|
| - return _dump_cert(cert)
|
|
|