| Index: tools/telemetry/third_party/webpagereplay/sslproxy_test.py
|
| diff --git a/tools/telemetry/third_party/webpagereplay/sslproxy_test.py b/tools/telemetry/third_party/webpagereplay/sslproxy_test.py
|
| deleted file mode 100644
|
| index faea0bddcee9218f745bf82cd1165b96ed54f45f..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/third_party/webpagereplay/sslproxy_test.py
|
| +++ /dev/null
|
| @@ -1,194 +0,0 @@
|
| -# Copyright 2014 Google Inc. All Rights Reserved.
|
| -#
|
| -# Licensed under the Apache License, Version 2.0 (the "License");
|
| -# you may not use this file except in compliance with the License.
|
| -# You may obtain a copy of the License at
|
| -#
|
| -# http://www.apache.org/licenses/LICENSE-2.0
|
| -#
|
| -# Unless required by applicable law or agreed to in writing, software
|
| -# distributed under the License is distributed on an "AS IS" BASIS,
|
| -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| -# See the License for the specific language governing permissions and
|
| -# limitations under the License.
|
| -
|
| -"""Test routines to generate dummy certificates."""
|
| -
|
| -import BaseHTTPServer
|
| -import shutil
|
| -import signal
|
| -import socket
|
| -import tempfile
|
| -import threading
|
| -import time
|
| -import unittest
|
| -
|
| -import certutils
|
| -import sslproxy
|
| -
|
| -
|
| -class Client(object):
|
| -
|
| - def __init__(self, ca_cert_path, verify_cb, port, host_name='foo.com',
|
| - host='localhost'):
|
| - self.host_name = host_name
|
| - self.verify_cb = verify_cb
|
| - self.ca_cert_path = ca_cert_path
|
| - self.port = port
|
| - self.host_name = host_name
|
| - self.host = host
|
| - self.connection = None
|
| -
|
| - def run_request(self):
|
| - context = certutils.get_ssl_context()
|
| - context.set_verify(certutils.VERIFY_PEER, self.verify_cb) # Demand a cert
|
| - context.use_certificate_file(self.ca_cert_path)
|
| - context.load_verify_locations(self.ca_cert_path)
|
| -
|
| - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| - self.connection = certutils.get_ssl_connection(context, s)
|
| - self.connection.connect((self.host, self.port))
|
| - self.connection.set_tlsext_host_name(self.host_name)
|
| -
|
| - try:
|
| - self.connection.send('\r\n\r\n')
|
| - finally:
|
| - self.connection.shutdown()
|
| - self.connection.close()
|
| -
|
| -
|
| -class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
|
| - protocol_version = 'HTTP/1.1' # override BaseHTTPServer setting
|
| -
|
| - def handle_one_request(self):
|
| - """Handle a single HTTP request."""
|
| - self.raw_requestline = self.rfile.readline(65537)
|
| -
|
| -
|
| -class WrappedErrorHandler(Handler):
|
| - """Wraps handler to verify expected sslproxy errors are being raised."""
|
| -
|
| - def setup(self):
|
| - Handler.setup(self)
|
| - try:
|
| - sslproxy._SetUpUsingDummyCert(self)
|
| - except certutils.Error:
|
| - self.server.error_function = certutils.Error
|
| -
|
| - def finish(self):
|
| - Handler.finish(self)
|
| - self.connection.shutdown()
|
| - self.connection.close()
|
| -
|
| -
|
| -class DummyArchive(object):
|
| -
|
| - def __init__(self):
|
| - pass
|
| -
|
| -
|
| -class DummyFetch(object):
|
| -
|
| - def __init__(self):
|
| - self.http_archive = DummyArchive()
|
| -
|
| -
|
| -class Server(BaseHTTPServer.HTTPServer):
|
| - """SSL server."""
|
| -
|
| - def __init__(self, ca_cert_path, use_error_handler=False, port=0,
|
| - host='localhost'):
|
| - self.ca_cert_path = ca_cert_path
|
| - with open(ca_cert_path, 'r') as ca_file:
|
| - self.ca_cert_str = ca_file.read()
|
| - self.http_archive_fetch = DummyFetch()
|
| - if use_error_handler:
|
| - self.HANDLER = WrappedErrorHandler
|
| - else:
|
| - self.HANDLER = sslproxy.wrap_handler(Handler)
|
| - try:
|
| - BaseHTTPServer.HTTPServer.__init__(self, (host, port), self.HANDLER)
|
| - except Exception, e:
|
| - raise RuntimeError('Could not start HTTPSServer on port %d: %s'
|
| - % (port, e))
|
| -
|
| - def __enter__(self):
|
| - thread = threading.Thread(target=self.serve_forever)
|
| - thread.daemon = True
|
| - thread.start()
|
| - return self
|
| -
|
| - def cleanup(self):
|
| - try:
|
| - self.shutdown()
|
| - except KeyboardInterrupt:
|
| - pass
|
| -
|
| - def __exit__(self, type_, value_, traceback_):
|
| - self.cleanup()
|
| -
|
| - def get_certificate(self, host):
|
| - return certutils.generate_cert(self.ca_cert_str, '', host)
|
| -
|
| -
|
| -class TestClient(unittest.TestCase):
|
| - _temp_dir = None
|
| -
|
| - def setUp(self):
|
| - self._temp_dir = tempfile.mkdtemp(prefix='sslproxy_', dir='/tmp')
|
| - self.ca_cert_path = self._temp_dir + 'testCA.pem'
|
| - self.cert_path = self._temp_dir + 'testCA-cert.cer'
|
| - self.wrong_ca_cert_path = self._temp_dir + 'wrong.pem'
|
| - self.wrong_cert_path = self._temp_dir + 'wrong-cert.cer'
|
| -
|
| - # Write both pem and cer files for certificates
|
| - certutils.write_dummy_ca_cert(*certutils.generate_dummy_ca_cert(),
|
| - cert_path=self.ca_cert_path)
|
| - certutils.write_dummy_ca_cert(*certutils.generate_dummy_ca_cert(),
|
| - cert_path=self.ca_cert_path)
|
| -
|
| - def tearDown(self):
|
| - if self._temp_dir:
|
| - shutil.rmtree(self._temp_dir)
|
| -
|
| - def verify_cb(self, conn, cert, errnum, depth, ok):
|
| - """A callback that verifies the certificate authentication worked.
|
| -
|
| - Args:
|
| - conn: Connection object
|
| - cert: x509 object
|
| - errnum: possible error number
|
| - depth: error depth
|
| - ok: 1 if the authentication worked 0 if it didnt.
|
| - Returns:
|
| - 1 or 0 depending on if the verification worked
|
| - """
|
| - self.assertFalse(cert.has_expired())
|
| - self.assertGreater(time.strftime('%Y%m%d%H%M%SZ', time.gmtime()),
|
| - cert.get_notBefore())
|
| - return ok
|
| -
|
| - def test_no_host(self):
|
| - with Server(self.ca_cert_path) as server:
|
| - c = Client(self.cert_path, self.verify_cb, server.server_port, '')
|
| - self.assertRaises(certutils.Error, c.run_request)
|
| -
|
| - def test_client_connection(self):
|
| - with Server(self.ca_cert_path) as server:
|
| - c = Client(self.cert_path, self.verify_cb, server.server_port, 'foo.com')
|
| - c.run_request()
|
| -
|
| - c = Client(self.cert_path, self.verify_cb, server.server_port,
|
| - 'random.host')
|
| - c.run_request()
|
| -
|
| - def test_wrong_cert(self):
|
| - with Server(self.ca_cert_path, True) as server:
|
| - c = Client(self.wrong_cert_path, self.verify_cb, server.server_port,
|
| - 'foo.com')
|
| - self.assertRaises(certutils.Error, c.run_request)
|
| -
|
| -
|
| -if __name__ == '__main__':
|
| - signal.signal(signal.SIGINT, signal.SIG_DFL) # Exit on Ctrl-C
|
| - unittest.main()
|
|
|