OLD | NEW |
---|---|
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import asn1 | 5 import asn1 |
6 import datetime | |
6 import hashlib | 7 import hashlib |
7 import os | 8 import os |
9 import time | |
8 | 10 |
11 GENERALIZED_TIME_FORMAT = "%Y%m%d%H%M%SZ" | |
12 | |
13 OCSP_STATE_GOOD = 1 | |
14 OCSP_STATE_REVOKED = 2 | |
15 OCSP_STATE_INVALID = 3 | |
16 OCSP_STATE_UNAUTHORIZED = 4 | |
17 OCSP_STATE_UNKNOWN = 5 | |
18 | |
19 OCSP_DATE_VALID = 1 | |
20 OCSP_DATE_OLD = 2 | |
21 OCSP_DATE_EARLY = 3 | |
22 OCSP_DATE_LONG = 4 | |
9 | 23 |
10 # This file implements very minimal certificate and OCSP generation. It's | 24 # This file implements very minimal certificate and OCSP generation. It's |
11 # designed to test revocation checking. | 25 # designed to test revocation checking. |
12 | 26 |
13 def RandomNumber(length_in_bytes): | 27 def RandomNumber(length_in_bytes): |
14 '''RandomNumber returns a random number of length 8*|length_in_bytes| bits''' | 28 '''RandomNumber returns a random number of length 8*|length_in_bytes| bits''' |
15 rand = os.urandom(length_in_bytes) | 29 rand = os.urandom(length_in_bytes) |
16 n = 0 | 30 n = 0 |
17 for x in rand: | 31 for x in rand: |
18 n <<= 8 | 32 n <<= 8 |
(...skipping 219 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
238 | 252 |
239 return asn1.ToDER(asn1.SEQUENCE([ | 253 return asn1.ToDER(asn1.SEQUENCE([ |
240 asn1.Raw(tbsCert), | 254 asn1.Raw(tbsCert), |
241 asn1.SEQUENCE([ | 255 asn1.SEQUENCE([ |
242 SHA256_WITH_RSA_ENCRYPTION, | 256 SHA256_WITH_RSA_ENCRYPTION, |
243 None, | 257 None, |
244 ]), | 258 ]), |
245 asn1.BitString(privkey.Sign(tbsCert)), | 259 asn1.BitString(privkey.Sign(tbsCert)), |
246 ])) | 260 ])) |
247 | 261 |
248 | 262 def MakeOCSPResponse(issuer_cn, issuer_key, serial, ocsp_state, ocsp_date): |
249 def MakeOCSPResponse(issuer_cn, issuer_key, serial, ocsp_state): | |
250 # https://tools.ietf.org/html/rfc2560 | 263 # https://tools.ietf.org/html/rfc2560 |
251 issuer_name_hash = asn1.OCTETSTRING( | 264 issuer_name_hash = asn1.OCTETSTRING( |
252 hashlib.sha1(asn1.ToDER(Name(cn = issuer_cn))).digest()) | 265 hashlib.sha1(asn1.ToDER(Name(cn = issuer_cn))).digest()) |
253 | 266 |
254 issuer_key_hash = asn1.OCTETSTRING( | 267 issuer_key_hash = asn1.OCTETSTRING( |
255 hashlib.sha1(asn1.ToDER(issuer_key)).digest()) | 268 hashlib.sha1(asn1.ToDER(issuer_key)).digest()) |
256 | 269 |
257 cert_status = None | 270 cert_status = None |
258 if ocsp_state == OCSP_STATE_REVOKED: | 271 if ocsp_state == OCSP_STATE_REVOKED: |
259 cert_status = asn1.Explicit(1, asn1.GeneralizedTime("20100101060000Z")) | 272 cert_status = asn1.Explicit(1, asn1.GeneralizedTime("20100101060000Z")) |
260 elif ocsp_state == OCSP_STATE_UNKNOWN: | 273 elif ocsp_state == OCSP_STATE_UNKNOWN: |
261 cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 2, 0)) | 274 cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 2, 0)) |
262 elif ocsp_state == OCSP_STATE_GOOD: | 275 elif ocsp_state == OCSP_STATE_GOOD: |
263 cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0)) | 276 cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0)) |
264 else: | 277 else: |
265 raise ValueError('Bad OCSP state: ' + str(ocsp_state)) | 278 raise ValueError('Bad OCSP state: ' + str(ocsp_state)) |
266 | 279 |
280 now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime())) | |
281 if ocsp_date == OCSP_DATE_VALID: | |
282 thisUpdate = now - datetime.timedelta(days=1) | |
283 nextUpdate = thisUpdate + datetime.timedelta(weeks=1) | |
284 elif ocsp_date == OCSP_DATE_OLD: | |
285 thisUpdate = now - datetime.timedelta(hours=1, weeks=1) | |
286 nextUpdate = thisUpdate + datetime.timedelta(weeks=1) | |
287 elif ocsp_date == OCSP_DATE_EARLY: | |
288 thisUpdate = now + datetime.timedelta(hours=1) | |
289 nextUpdate = thisUpdate + datetime.timedelta(weeks=1) | |
290 elif ocsp_date == OCSP_DATE_LONG: | |
291 thisUpdate = now - datetime.timedelta(days=365) | |
292 nextUpdate = thisUpdate + datetime.timedelta(hours=1, days=365) | |
293 else: | |
294 raise ValueError('Bad OCSP date: ' + str(ocsp_date)) | |
295 producedAt = thisUpdate | |
dadrian
2016/06/27 22:43:03
I had to modify producedAt because apparently NSS
svaldez
2016/06/29 14:41:23
From the RFC, the only requirement on producedAt i
dadrian
2016/06/30 21:52:43
I added a check to make sure notBefore <= produced
Ryan Sleevi
2016/06/30 22:14:50
There were some security bugs about this. From an
dadrian
2016/07/08 22:17:30
I mangled these down to one test class that _shoul
| |
296 | |
267 basic_resp_data_der = asn1.ToDER(asn1.SEQUENCE([ | 297 basic_resp_data_der = asn1.ToDER(asn1.SEQUENCE([ |
268 asn1.Explicit(2, issuer_key_hash), | 298 asn1.Explicit(2, issuer_key_hash), |
269 asn1.GeneralizedTime("20100101060000Z"), # producedAt | 299 asn1.GeneralizedTime(producedAt.strftime(GENERALIZED_TIME_FORMAT)), |
270 asn1.SEQUENCE([ | 300 asn1.SEQUENCE([ |
271 asn1.SEQUENCE([ # SingleResponse | 301 asn1.SEQUENCE([ # SingleResponse |
272 asn1.SEQUENCE([ # CertID | 302 asn1.SEQUENCE([ # CertID |
273 asn1.SEQUENCE([ # hashAlgorithm | 303 asn1.SEQUENCE([ # hashAlgorithm |
274 HASH_SHA1, | 304 HASH_SHA1, |
275 None, | 305 None, |
276 ]), | 306 ]), |
277 issuer_name_hash, | 307 issuer_name_hash, |
278 issuer_key_hash, | 308 issuer_key_hash, |
279 serial, | 309 serial, |
280 ]), | 310 ]), |
281 cert_status, | 311 cert_status, |
282 asn1.GeneralizedTime("20100101060000Z"), # thisUpdate | 312 asn1.GeneralizedTime( # thisUpdate |
283 asn1.Explicit(0, asn1.GeneralizedTime("20300101060000Z")), # nextUpdate | 313 thisUpdate.strftime(GENERALIZED_TIME_FORMAT) |
314 ), | |
315 asn1.Explicit( # nextUpdate | |
316 0, | |
317 asn1.GeneralizedTime(nextUpdate.strftime(GENERALIZED_TIME_FORMAT)) | |
318 ), | |
284 ]), | 319 ]), |
285 ]), | 320 ]), |
286 ])) | 321 ])) |
287 | 322 |
288 basic_resp = asn1.SEQUENCE([ | 323 basic_resp = asn1.SEQUENCE([ |
289 asn1.Raw(basic_resp_data_der), | 324 asn1.Raw(basic_resp_data_der), |
290 asn1.SEQUENCE([ | 325 asn1.SEQUENCE([ |
291 SHA256_WITH_RSA_ENCRYPTION, | 326 SHA256_WITH_RSA_ENCRYPTION, |
292 None, | 327 None, |
293 ]), | 328 ]), |
(...skipping 10 matching lines...) Expand all Loading... | |
304 | 339 |
305 return asn1.ToDER(resp) | 340 return asn1.ToDER(resp) |
306 | 341 |
307 | 342 |
308 def DERToPEM(der): | 343 def DERToPEM(der): |
309 pem = '-----BEGIN CERTIFICATE-----\n' | 344 pem = '-----BEGIN CERTIFICATE-----\n' |
310 pem += der.encode('base64') | 345 pem += der.encode('base64') |
311 pem += '-----END CERTIFICATE-----\n' | 346 pem += '-----END CERTIFICATE-----\n' |
312 return pem | 347 return pem |
313 | 348 |
314 OCSP_STATE_GOOD = 1 | |
315 OCSP_STATE_REVOKED = 2 | |
316 OCSP_STATE_INVALID = 3 | |
317 OCSP_STATE_UNAUTHORIZED = 4 | |
318 OCSP_STATE_UNKNOWN = 5 | |
319 | |
320 # unauthorizedDER is an OCSPResponse with a status of 6: | 349 # unauthorizedDER is an OCSPResponse with a status of 6: |
321 # SEQUENCE { ENUM(6) } | 350 # SEQUENCE { ENUM(6) } |
322 unauthorizedDER = '30030a0106'.decode('hex') | 351 unauthorizedDER = '30030a0106'.decode('hex') |
323 | 352 |
324 def GenerateCertKeyAndOCSP(subject = "127.0.0.1", | 353 def GenerateCertKeyAndOCSP(subject = "127.0.0.1", |
325 ocsp_url = "http://127.0.0.1", | 354 ocsp_url = "http://127.0.0.1", |
326 ocsp_state = OCSP_STATE_GOOD, | 355 ocsp_state = OCSP_STATE_GOOD, |
356 ocsp_date = OCSP_DATE_VALID, | |
327 serial = 0): | 357 serial = 0): |
328 '''GenerateCertKeyAndOCSP returns a (cert_and_key_pem, ocsp_der) where: | 358 '''GenerateCertKeyAndOCSP returns a (cert_and_key_pem, ocsp_der) where: |
329 * cert_and_key_pem contains a certificate and private key in PEM format | 359 * cert_and_key_pem contains a certificate and private key in PEM format |
330 with the given subject common name and OCSP URL. | 360 with the given subject common name and OCSP URL. |
331 * ocsp_der contains a DER encoded OCSP response or None if ocsp_url is | 361 * ocsp_der contains a DER encoded OCSP response or None if ocsp_url is |
332 None''' | 362 None''' |
333 | 363 |
334 if serial == 0: | 364 if serial == 0: |
335 serial = RandomNumber(16) | 365 serial = RandomNumber(16) |
336 cert_der = MakeCertificate(ISSUER_CN, bytes(subject), serial, KEY, KEY, | 366 cert_der = MakeCertificate(ISSUER_CN, bytes(subject), serial, KEY, KEY, |
337 bytes(ocsp_url)) | 367 bytes(ocsp_url)) |
338 cert_pem = DERToPEM(cert_der) | 368 cert_pem = DERToPEM(cert_der) |
339 | 369 |
340 ocsp_der = None | 370 ocsp_der = None |
341 if ocsp_url is not None: | 371 if ocsp_url is not None: |
342 if ocsp_state == OCSP_STATE_UNAUTHORIZED: | 372 if ocsp_state == OCSP_STATE_UNAUTHORIZED: |
343 ocsp_der = unauthorizedDER | 373 ocsp_der = unauthorizedDER |
344 elif ocsp_state == OCSP_STATE_INVALID: | 374 elif ocsp_state == OCSP_STATE_INVALID: |
345 ocsp_der = '3' | 375 ocsp_der = '3' |
346 else: | 376 else: |
347 ocsp_der = MakeOCSPResponse(ISSUER_CN, KEY, serial, ocsp_state) | 377 ocsp_der = MakeOCSPResponse( |
378 ISSUER_CN, KEY, serial, ocsp_state, ocsp_date) | |
348 | 379 |
349 return (cert_pem + KEY_PEM, ocsp_der) | 380 return (cert_pem + KEY_PEM, ocsp_der) |
OLD | NEW |