| Index: Tools/Scripts/webkitpy/thirdparty/wpt/wpt/tools/sslutils/openssl.py
|
| diff --git a/Tools/Scripts/webkitpy/thirdparty/wpt/wpt/tools/sslutils/openssl.py b/Tools/Scripts/webkitpy/thirdparty/wpt/wpt/tools/sslutils/openssl.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..5b571c0f116d591c6f6d40443b5cf9486a4ae36f
|
| --- /dev/null
|
| +++ b/Tools/Scripts/webkitpy/thirdparty/wpt/wpt/tools/sslutils/openssl.py
|
| @@ -0,0 +1,401 @@
|
| +import functools
|
| +import os
|
| +import shutil
|
| +import subprocess
|
| +import tempfile
|
| +from datetime import datetime
|
| +
|
| +class OpenSSL(object):
|
| + def __init__(self, logger, binary, base_path, conf_path, hosts, duration,
|
| + base_conf_path=None):
|
| + """Context manager for interacting with OpenSSL.
|
| + Creates a config file for the duration of the context.
|
| +
|
| + :param logger: stdlib logger or python structured logger
|
| + :param binary: path to openssl binary
|
| + :param base_path: path to directory for storing certificates
|
| + :param conf_path: path for configuration file storing configuration data
|
| + :param hosts: list of hosts to include in configuration (or None if not
|
| + generating host certificates)
|
| + :param duration: Certificate duration in days"""
|
| +
|
| + self.base_path = base_path
|
| + self.binary = binary
|
| + self.conf_path = conf_path
|
| + self.base_conf_path = base_conf_path
|
| + self.logger = logger
|
| + self.proc = None
|
| + self.cmd = []
|
| + self.hosts = hosts
|
| + self.duration = duration
|
| +
|
| + def __enter__(self):
|
| + with open(self.conf_path, "w") as f:
|
| + f.write(get_config(self.base_path, self.hosts, self.duration))
|
| + return self
|
| +
|
| + def __exit__(self, *args, **kwargs):
|
| + os.unlink(self.conf_path)
|
| +
|
| + def log(self, line):
|
| + if hasattr(self.logger, "process_output"):
|
| + self.logger.process_output(self.proc.pid if self.proc is not None else None,
|
| + line.decode("utf8", "replace"),
|
| + command=" ".join(self.cmd))
|
| + else:
|
| + self.logger.debug(line)
|
| +
|
| + def __call__(self, cmd, *args, **kwargs):
|
| + """Run a command using OpenSSL in the current context.
|
| +
|
| + :param cmd: The openssl subcommand to run
|
| + :param *args: Additional arguments to pass to the command
|
| + """
|
| + self.cmd = [self.binary, cmd]
|
| + if cmd != "x509":
|
| + self.cmd += ["-config", self.conf_path]
|
| + self.cmd += list(args)
|
| +
|
| + env = os.environ.copy()
|
| + if self.base_conf_path is not None:
|
| + env["OPENSSL_CONF"] = self.base_conf_path.encode("utf8")
|
| +
|
| + self.proc = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
| + env=env)
|
| + stdout, stderr = self.proc.communicate()
|
| + self.log(stdout)
|
| + if self.proc.returncode != 0:
|
| + raise subprocess.CalledProcessError(self.proc.returncode, self.cmd,
|
| + output=stdout)
|
| +
|
| + self.cmd = []
|
| + self.proc = None
|
| + return stdout
|
| +
|
| +
|
| +def make_subject(common_name,
|
| + country=None,
|
| + state=None,
|
| + locality=None,
|
| + organization=None,
|
| + organization_unit=None):
|
| + args = [("country", "C"),
|
| + ("state", "ST"),
|
| + ("locality", "L"),
|
| + ("organization", "O"),
|
| + ("organization_unit", "OU"),
|
| + ("common_name", "CN")]
|
| +
|
| + rv = []
|
| +
|
| + for var, key in args:
|
| + value = locals()[var]
|
| + if value is not None:
|
| + rv.append("/%s=%s" % (key, value.replace("/", "\\/")))
|
| +
|
| + return "".join(rv)
|
| +
|
| +def make_alt_names(hosts):
|
| + rv = []
|
| + for name in hosts:
|
| + rv.append("DNS:%s" % name)
|
| + return ",".join(rv)
|
| +
|
| +def get_config(root_dir, hosts, duration=30):
|
| + if hosts is None:
|
| + san_line = ""
|
| + else:
|
| + san_line = "subjectAltName = %s" % make_alt_names(hosts)
|
| +
|
| + if os.path.sep == "\\":
|
| + # This seems to be needed for the Shining Light OpenSSL on
|
| + # Windows, at least.
|
| + root_dir = root_dir.replace("\\", "\\\\")
|
| +
|
| + rv = """[ ca ]
|
| +default_ca = CA_default
|
| +
|
| +[ CA_default ]
|
| +dir = %(root_dir)s
|
| +certs = $dir
|
| +new_certs_dir = $certs
|
| +crl_dir = $dir%(sep)scrl
|
| +database = $dir%(sep)sindex.txt
|
| +private_key = $dir%(sep)scakey.pem
|
| +certificate = $dir%(sep)scacert.pem
|
| +serial = $dir%(sep)sserial
|
| +crldir = $dir%(sep)scrl
|
| +crlnumber = $dir%(sep)scrlnumber
|
| +crl = $crldir%(sep)scrl.pem
|
| +RANDFILE = $dir%(sep)sprivate%(sep)s.rand
|
| +x509_extensions = usr_cert
|
| +name_opt = ca_default
|
| +cert_opt = ca_default
|
| +default_days = %(duration)d
|
| +default_crl_days = %(duration)d
|
| +default_md = sha256
|
| +preserve = no
|
| +policy = policy_anything
|
| +copy_extensions = copy
|
| +
|
| +[ policy_anything ]
|
| +countryName = optional
|
| +stateOrProvinceName = optional
|
| +localityName = optional
|
| +organizationName = optional
|
| +organizationalUnitName = optional
|
| +commonName = supplied
|
| +emailAddress = optional
|
| +
|
| +[ req ]
|
| +default_bits = 2048
|
| +default_keyfile = privkey.pem
|
| +distinguished_name = req_distinguished_name
|
| +attributes = req_attributes
|
| +x509_extensions = v3_ca
|
| +
|
| +# Passwords for private keys if not present they will be prompted for
|
| +# input_password = secret
|
| +# output_password = secret
|
| +string_mask = utf8only
|
| +req_extensions = v3_req
|
| +
|
| +[ req_distinguished_name ]
|
| +countryName = Country Name (2 letter code)
|
| +countryName_default = AU
|
| +countryName_min = 2
|
| +countryName_max = 2
|
| +stateOrProvinceName = State or Province Name (full name)
|
| +stateOrProvinceName_default =
|
| +localityName = Locality Name (eg, city)
|
| +0.organizationName = Organization Name
|
| +0.organizationName_default = Web Platform Tests
|
| +organizationalUnitName = Organizational Unit Name (eg, section)
|
| +#organizationalUnitName_default =
|
| +commonName = Common Name (e.g. server FQDN or YOUR name)
|
| +commonName_max = 64
|
| +emailAddress = Email Address
|
| +emailAddress_max = 64
|
| +
|
| +[ req_attributes ]
|
| +
|
| +[ usr_cert ]
|
| +basicConstraints=CA:false
|
| +subjectKeyIdentifier=hash
|
| +authorityKeyIdentifier=keyid,issuer
|
| +
|
| +[ v3_req ]
|
| +basicConstraints = CA:FALSE
|
| +keyUsage = nonRepudiation, digitalSignature, keyEncipherment
|
| +extendedKeyUsage = serverAuth
|
| +%(san_line)s
|
| +
|
| +[ v3_ca ]
|
| +basicConstraints = CA:true
|
| +subjectKeyIdentifier=hash
|
| +authorityKeyIdentifier=keyid:always,issuer:always
|
| +keyUsage = keyCertSign
|
| +""" % {"root_dir": root_dir,
|
| + "san_line": san_line,
|
| + "duration": duration,
|
| + "sep": os.path.sep.replace("\\", "\\\\")}
|
| +
|
| + return rv
|
| +
|
| +class OpenSSLEnvironment(object):
|
| + ssl_enabled = True
|
| +
|
| + def __init__(self, logger, openssl_binary="openssl", base_path=None,
|
| + password="web-platform-tests", force_regenerate=False,
|
| + duration=30, base_conf_path=None):
|
| + """SSL environment that creates a local CA and host certificate using OpenSSL.
|
| +
|
| + By default this will look in base_path for existing certificates that are still
|
| + valid and only create new certificates if there aren't any. This behaviour can
|
| + be adjusted using the force_regenerate option.
|
| +
|
| + :param logger: a stdlib logging compatible logger or mozlog structured logger
|
| + :param openssl_binary: Path to the OpenSSL binary
|
| + :param base_path: Path in which certificates will be stored. If None, a temporary
|
| + directory will be used and removed when the server shuts down
|
| + :param password: Password to use
|
| + :param force_regenerate: Always create a new certificate even if one already exists.
|
| + """
|
| + self.logger = logger
|
| +
|
| + self.temporary = False
|
| + if base_path is None:
|
| + base_path = tempfile.mkdtemp()
|
| + self.temporary = True
|
| +
|
| + self.base_path = os.path.abspath(base_path)
|
| + self.password = password
|
| + self.force_regenerate = force_regenerate
|
| + self.duration = duration
|
| + self.base_conf_path = base_conf_path
|
| +
|
| + self.path = None
|
| + self.binary = openssl_binary
|
| + self.openssl = None
|
| +
|
| + self._ca_cert_path = None
|
| + self._ca_key_path = None
|
| + self.host_certificates = {}
|
| +
|
| + def __enter__(self):
|
| + if not os.path.exists(self.base_path):
|
| + os.makedirs(self.base_path)
|
| +
|
| + path = functools.partial(os.path.join, self.base_path)
|
| +
|
| + with open(path("index.txt"), "w"):
|
| + pass
|
| + with open(path("serial"), "w") as f:
|
| + f.write("01")
|
| +
|
| + self.path = path
|
| +
|
| + return self
|
| +
|
| + def __exit__(self, *args, **kwargs):
|
| + if self.temporary:
|
| + shutil.rmtree(self.base_path)
|
| +
|
| + def _config_openssl(self, hosts):
|
| + conf_path = self.path("openssl.cfg")
|
| + return OpenSSL(self.logger, self.binary, self.base_path, conf_path, hosts,
|
| + self.duration, self.base_conf_path)
|
| +
|
| + def ca_cert_path(self):
|
| + """Get the path to the CA certificate file, generating a
|
| + new one if needed"""
|
| + if self._ca_cert_path is None and not self.force_regenerate:
|
| + self._load_ca_cert()
|
| + if self._ca_cert_path is None:
|
| + self._generate_ca()
|
| + return self._ca_cert_path
|
| +
|
| + def _load_ca_cert(self):
|
| + key_path = self.path("cakey.pem")
|
| + cert_path = self.path("cacert.pem")
|
| +
|
| + if self.check_key_cert(key_path, cert_path, None):
|
| + self.logger.info("Using existing CA cert")
|
| + self._ca_key_path, self._ca_cert_path = key_path, cert_path
|
| +
|
| + def check_key_cert(self, key_path, cert_path, hosts):
|
| + """Check that a key and cert file exist and are valid"""
|
| + if not os.path.exists(key_path) or not os.path.exists(cert_path):
|
| + return False
|
| +
|
| + with self._config_openssl(hosts) as openssl:
|
| + end_date_str = openssl("x509",
|
| + "-noout",
|
| + "-enddate",
|
| + "-in", cert_path).split("=", 1)[1].strip()
|
| + # Not sure if this works in other locales
|
| + end_date = datetime.strptime(end_date_str, "%b %d %H:%M:%S %Y %Z")
|
| + # Should have some buffer here e.g. 1 hr
|
| + if end_date < datetime.now():
|
| + return False
|
| +
|
| + #TODO: check the key actually signed the cert.
|
| + return True
|
| +
|
| + def _generate_ca(self):
|
| + path = self.path
|
| + self.logger.info("Generating new CA in %s" % self.base_path)
|
| +
|
| + key_path = path("cakey.pem")
|
| + req_path = path("careq.pem")
|
| + cert_path = path("cacert.pem")
|
| +
|
| + with self._config_openssl(None) as openssl:
|
| + openssl("req",
|
| + "-batch",
|
| + "-new",
|
| + "-newkey", "rsa:2048",
|
| + "-keyout", key_path,
|
| + "-out", req_path,
|
| + "-subj", make_subject("web-platform-tests"),
|
| + "-passout", "pass:%s" % self.password)
|
| +
|
| + openssl("ca",
|
| + "-batch",
|
| + "-create_serial",
|
| + "-keyfile", key_path,
|
| + "-passin", "pass:%s" % self.password,
|
| + "-selfsign",
|
| + "-extensions", "v3_ca",
|
| + "-in", req_path,
|
| + "-out", cert_path)
|
| +
|
| + os.unlink(req_path)
|
| +
|
| + self._ca_key_path, self._ca_cert_path = key_path, cert_path
|
| +
|
| + def host_cert_path(self, hosts):
|
| + """Get a tuple of (private key path, certificate path) for a host,
|
| + generating new ones if necessary.
|
| +
|
| + hosts must be a list of all hosts to appear on the certificate, with
|
| + the primary hostname first."""
|
| + hosts = tuple(hosts)
|
| + if hosts not in self.host_certificates:
|
| + if not self.force_regenerate:
|
| + key_cert = self._load_host_cert(hosts)
|
| + else:
|
| + key_cert = None
|
| + if key_cert is None:
|
| + key, cert = self._generate_host_cert(hosts)
|
| + else:
|
| + key, cert = key_cert
|
| + self.host_certificates[hosts] = key, cert
|
| +
|
| + return self.host_certificates[hosts]
|
| +
|
| + def _load_host_cert(self, hosts):
|
| + host = hosts[0]
|
| + key_path = self.path("%s.key" % host)
|
| + cert_path = self.path("%s.pem" % host)
|
| +
|
| + # TODO: check that this cert was signed by the CA cert
|
| + if self.check_key_cert(key_path, cert_path, hosts):
|
| + self.logger.info("Using existing host cert")
|
| + return key_path, cert_path
|
| +
|
| + def _generate_host_cert(self, hosts):
|
| + host = hosts[0]
|
| + if self._ca_key_path is None:
|
| + self._generate_ca()
|
| + ca_key_path = self._ca_key_path
|
| +
|
| + assert os.path.exists(ca_key_path)
|
| +
|
| + path = self.path
|
| +
|
| + req_path = path("wpt.req")
|
| + cert_path = path("%s.pem" % host)
|
| + key_path = path("%s.key" % host)
|
| +
|
| + self.logger.info("Generating new host cert")
|
| +
|
| + with self._config_openssl(hosts) as openssl:
|
| + openssl("req",
|
| + "-batch",
|
| + "-newkey", "rsa:2048",
|
| + "-keyout", key_path,
|
| + "-in", ca_key_path,
|
| + "-nodes",
|
| + "-out", req_path)
|
| +
|
| + openssl("ca",
|
| + "-batch",
|
| + "-in", req_path,
|
| + "-passin", "pass:%s" % self.password,
|
| + "-subj", make_subject(host),
|
| + "-out", cert_path)
|
| +
|
| + os.unlink(req_path)
|
| +
|
| + return key_path, cert_path
|
|
|