Chromium Code Reviews| Index: net/data/verify_certificate_chain_unittest/common.py |
| diff --git a/net/data/verify_certificate_chain_unittest/common.py b/net/data/verify_certificate_chain_unittest/common.py |
| new file mode 100755 |
| index 0000000000000000000000000000000000000000..c84dd1785267fc1cb916825e689e10c2843043f1 |
| --- /dev/null |
| +++ b/net/data/verify_certificate_chain_unittest/common.py |
| @@ -0,0 +1,416 @@ |
| +#!/usr/bin/python |
| +# Copyright (c) 2015 The Chromium Authors. All rights reserved. |
| +# Use of this source code is governed by a BSD-style license that can be |
| +# found in the LICENSE file. |
| + |
| +"""Set of helpers to generate signed X.509 certificates. |
| + |
| +This works by shelling out calls to the 'openssl req' and 'openssl ca' |
| +commands, and passing the appropriate command line flags and configuration file |
| +(.cnf). |
| +""" |
| + |
| +import base64 |
| +import os |
| +import shutil |
| +import subprocess |
| +import sys |
| + |
| +sys.path.insert(0, os.path.dirname(__file__)) |
|
mattm
2015/10/29 01:47:18
This is unnecessary, can import files in the same
eroman
2015/10/31 00:34:25
Done.
|
| +import openssl_conf |
| + |
| +# Enum for the "type" of certificate that is to be created. This is used to |
| +# select sane defaults for the .cnf file and command line flags, but they can |
| +# all be overridden. |
| +TYPE_CA = 2 |
| +TYPE_END_ENTITY = 3 |
| + |
| +# January 1st, 2015 midnight UTC |
| +JANUARY_1_2015_UTC = '150101120000Z' |
| + |
| +# January 1st, 2016 midnight UTC |
| +JANUARY_1_2016_UTC = '160101120000Z' |
| + |
| +# March 2nd, 2015 midnight UTC |
| +DEFAULT_TIME = '150302120000Z' |
| + |
| +g_next_path_id = 1 |
| + |
| +# Base directory into which all temporary output files will be saved. |
| +g_out_dir = None |
| + |
| +g_invoking_script_name = None |
|
mattm
2015/10/29 01:47:18
You could use sys.argv[0] to get it instead of hav
eroman
2015/10/31 00:34:25
Done (good idea, simplifies the boilerplate for ea
|
| + |
| + |
| +def GetUniquePathId(name): |
| + """Returns a base filename that contains 'name', but is unique to the output |
| + directory""" |
| + global g_next_path_id |
| + path_id = '%s_%d' % (name, g_next_path_id) |
| + g_next_path_id += 1 |
| + return path_id |
| + |
| + |
| +class Certificate(object): |
| + """Helper for building an X.509 certificate.""" |
| + |
| + def __init__(self, name, cert_type, issuer): |
| + # The name will be used for the subject's CN, and also as a component of |
| + # the temporary filenames to help with debugging. |
| + self.name = name |
| + self.path_id = GetUniquePathId(name) |
| + |
| + # The issuer is also a Certificate object. Passing |None| means it is a |
| + # self-signed certificate. |
| + self.issuer = issuer |
| + if issuer is None: |
| + self.issuer = self |
| + |
| + # The config contains all the OpenSSL options that will be passed via a |
| + # .cnf file. Set up defaults. |
| + self.config = openssl_conf.Config() |
| + self.InitConfig() |
| + |
| + # Some settings need to be passed as flags rather than in the .cnf file. |
| + # Technically these can be set though a .cnf, however doing so makes it |
| + # sticky to the issuing certificate, rather than selecting it per |
| + # subordinate certificate. |
| + self.validity_flags = [] |
| + self.md_flags = [] |
| + |
| + # By default OpenSSL will use the current time for the start time. Instead |
| + # default to using a fixed timestamp for more predictabl results each time |
| + # the certificates are re-generated. |
| + self.SetValidityRange(JANUARY_1_2015_UTC, JANUARY_1_2016_UTC) |
| + |
| + # Use SHA-256 when THIS certificate is signed (setting it in the |
| + # configuration would instead set the hash to use when signing other |
| + # certificates with this one). |
| + self.SetSignatureHash('sha256') |
| + |
| + # Set appropriate key usages and basic constraints. For flexibility in |
| + # testing (since want to generate some flawed certificates) these are set |
| + # on a per-certificate basis rather than automatically when signing. |
| + if cert_type == TYPE_END_ENTITY: |
| + self.GetExtensions().SetProperty('keyUsage', |
| + 'critical,digitalSignature,keyEncipherment') |
| + self.GetExtensions().SetProperty('extendedKeyUsage', |
| + 'serverAuth,clientAuth') |
| + else: |
| + self.GetExtensions().SetProperty('keyUsage', |
| + 'critical,keyCertSign,cRLSign') |
| + self.GetExtensions().SetProperty('basicConstraints', 'critical,CA:true') |
| + |
| + # Tracks whether the PEM file for this certificate has been written (since |
| + # generation is done lazily). |
| + self.finalized = False |
| + |
| + # Initialize any files that will be needed if this certificate is used to |
| + # sign other certificates. Starts off serial numbers at 1, and will |
| + # increment them for each signed certificate. |
| + WriteStringToFile('01\n', self.GetSerialPath()) |
| + WriteStringToFile('', self.GetDatabasePath()) |
| + |
| + |
| + def GenerateRsaKey(self, size_bits): |
| + """Generates an RSA private key for the certificate.""" |
| + subprocess.check_call( |
| + ['openssl', 'genrsa', '-out', self.GetKeyPath(), str(size_bits)]) |
| + |
| + |
| + def GenerateEcKey(self, named_curve): |
| + """Generates an EC private key for the certificate. |named_curve| can be |
| + something like secp384r1""" |
| + subprocess.check_call( |
| + ['openssl', 'ecparam', '-out', self.GetKeyPath(), '-name', named_curve, |
| + '-genkey']) |
| + |
| + |
| + def SetValidityRange(self, start_date, end_date): |
| + """Sets the Validity notBefore and notAfter properties for the |
| + certificate""" |
| + self.validity_flags = ['-startdate', start_date, '-enddate', end_date] |
| + |
| + |
| + def SetSignatureHash(self, md): |
| + """Sets the hash function that will be used when signing this certificate. |
| + Can be sha1, sha256, sha512, md5, etc.""" |
| + self.md_flags = ['-md', md] |
| + |
| + |
| + def GetExtensions(self): |
| + return self.config.GetSection('req_ext') |
| + |
| + |
| + def GetPath(self, suffix): |
| + """Forms a path to an output file for this certificate, containing the |
| + indicated suffix. The certificate's name will be used as its basis.""" |
| + return os.path.join(g_out_dir, '%s%s' % (self.path_id, suffix)) |
| + |
| + |
| + def GetKeyPath(self): |
| + return self.GetPath('.key') |
| + |
| + |
| + def GetCertPath(self): |
| + return self.GetPath('.pem') |
| + |
| + |
| + def GetSerialPath(self): |
| + return self.GetPath('.serial') |
| + |
| + |
| + def GetCsrPath(self): |
| + return self.GetPath('.csr') |
| + |
| + |
| + def GetDatabasePath(self): |
| + return self.GetPath('.db') |
| + |
| + |
| + def GetConfigPath(self): |
| + return self.GetPath('.cnf') |
| + |
| + |
| + def GetCertPem(self): |
| + # Finish generating a .pem file for the certificate. |
| + self.Finalize() |
| + |
| + # Read the certificate data. |
| + with open(self.GetCertPath(), 'r') as f: |
| + return f.read() |
| + |
| + |
| + def Finalize(self): |
| + """Finishes the certificate creation process. This generates any needed |
| + key, creates and signs the CSR. On completion the resulting PEM file can be |
| + found at self.GetCertPath()""" |
| + |
| + if self.finalized: |
| + return # Already finalized, no work needed. |
| + |
| + self.finalized = True |
| + |
| + # Ensure that the issuer has been "finalized", since its outputs need to be |
| + # accessible. Note that self.issuer could be the same as self. |
| + self.issuer.Finalize() |
| + |
| + # Ensure the certificate has a key. Callers have the option to generate a |
| + # different type of key, but if that was not done default to a new 2048-bit |
| + # RSA key. |
| + if not os.path.isfile(self.GetKeyPath()): |
| + self.GenerateRsaKey(2048) |
| + |
| + |
|
mattm
2015/10/29 01:47:17
nit: extra line
eroman
2015/10/31 00:34:25
Done.
|
| + # Serialize the config to a file. |
| + self.config.WriteToFile(self.GetConfigPath()) |
| + |
| + # Create a CSR. |
| + subprocess.check_call( |
| + ['openssl', 'req', '-new', |
| + '-key', self.GetKeyPath(), |
| + '-out', self.GetCsrPath(), |
| + '-config', self.GetConfigPath()]) |
| + |
| + cmd = ['openssl', 'ca', '-batch', '-in', |
| + self.GetCsrPath(), '-out', self.GetCertPath(), '-config', |
| + self.issuer.GetConfigPath()] |
| + |
| + if self.issuer == self: |
| + cmd.append('-selfsign') |
| + |
| + # Add in any extra flags. |
| + cmd.extend(self.validity_flags) |
| + cmd.extend(self.md_flags) |
| + |
| + # Run the 'openssl ca' command. |
| + subprocess.check_call(cmd) |
| + |
| + |
| + def InitConfig(self): |
| + """Initializes default properties in the certificate .cnf file that are |
| + generic enough to work for all certificates (but can be overridden later). |
| + """ |
| + |
| + # -------------------------------------- |
| + # 'req' section |
| + # -------------------------------------- |
| + |
| + section = self.config.GetSection('req') |
| + |
| + section.SetProperty('encrypt_key', 'no') |
| + section.SetProperty('utf8', 'yes') |
| + section.SetProperty('string_mask', 'utf8only') |
| + section.SetProperty('prompt', 'no') |
| + section.SetProperty('distinguished_name', 'req_dn') |
| + section.SetProperty('req_extensions', 'req_ext') |
| + |
| + # -------------------------------------- |
| + # 'req_dn' section |
| + # -------------------------------------- |
| + |
| + # This section describes the certificate subject's distinguished name. |
| + |
| + section = self.config.GetSection('req_dn') |
| + section.SetProperty('commonName', '"%s"' % (self.name)) |
| + |
| + # -------------------------------------- |
| + # 'req_ext' section |
| + # -------------------------------------- |
| + |
| + # This section describes the certificate's extensions. |
| + |
| + section = self.config.GetSection('req_ext') |
| + section.SetProperty('subjectKeyIdentifier', 'hash') |
| + |
| + # -------------------------------------- |
| + # SECTIONS FOR CAs |
| + # -------------------------------------- |
| + |
| + # The following sections are used by the 'openssl ca' and relate to the |
| + # signing operation. They are not needed for end-entity certificate |
| + # configurations, but only if this certifiate will be used to sign other |
| + # certificates. |
| + |
| + # -------------------------------------- |
| + # 'ca' section |
| + # -------------------------------------- |
| + |
| + section = self.config.GetSection('ca') |
| + section.SetProperty('default_ca', 'root_ca') |
| + |
| + section = self.config.GetSection('root_ca') |
| + section.SetProperty('certificate', self.GetCertPath()) |
| + section.SetProperty('private_key', self.GetKeyPath()) |
| + section.SetProperty('new_certs_dir', g_out_dir) |
| + section.SetProperty('serial', self.GetSerialPath()) |
| + section.SetProperty('database', self.GetDatabasePath()) |
| + section.SetProperty('unique_subject', 'no') |
| + |
| + # These will get overridden via command line flags. |
| + section.SetProperty('default_days', '365') |
| + section.SetProperty('default_md', 'sha256') |
| + |
| + section.SetProperty('policy', 'policy_anything') |
| + section.SetProperty('email_in_dn', 'no') |
| + section.SetProperty('preserve', 'yes') |
| + section.SetProperty('name_opt', 'multiline,-esc_msb,utf8') |
| + section.SetProperty('cert_opt', 'ca_default') |
| + section.SetProperty('copy_extensions', 'copy') |
| + section.SetProperty('x509_extensions', 'signing_ca_ext') |
| + section.SetProperty('default_crl_days', '30') |
| + section.SetProperty('crl_extensions', 'crl_ext') |
| + |
| + section = self.config.GetSection('policy_anything') |
| + section.SetProperty('domainComponent', 'optional') |
| + section.SetProperty('countryName', 'optional') |
| + section.SetProperty('stateOrProvinceName', 'optional') |
| + section.SetProperty('localityName', 'optional') |
| + section.SetProperty('organizationName', 'optional') |
| + section.SetProperty('organizationalUnitName', 'optional') |
| + section.SetProperty('commonName', 'optional') |
| + section.SetProperty('emailAddress', 'optional') |
| + |
| + section = self.config.GetSection('signing_ca_ext') |
| + section.SetProperty('subjectKeyIdentifier', 'hash') |
| + section.SetProperty('authorityKeyIdentifier', 'keyid:always') |
| + section.SetProperty('authorityInfoAccess', '@issuer_info') |
| + section.SetProperty('crlDistributionPoints', '@crl_info') |
| + |
| + section = self.config.GetSection('issuer_info') |
| + section.SetProperty('caIssuers;URI.0', |
| + 'http://url-for-aia/%s.cer' % (self.name)) |
| + |
| + section = self.config.GetSection('crl_info') |
| + section.SetProperty('URI.0', 'http://url-for-crl/%s.crl' % (self.name)) |
| + |
| + section = self.config.GetSection('crl_ext') |
| + section.SetProperty('authorityKeyIdentifier', 'keyid:always') |
| + section.SetProperty('authorityInfoAccess', '@issuer_info') |
| + |
| + |
| +def DataToPem(block_header, block_data): |
| + return '-----BEGIN %s-----\n%s\n-----END %s-----\n' % (block_header, |
| + base64.b64encode(block_data), block_header) |
| + |
| + |
| +def WriteTestFile(description, chain, trusted_certs, utc_time, verify_result): |
| + """Writes a test file that contains all the inputs necessary to run a |
| + verification on a certificate chain""" |
| + |
| + # Prepend the script name that generated the file to the description. |
| + test_data = '[Created by: %s.py]\n\n%s\n' % (g_invoking_script_name, |
| + description) |
| + |
| + # Write the certificate chain to the output file. |
| + for cert in chain: |
| + test_data += '\n' + cert.GetCertPem() |
| + |
| + # Write the trust store. |
| + for cert in trusted_certs: |
| + cert_data = cert.GetCertPem() |
| + # Use a different block type in the .pem file. |
| + cert_data = cert_data.replace('CERTIFICATE', 'TRUSTED_CERTIFICATE') |
| + test_data += '\n' + cert_data |
| + |
| + test_data += '\n' + DataToPem('TIME', utc_time) |
| + |
| + if verify_result: |
| + verify_result = 'SUCCESS' |
|
mattm
2015/10/29 01:47:17
I found it confusing to overwrite the original arg
eroman
2015/10/31 00:34:24
Done (I didn't know that is how you did tertiary o
|
| + else: |
| + verify_result = 'FAIL' |
| + |
| + test_data += '\n' + DataToPem('VERIFY_RESULT', verify_result) |
| + |
| + # Save the chain using the same name as the script that generated it (but |
| + # with the 'generate-' prefix stripped). |
| + stripped_script_name = g_invoking_script_name |
| + parts = g_invoking_script_name.split('generate-', 1) |
| + if len(parts) > 0: |
|
mattm
2015/10/29 01:47:17
should be > 1
eroman
2015/10/31 00:34:25
No longer relevant (in fact I merged this logic in
|
| + stripped_script_name = parts[1] |
|
mattm
2015/10/29 01:47:18
Or you could just do:
stripped_script_name = g_inv
eroman
2015/10/31 00:34:25
I opted for this:
# Strip the leading 'generate
|
| + |
| + final_output_path = os.path.join('%s.pem' % (stripped_script_name)) |
| + |
| + WriteStringToFile(test_data, final_output_path) |
| + |
| + |
| +def WriteStringToFile(data, path): |
| + with open(path, 'w') as f: |
| + f.write(data) |
| + |
| + |
| +def Init(invoking_script_path): |
| + """Creates an output directory for all the temporary files generated by the |
| + calling script. |
| + """ |
| + |
| + global g_out_dir |
| + global g_invoking_script_name |
| + |
| + g_invoking_script_name = os.path.splitext( |
| + os.path.basename(invoking_script_path))[0] |
| + |
| + # Ensure that out/ exists |
| + if not os.path.exists('out'): |
| + os.makedirs('out') |
|
mattm
2015/10/29 01:47:17
no need for os.makedirs here, use os.mkdir?
Actua
eroman
2015/10/31 00:34:25
Done (just keeping the on call to mkdirs)
|
| + |
| + # Use an output directory with the same name as the invoking script. |
| + g_out_dir = os.path.join('out', g_invoking_script_name) |
| + sys.stdout.write('Creating output directory: %s\n' % (g_out_dir)) |
| + |
| + # Ensure the output directory is empty. |
| + shutil.rmtree(g_out_dir, True) |
| + os.makedirs(g_out_dir) |
| + |
| + |
| +def CreateSelfSignedRootCertificate(name): |
| + return Certificate(name, TYPE_CA, None) |
| + |
| + |
| +def CreateIntermediaryCertificate(name, issuer): |
| + return Certificate(name, TYPE_CA, issuer) |
| + |
| + |
| +def CreateEndEntityCertificate(name, issuer): |
| + return Certificate(name, TYPE_END_ENTITY, issuer) |
| + |