OLD | NEW |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import asn1 | 5 import asn1 |
| 6 import datetime |
6 import hashlib | 7 import hashlib |
| 8 import itertools |
7 import os | 9 import os |
| 10 import time |
8 | 11 |
| 12 GENERALIZED_TIME_FORMAT = "%Y%m%d%H%M%SZ" |
| 13 |
| 14 OCSP_STATE_GOOD = 1 |
| 15 OCSP_STATE_REVOKED = 2 |
| 16 OCSP_STATE_INVALID_RESPONSE = 3 |
| 17 OCSP_STATE_UNAUTHORIZED = 4 |
| 18 OCSP_STATE_UNKNOWN = 5 |
| 19 OCSP_STATE_TRY_LATER = 6 |
| 20 OCSP_STATE_INVALID_RESPONSE_DATA = 7 |
| 21 OCSP_STATE_MISMATCHED_SERIAL = 8 |
| 22 |
| 23 OCSP_DATE_VALID = 1 |
| 24 OCSP_DATE_OLD = 2 |
| 25 OCSP_DATE_EARLY = 3 |
| 26 OCSP_DATE_LONG = 4 |
| 27 |
| 28 OCSP_PRODUCED_VALID = 1 |
| 29 OCSP_PRODUCED_BEFORE_CERT = 2 |
| 30 OCSP_PRODUCED_AFTER_CERT = 3 |
9 | 31 |
10 # This file implements very minimal certificate and OCSP generation. It's | 32 # This file implements very minimal certificate and OCSP generation. It's |
11 # designed to test revocation checking. | 33 # designed to test revocation checking. |
12 | 34 |
13 def RandomNumber(length_in_bytes): | 35 def RandomNumber(length_in_bytes): |
14 '''RandomNumber returns a random number of length 8*|length_in_bytes| bits''' | 36 '''RandomNumber returns a random number of length 8*|length_in_bytes| bits''' |
15 rand = os.urandom(length_in_bytes) | 37 rand = os.urandom(length_in_bytes) |
16 n = 0 | 38 n = 0 |
17 for x in rand: | 39 for x in rand: |
18 n <<= 8 | 40 n <<= 8 |
(...skipping 219 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
238 | 260 |
239 return asn1.ToDER(asn1.SEQUENCE([ | 261 return asn1.ToDER(asn1.SEQUENCE([ |
240 asn1.Raw(tbsCert), | 262 asn1.Raw(tbsCert), |
241 asn1.SEQUENCE([ | 263 asn1.SEQUENCE([ |
242 SHA256_WITH_RSA_ENCRYPTION, | 264 SHA256_WITH_RSA_ENCRYPTION, |
243 None, | 265 None, |
244 ]), | 266 ]), |
245 asn1.BitString(privkey.Sign(tbsCert)), | 267 asn1.BitString(privkey.Sign(tbsCert)), |
246 ])) | 268 ])) |
247 | 269 |
248 | 270 def MakeOCSPSingleResponse( |
249 def MakeOCSPResponse(issuer_cn, issuer_key, serial, ocsp_state): | 271 issuer_name_hash, issuer_key_hash, serial, ocsp_state, ocsp_date): |
250 # https://tools.ietf.org/html/rfc2560 | |
251 issuer_name_hash = asn1.OCTETSTRING( | |
252 hashlib.sha1(asn1.ToDER(Name(cn = issuer_cn))).digest()) | |
253 | |
254 issuer_key_hash = asn1.OCTETSTRING( | |
255 hashlib.sha1(asn1.ToDER(issuer_key)).digest()) | |
256 | |
257 cert_status = None | 272 cert_status = None |
258 if ocsp_state == OCSP_STATE_REVOKED: | 273 if ocsp_state == OCSP_STATE_REVOKED: |
259 cert_status = asn1.Explicit(1, asn1.GeneralizedTime("20100101060000Z")) | 274 cert_status = asn1.Explicit(1, asn1.GeneralizedTime("20100101060000Z")) |
260 elif ocsp_state == OCSP_STATE_UNKNOWN: | 275 elif ocsp_state == OCSP_STATE_UNKNOWN: |
261 cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 2, 0)) | 276 cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 2, 0)) |
262 elif ocsp_state == OCSP_STATE_GOOD: | 277 elif ocsp_state == OCSP_STATE_GOOD: |
263 cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0)) | 278 cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0)) |
| 279 elif ocsp_state == OCSP_STATE_MISMATCHED_SERIAL: |
| 280 cert_status = asn1.Raw(asn1.TagAndLength(0x80 | 0, 0)) |
| 281 serial -= 1 |
264 else: | 282 else: |
265 raise ValueError('Bad OCSP state: ' + str(ocsp_state)) | 283 raise ValueError('Bad OCSP state: ' + str(ocsp_state)) |
266 | 284 |
| 285 now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime())) |
| 286 if ocsp_date == OCSP_DATE_VALID: |
| 287 thisUpdate = now - datetime.timedelta(days=1) |
| 288 nextUpdate = thisUpdate + datetime.timedelta(weeks=1) |
| 289 elif ocsp_date == OCSP_DATE_OLD: |
| 290 thisUpdate = now - datetime.timedelta(hours=1, weeks=1) |
| 291 nextUpdate = thisUpdate + datetime.timedelta(weeks=1) |
| 292 elif ocsp_date == OCSP_DATE_EARLY: |
| 293 thisUpdate = now + datetime.timedelta(hours=1) |
| 294 nextUpdate = thisUpdate + datetime.timedelta(weeks=1) |
| 295 elif ocsp_date == OCSP_DATE_LONG: |
| 296 thisUpdate = now - datetime.timedelta(days=365) |
| 297 nextUpdate = thisUpdate + datetime.timedelta(hours=1, days=365) |
| 298 elif ocsp_date == OCSP_DATE_BEFORE_CERT: |
| 299 thisUpdate = now - datetime.timedelta(days=1) |
| 300 nextUpdate = thisUpdate + datetime.timedelta(weeks=1) |
| 301 elif ocsp_date == OCSP_DATE_AFTER_CERT: |
| 302 thisUpdate = now - datetime.timedelta(days=1) |
| 303 nextUpdate = thisUpdate + datetime.timedelta(weeks=1) |
| 304 else: |
| 305 raise ValueError('Bad OCSP date: ' + str(ocsp_date)) |
| 306 |
| 307 return asn1.SEQUENCE([ # SingleResponse |
| 308 asn1.SEQUENCE([ # CertID |
| 309 asn1.SEQUENCE([ # hashAlgorithm |
| 310 HASH_SHA1, |
| 311 None, |
| 312 ]), |
| 313 issuer_name_hash, |
| 314 issuer_key_hash, |
| 315 serial, |
| 316 ]), |
| 317 cert_status, |
| 318 asn1.GeneralizedTime( # thisUpdate |
| 319 thisUpdate.strftime(GENERALIZED_TIME_FORMAT) |
| 320 ), |
| 321 asn1.Explicit( # nextUpdate |
| 322 0, |
| 323 asn1.GeneralizedTime(nextUpdate.strftime(GENERALIZED_TIME_FORMAT)) |
| 324 ), |
| 325 ]) |
| 326 |
| 327 def MakeOCSPResponse( |
| 328 issuer_cn, issuer_key, serial, ocsp_states, ocsp_dates, ocsp_produced): |
| 329 # https://tools.ietf.org/html/rfc2560 |
| 330 issuer_name_hash = asn1.OCTETSTRING( |
| 331 hashlib.sha1(asn1.ToDER(Name(cn = issuer_cn))).digest()) |
| 332 |
| 333 issuer_key_hash = asn1.OCTETSTRING( |
| 334 hashlib.sha1(asn1.ToDER(issuer_key)).digest()) |
| 335 |
| 336 now = datetime.datetime.fromtimestamp(time.mktime(time.gmtime())) |
| 337 if ocsp_produced == OCSP_PRODUCED_VALID: |
| 338 producedAt = now - datetime.timedelta(days=1) |
| 339 elif ocsp_produced == OCSP_PRODUCED_BEFORE_CERT: |
| 340 producedAt = datetime.datetime.strptime( |
| 341 "19100101050000Z", GENERALIZED_TIME_FORMAT) |
| 342 elif ocsp_produced == OCSP_PRODUCED_AFTER_CERT: |
| 343 producedAt = datetime.datetime.strptime( |
| 344 "20321201070000Z", GENERALIZED_TIME_FORMAT) |
| 345 else: |
| 346 raise ValueError('Bad OCSP produced: ' + str(ocsp_produced)) |
| 347 |
| 348 single_responses = [ |
| 349 MakeOCSPSingleResponse(issuer_name_hash, issuer_key_hash, serial, |
| 350 ocsp_state, ocsp_date) |
| 351 for ocsp_state, ocsp_date in itertools.izip(ocsp_states, ocsp_dates) |
| 352 ] |
| 353 |
267 basic_resp_data_der = asn1.ToDER(asn1.SEQUENCE([ | 354 basic_resp_data_der = asn1.ToDER(asn1.SEQUENCE([ |
268 asn1.Explicit(2, issuer_key_hash), | 355 asn1.Explicit(2, issuer_key_hash), |
269 asn1.GeneralizedTime("20100101060000Z"), # producedAt | 356 asn1.GeneralizedTime(producedAt.strftime(GENERALIZED_TIME_FORMAT)), |
270 asn1.SEQUENCE([ | 357 asn1.SEQUENCE(single_responses), |
271 asn1.SEQUENCE([ # SingleResponse | |
272 asn1.SEQUENCE([ # CertID | |
273 asn1.SEQUENCE([ # hashAlgorithm | |
274 HASH_SHA1, | |
275 None, | |
276 ]), | |
277 issuer_name_hash, | |
278 issuer_key_hash, | |
279 serial, | |
280 ]), | |
281 cert_status, | |
282 asn1.GeneralizedTime("20100101060000Z"), # thisUpdate | |
283 asn1.Explicit(0, asn1.GeneralizedTime("20300101060000Z")), # nextUpdate | |
284 ]), | |
285 ]), | |
286 ])) | 358 ])) |
287 | 359 |
288 basic_resp = asn1.SEQUENCE([ | 360 basic_resp = asn1.SEQUENCE([ |
289 asn1.Raw(basic_resp_data_der), | 361 asn1.Raw(basic_resp_data_der), |
290 asn1.SEQUENCE([ | 362 asn1.SEQUENCE([ |
291 SHA256_WITH_RSA_ENCRYPTION, | 363 SHA256_WITH_RSA_ENCRYPTION, |
292 None, | 364 None, |
293 ]), | 365 ]), |
294 asn1.BitString(issuer_key.Sign(basic_resp_data_der)), | 366 asn1.BitString(issuer_key.Sign(basic_resp_data_der)), |
295 ]) | 367 ]) |
296 | 368 |
297 resp = asn1.SEQUENCE([ | 369 resp = asn1.SEQUENCE([ |
298 asn1.ENUMERATED(0), | 370 asn1.ENUMERATED(0), |
299 asn1.Explicit(0, asn1.SEQUENCE([ | 371 asn1.Explicit(0, asn1.SEQUENCE([ |
300 OCSP_TYPE_BASIC, | 372 OCSP_TYPE_BASIC, |
301 asn1.OCTETSTRING(asn1.ToDER(basic_resp)), | 373 asn1.OCTETSTRING(asn1.ToDER(basic_resp)), |
302 ])) | 374 ])) |
303 ]) | 375 ]) |
304 | 376 |
305 return asn1.ToDER(resp) | 377 return asn1.ToDER(resp) |
306 | 378 |
307 | 379 |
308 def DERToPEM(der): | 380 def DERToPEM(der): |
309 pem = '-----BEGIN CERTIFICATE-----\n' | 381 pem = '-----BEGIN CERTIFICATE-----\n' |
310 pem += der.encode('base64') | 382 pem += der.encode('base64') |
311 pem += '-----END CERTIFICATE-----\n' | 383 pem += '-----END CERTIFICATE-----\n' |
312 return pem | 384 return pem |
313 | 385 |
314 OCSP_STATE_GOOD = 1 | |
315 OCSP_STATE_REVOKED = 2 | |
316 OCSP_STATE_INVALID = 3 | |
317 OCSP_STATE_UNAUTHORIZED = 4 | |
318 OCSP_STATE_UNKNOWN = 5 | |
319 | |
320 # unauthorizedDER is an OCSPResponse with a status of 6: | 386 # unauthorizedDER is an OCSPResponse with a status of 6: |
321 # SEQUENCE { ENUM(6) } | 387 # SEQUENCE { ENUM(6) } |
322 unauthorizedDER = '30030a0106'.decode('hex') | 388 unauthorizedDER = '30030a0106'.decode('hex') |
323 | 389 |
324 def GenerateCertKeyAndOCSP(subject = "127.0.0.1", | 390 def GenerateCertKeyAndOCSP(subject = "127.0.0.1", |
325 ocsp_url = "http://127.0.0.1", | 391 ocsp_url = "http://127.0.0.1", |
326 ocsp_state = OCSP_STATE_GOOD, | 392 ocsp_states = None, |
| 393 ocsp_dates = None, |
| 394 ocsp_produced = OCSP_PRODUCED_VALID, |
327 serial = 0): | 395 serial = 0): |
328 '''GenerateCertKeyAndOCSP returns a (cert_and_key_pem, ocsp_der) where: | 396 '''GenerateCertKeyAndOCSP returns a (cert_and_key_pem, ocsp_der) where: |
329 * cert_and_key_pem contains a certificate and private key in PEM format | 397 * cert_and_key_pem contains a certificate and private key in PEM format |
330 with the given subject common name and OCSP URL. | 398 with the given subject common name and OCSP URL. |
331 * ocsp_der contains a DER encoded OCSP response or None if ocsp_url is | 399 * ocsp_der contains a DER encoded OCSP response or None if ocsp_url is |
332 None''' | 400 None''' |
333 | 401 |
| 402 if ocsp_states is None: |
| 403 ocsp_states = [OCSP_STATE_GOOD] |
| 404 if ocsp_dates is None: |
| 405 ocsp_dates = [OCSP_DATE_VALID] |
| 406 |
334 if serial == 0: | 407 if serial == 0: |
335 serial = RandomNumber(16) | 408 serial = RandomNumber(16) |
336 cert_der = MakeCertificate(ISSUER_CN, bytes(subject), serial, KEY, KEY, | 409 cert_der = MakeCertificate(ISSUER_CN, bytes(subject), serial, KEY, KEY, |
337 bytes(ocsp_url)) | 410 bytes(ocsp_url)) |
338 cert_pem = DERToPEM(cert_der) | 411 cert_pem = DERToPEM(cert_der) |
339 | 412 |
340 ocsp_der = None | 413 ocsp_der = None |
341 if ocsp_url is not None: | 414 if ocsp_url is not None: |
342 if ocsp_state == OCSP_STATE_UNAUTHORIZED: | 415 if ocsp_states[0] == OCSP_STATE_UNAUTHORIZED: |
343 ocsp_der = unauthorizedDER | 416 ocsp_der = unauthorizedDER |
344 elif ocsp_state == OCSP_STATE_INVALID: | 417 elif ocsp_states[0] == OCSP_STATE_INVALID_RESPONSE: |
345 ocsp_der = '3' | 418 ocsp_der = '3' |
| 419 elif ocsp_states[0] == OCSP_STATE_TRY_LATER: |
| 420 resp = asn1.SEQUENCE([ |
| 421 asn1.ENUMERATED(3), |
| 422 ]) |
| 423 ocsp_der = asn1.ToDER(resp) |
| 424 elif ocsp_states[0] == OCSP_STATE_INVALID_RESPONSE_DATA: |
| 425 invalid_data = asn1.ToDER(asn1.OCTETSTRING('not ocsp data')) |
| 426 basic_resp = asn1.SEQUENCE([ |
| 427 asn1.Raw(invalid_data), |
| 428 asn1.SEQUENCE([ |
| 429 SHA256_WITH_RSA_ENCRYPTION, |
| 430 None, |
| 431 ]), |
| 432 asn1.BitString(KEY.Sign(invalid_data)), |
| 433 ]) |
| 434 resp = asn1.SEQUENCE([ |
| 435 asn1.ENUMERATED(0), |
| 436 asn1.Explicit(0, asn1.SEQUENCE([ |
| 437 OCSP_TYPE_BASIC, |
| 438 asn1.OCTETSTRING(asn1.ToDER(basic_resp)), |
| 439 ])), |
| 440 ]) |
| 441 ocsp_der = asn1.ToDER(resp) |
346 else: | 442 else: |
347 ocsp_der = MakeOCSPResponse(ISSUER_CN, KEY, serial, ocsp_state) | 443 ocsp_der = MakeOCSPResponse( |
| 444 ISSUER_CN, KEY, serial, ocsp_states, ocsp_dates, ocsp_produced) |
348 | 445 |
349 return (cert_pem + KEY_PEM, ocsp_der) | 446 return (cert_pem + KEY_PEM, ocsp_der) |
OLD | NEW |