OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/python |
| 2 # |
| 3 from pyasn1.codec.der import decoder, encoder |
| 4 from pyasn1_modules import rfc2560, rfc2459, pem |
| 5 from pyasn1.type import univ |
| 6 import sys, hashlib |
| 7 try: |
| 8 import urllib2 |
| 9 except ImportError: |
| 10 import urllib.request as urllib2 |
| 11 |
| 12 sha1oid = univ.ObjectIdentifier((1, 3, 14, 3, 2, 26)) |
| 13 |
| 14 class ValueOnlyBitStringEncoder(encoder.encoder.BitStringEncoder): |
| 15 # These methods just do not encode tag and length fields of TLV |
| 16 def encodeTag(self, *args): return '' |
| 17 def encodeLength(self, *args): return '' |
| 18 def encodeValue(*args): |
| 19 substrate, isConstructed = encoder.encoder.BitStringEncoder.encodeValue(
*args) |
| 20 # OCSP-specific hack follows: cut off the "unused bit count" |
| 21 # encoded bit-string value. |
| 22 return substrate[1:], isConstructed |
| 23 |
| 24 def __call__(self, bitStringValue): |
| 25 return self.encode(None, bitStringValue, defMode=1, maxChunkSize=0) |
| 26 |
| 27 valueOnlyBitStringEncoder = ValueOnlyBitStringEncoder() |
| 28 |
| 29 def mkOcspRequest(issuerCert, userCert): |
| 30 issuerTbsCertificate = issuerCert.getComponentByName('tbsCertificate') |
| 31 issuerSubject = issuerTbsCertificate.getComponentByName('subject') |
| 32 |
| 33 userTbsCertificate = userCert.getComponentByName('tbsCertificate') |
| 34 userIssuer = userTbsCertificate.getComponentByName('issuer') |
| 35 |
| 36 assert issuerSubject == userIssuer, '%s\n%s' % ( |
| 37 issuerSubject.prettyPrint(), userIssuer.prettyPrint() |
| 38 ) |
| 39 |
| 40 userIssuerHash = hashlib.sha1( |
| 41 encoder.encode(userIssuer) |
| 42 ).digest() |
| 43 |
| 44 issuerSubjectPublicKey = issuerTbsCertificate.getComponentByName('subjectPub
licKeyInfo').getComponentByName('subjectPublicKey') |
| 45 |
| 46 issuerKeyHash = hashlib.sha1( |
| 47 valueOnlyBitStringEncoder(issuerSubjectPublicKey) |
| 48 ).digest() |
| 49 |
| 50 userSerialNumber = userTbsCertificate.getComponentByName('serialNumber') |
| 51 |
| 52 # Build request object |
| 53 |
| 54 request = rfc2560.Request() |
| 55 |
| 56 reqCert = request.setComponentByName('reqCert').getComponentByName('reqCert'
) |
| 57 |
| 58 hashAlgorithm = reqCert.setComponentByName('hashAlgorithm').getComponentByNa
me('hashAlgorithm') |
| 59 hashAlgorithm.setComponentByName('algorithm', sha1oid) |
| 60 |
| 61 reqCert.setComponentByName('issuerNameHash', userIssuerHash) |
| 62 reqCert.setComponentByName('issuerKeyHash', issuerKeyHash) |
| 63 reqCert.setComponentByName('serialNumber', userSerialNumber) |
| 64 |
| 65 ocspRequest = rfc2560.OCSPRequest() |
| 66 |
| 67 tbsRequest = ocspRequest.setComponentByName('tbsRequest').getComponentByName
('tbsRequest') |
| 68 tbsRequest.setComponentByName('version', 'v1') |
| 69 |
| 70 requestList = tbsRequest.setComponentByName('requestList').getComponentByNam
e('requestList') |
| 71 requestList.setComponentByPosition(0, request) |
| 72 |
| 73 return ocspRequest |
| 74 |
| 75 def parseOcspRequest(ocspRequest): |
| 76 tbsRequest = ocspRequest['responseStatus'] |
| 77 |
| 78 assert responseStatus == rfc2560.OCSPResponseStatus('successful'), responseS
tatus.prettyPrint() |
| 79 responseBytes = ocspResponse.getComponentByName('responseBytes') |
| 80 responseType = responseBytes.getComponentByName('responseType') |
| 81 assert responseType == id_pkix_ocsp_basic, responseType.prettyPrint() |
| 82 |
| 83 response = responseBytes.getComponentByName('response') |
| 84 |
| 85 basicOCSPResponse, _ = decoder.decode( |
| 86 response, asn1Spec=rfc2560.BasicOCSPResponse() |
| 87 ) |
| 88 |
| 89 tbsResponseData = basicOCSPResponse.getComponentByName('tbsResponseData') |
| 90 |
| 91 response0 = tbsResponseData.getComponentByName('responses').getComponentByPo
sition(0) |
| 92 |
| 93 return ( |
| 94 tbsResponseData.getComponentByName('producedAt'), |
| 95 response0.getComponentByName('certID'), |
| 96 response0.getComponentByName('certStatus').getName(), |
| 97 response0.getComponentByName('thisUpdate') |
| 98 ) |
| 99 |
| 100 if len(sys.argv) != 2: |
| 101 print("""Usage: |
| 102 $ cat CACertificate.pem userCertificate.pem | %s <ocsp-responder-url>""" % sys.a
rgv[0]) |
| 103 sys.exit(-1) |
| 104 else: |
| 105 ocspUrl = sys.argv[1] |
| 106 |
| 107 # Parse CA and user certificates |
| 108 |
| 109 issuerCert, _ = decoder.decode( |
| 110 pem.readPemFromFile(sys.stdin)[1], |
| 111 asn1Spec=rfc2459.Certificate() |
| 112 ) |
| 113 userCert, _ = decoder.decode( |
| 114 pem.readPemFromFile(sys.stdin)[1], |
| 115 asn1Spec=rfc2459.Certificate() |
| 116 ) |
| 117 |
| 118 # Build OCSP request |
| 119 |
| 120 ocspReq = mkOcspRequest(issuerCert, userCert) |
| 121 |
| 122 # Use HTTP POST to get response (see Appendix A of RFC 2560) |
| 123 # In case you need proxies, set the http_proxy env variable |
| 124 |
| 125 httpReq = urllib2.Request( |
| 126 ocspUrl, |
| 127 encoder.encode(ocspReq), |
| 128 { 'Content-Type': 'application/ocsp-request' } |
| 129 ) |
| 130 httpRsp = urllib2.urlopen(httpReq).read() |
| 131 |
| 132 # Process OCSP response |
| 133 |
| 134 ocspRsp, _ = decoder.decode(httpRsp, asn1Spec=rfc2560.OCSPResponse()) |
| 135 |
| 136 producedAt, certId, certStatus, thisUpdate = parseOcspResponse(ocspRsp) |
| 137 |
| 138 print('Certificate ID %s is %s at %s till %s\n' % ( |
| 139 certId.getComponentByName('serialNumber'), |
| 140 certStatus, |
| 141 producedAt, |
| 142 thisUpdate |
| 143 )) |
OLD | NEW |