Index: net/tools/testserver/minica.py |
diff --git a/net/tools/testserver/minica.py b/net/tools/testserver/minica.py |
index acf68fcbb935059182661b275a106bb3c74472ad..7d7b06b634d5e3873dacd26200f05416532ccdde 100644 |
--- a/net/tools/testserver/minica.py |
+++ b/net/tools/testserver/minica.py |
@@ -3,9 +3,31 @@ |
# found in the LICENSE file. |
import asn1 |
+import datetime |
import hashlib |
+import itertools |
import os |
+import time |
+GENERALIZED_TIME_FORMAT = "%Y%m%d%H%M%SZ" |
+ |
+OCSP_STATE_GOOD = 1 |
+OCSP_STATE_REVOKED = 2 |
+OCSP_STATE_INVALID_RESPONSE = 3 |
+OCSP_STATE_UNAUTHORIZED = 4 |
+OCSP_STATE_UNKNOWN = 5 |
+OCSP_STATE_TRY_LATER = 6 |
+OCSP_STATE_INVALID_RESPONSE_DATA = 7 |
+OCSP_STATE_MISMATCHED_SERIAL = 8 |
+ |
+OCSP_DATE_VALID = 1 |
+OCSP_DATE_OLD = 2 |
+OCSP_DATE_EARLY = 3 |
+OCSP_DATE_LONG = 4 |
+ |
+OCSP_PRODUCED_VALID = 1 |
+OCSP_PRODUCED_BEFORE_CERT = 2 |
+OCSP_PRODUCED_AFTER_CERT = 3 |
# This file implements very minimal certificate and OCSP generation. It's |
# designed to test revocation checking. |
@@ -245,15 +267,8 @@ def MakeCertificate( |
asn1.BitString(privkey.Sign(tbsCert)), |
])) |
- |
-def MakeOCSPResponse(issuer_cn, issuer_key, serial, ocsp_state): |
- # https://tools.ietf.org/html/rfc2560 |
- issuer_name_hash = asn1.OCTETSTRING( |
- hashlib.sha1(asn1.ToDER(Name(cn = issuer_cn))).digest()) |
- |
- issuer_key_hash = asn1.OCTETSTRING( |
- hashlib.sha1(asn1.ToDER(issuer_key)).digest()) |
- |
+def MakeOCSPSingleResponse( |
+ issuer_name_hash, issuer_key_hash, serial, ocsp_state, ocsp_date): |
cert_status = None |
if ocsp_state == OCSP_STATE_REVOKED: |
cert_status = asn1.Explicit(1, asn1.GeneralizedTime("20100101060000Z")) |
@@ -261,28 +276,85 @@ def MakeOCSPResponse(issuer_cn, issuer_key, serial, ocsp_state): |
cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 2, 0)) |
elif ocsp_state == OCSP_STATE_GOOD: |
cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0)) |
+ elif ocsp_state == OCSP_STATE_MISMATCHED_SERIAL: |
+ cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0)) |
+ serial -= 1 |
else: |
raise ValueError('Bad OCSP state: ' + str(ocsp_state)) |
- basic_resp_data_der = asn1.ToDER(asn1.SEQUENCE([ |
- asn1.Explicit(2, issuer_key_hash), |
- asn1.GeneralizedTime("20100101060000Z"), # producedAt |
- asn1.SEQUENCE([ |
- asn1.SEQUENCE([ # SingleResponse |
- asn1.SEQUENCE([ # CertID |
- asn1.SEQUENCE([ # hashAlgorithm |
- HASH_SHA1, |
- None, |
- ]), |
- issuer_name_hash, |
- issuer_key_hash, |
- serial, |
- ]), |
- cert_status, |
- asn1.GeneralizedTime("20100101060000Z"), # thisUpdate |
- asn1.Explicit(0, asn1.GeneralizedTime("20300101060000Z")), # nextUpdate |
+ now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime())) |
+ if ocsp_date == OCSP_DATE_VALID: |
+ thisUpdate = now - datetime.timedelta(days=1) |
+ nextUpdate = thisUpdate + datetime.timedelta(weeks=1) |
+ elif ocsp_date == OCSP_DATE_OLD: |
+ thisUpdate = now - datetime.timedelta(hours=1, weeks=1) |
+ nextUpdate = thisUpdate + datetime.timedelta(weeks=1) |
+ elif ocsp_date == OCSP_DATE_EARLY: |
+ thisUpdate = now + datetime.timedelta(hours=1) |
+ nextUpdate = thisUpdate + datetime.timedelta(weeks=1) |
+ elif ocsp_date == OCSP_DATE_LONG: |
+ thisUpdate = now - datetime.timedelta(days=365) |
+ nextUpdate = thisUpdate + datetime.timedelta(hours=1, days=365) |
+ elif ocsp_date == OCSP_DATE_BEFORE_CERT: |
+ thisUpdate = now - datetime.timedelta(days=1) |
+ nextUpdate = thisUpdate + datetime.timedelta(weeks=1) |
+ elif ocsp_date == OCSP_DATE_AFTER_CERT: |
+ thisUpdate = now - datetime.timedelta(days=1) |
+ nextUpdate = thisUpdate + datetime.timedelta(weeks=1) |
+ else: |
+ raise ValueError('Bad OCSP date: ' + str(ocsp_date)) |
+ |
+ return asn1.SEQUENCE([ # SingleResponse |
+ asn1.SEQUENCE([ # CertID |
+ asn1.SEQUENCE([ # hashAlgorithm |
+ HASH_SHA1, |
+ None, |
]), |
+ issuer_name_hash, |
+ issuer_key_hash, |
+ serial, |
]), |
+ cert_status, |
+ asn1.GeneralizedTime( # thisUpdate |
+ thisUpdate.strftime(GENERALIZED_TIME_FORMAT) |
+ ), |
+ asn1.Explicit( # nextUpdate |
+ 0, |
+ asn1.GeneralizedTime(nextUpdate.strftime(GENERALIZED_TIME_FORMAT)) |
+ ), |
+ ]) |
+ |
+def MakeOCSPResponse( |
+ issuer_cn, issuer_key, serial, ocsp_states, ocsp_dates, ocsp_produced): |
+ # https://tools.ietf.org/html/rfc2560 |
+ issuer_name_hash = asn1.OCTETSTRING( |
+ hashlib.sha1(asn1.ToDER(Name(cn = issuer_cn))).digest()) |
+ |
+ issuer_key_hash = asn1.OCTETSTRING( |
+ hashlib.sha1(asn1.ToDER(issuer_key)).digest()) |
+ |
+ now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime())) |
+ if ocsp_produced == OCSP_PRODUCED_VALID: |
+ producedAt = now - datetime.timedelta(days=1) |
+ elif ocsp_produced == OCSP_PRODUCED_BEFORE_CERT: |
+ producedAt = datetime.datetime.strptime( |
+ "19100101050000Z", GENERALIZED_TIME_FORMAT) |
+ elif ocsp_produced == OCSP_PRODUCED_AFTER_CERT: |
+ producedAt = datetime.datetime.strptime( |
+ "20321201070000Z", GENERALIZED_TIME_FORMAT) |
+ else: |
+ raise ValueError('Bad OCSP produced: ' + str(ocsp_produced)) |
+ |
+ single_responses = [ |
+ MakeOCSPSingleResponse(issuer_name_hash, issuer_key_hash, serial, |
+ ocsp_state, ocsp_date) |
+ for ocsp_state, ocsp_date in itertools.izip(ocsp_states, ocsp_dates) |
+ ] |
+ |
+ basic_resp_data_der = asn1.ToDER(asn1.SEQUENCE([ |
+ asn1.Explicit(2, issuer_key_hash), |
+ asn1.GeneralizedTime(producedAt.strftime(GENERALIZED_TIME_FORMAT)), |
+ asn1.SEQUENCE(single_responses), |
])) |
basic_resp = asn1.SEQUENCE([ |
@@ -311,19 +383,15 @@ def DERToPEM(der): |
pem += '-----END CERTIFICATE-----\n' |
return pem |
-OCSP_STATE_GOOD = 1 |
-OCSP_STATE_REVOKED = 2 |
-OCSP_STATE_INVALID = 3 |
-OCSP_STATE_UNAUTHORIZED = 4 |
-OCSP_STATE_UNKNOWN = 5 |
- |
# unauthorizedDER is an OCSPResponse with a status of 6: |
# SEQUENCE { ENUM(6) } |
unauthorizedDER = '30030a0106'.decode('hex') |
def GenerateCertKeyAndOCSP(subject = "127.0.0.1", |
ocsp_url = "http://127.0.0.1", |
- ocsp_state = OCSP_STATE_GOOD, |
+ ocsp_states = None, |
+ ocsp_dates = None, |
+ ocsp_produced = OCSP_PRODUCED_VALID, |
serial = 0): |
'''GenerateCertKeyAndOCSP returns a (cert_and_key_pem, ocsp_der) where: |
* cert_and_key_pem contains a certificate and private key in PEM format |
@@ -331,6 +399,11 @@ def GenerateCertKeyAndOCSP(subject = "127.0.0.1", |
* ocsp_der contains a DER encoded OCSP response or None if ocsp_url is |
None''' |
+ if ocsp_states is None: |
+ ocsp_states = [OCSP_STATE_GOOD] |
+ if ocsp_dates is None: |
+ ocsp_dates = [OCSP_DATE_VALID] |
+ |
if serial == 0: |
serial = RandomNumber(16) |
cert_der = MakeCertificate(ISSUER_CN, bytes(subject), serial, KEY, KEY, |
@@ -339,11 +412,35 @@ def GenerateCertKeyAndOCSP(subject = "127.0.0.1", |
ocsp_der = None |
if ocsp_url is not None: |
- if ocsp_state == OCSP_STATE_UNAUTHORIZED: |
+ if ocsp_states[0] == OCSP_STATE_UNAUTHORIZED: |
ocsp_der = unauthorizedDER |
- elif ocsp_state == OCSP_STATE_INVALID: |
+ elif ocsp_states[0] == OCSP_STATE_INVALID_RESPONSE: |
ocsp_der = '3' |
+ elif ocsp_states[0] == OCSP_STATE_TRY_LATER: |
+ resp = asn1.SEQUENCE([ |
+ asn1.ENUMERATED(3), |
+ ]) |
+ ocsp_der = asn1.ToDER(resp) |
+ elif ocsp_states[0] == OCSP_STATE_INVALID_RESPONSE_DATA: |
+ invalid_data = asn1.ToDER(asn1.OCTETSTRING('not ocsp data')) |
+ basic_resp = asn1.SEQUENCE([ |
+ asn1.Raw(invalid_data), |
+ asn1.SEQUENCE([ |
+ SHA256_WITH_RSA_ENCRYPTION, |
+ None, |
+ ]), |
+ asn1.BitString(KEY.Sign(invalid_data)), |
+ ]) |
+ resp = asn1.SEQUENCE([ |
+ asn1.ENUMERATED(0), |
+ asn1.Explicit(0, asn1.SEQUENCE([ |
+ OCSP_TYPE_BASIC, |
+ asn1.OCTETSTRING(asn1.ToDER(basic_resp)), |
+ ])), |
+ ]) |
+ ocsp_der = asn1.ToDER(resp) |
else: |
- ocsp_der = MakeOCSPResponse(ISSUER_CN, KEY, serial, ocsp_state) |
+ ocsp_der = MakeOCSPResponse( |
+ ISSUER_CN, KEY, serial, ocsp_states, ocsp_dates, ocsp_produced) |
return (cert_pem + KEY_PEM, ocsp_der) |