OLD | NEW |
| (Empty) |
1 #!/usr/bin/python | |
2 # | |
3 from pyasn1.codec.der import decoder, encoder | |
4 from pyasn1_modules import rfc2560, rfc2459, pem | |
5 from pyasn1.type import univ | |
6 import sys, hashlib | |
7 try: | |
8 import urllib2 | |
9 except ImportError: | |
10 import urllib.request as urllib2 | |
11 | |
12 sha1oid = univ.ObjectIdentifier((1, 3, 14, 3, 2, 26)) | |
13 | |
14 class ValueOnlyBitStringEncoder(encoder.encoder.BitStringEncoder): | |
15 # These methods just do not encode tag and length fields of TLV | |
16 def encodeTag(self, *args): return '' | |
17 def encodeLength(self, *args): return '' | |
18 def encodeValue(*args): | |
19 substrate, isConstructed = encoder.encoder.BitStringEncoder.encodeValue(
*args) | |
20 # OCSP-specific hack follows: cut off the "unused bit count" | |
21 # encoded bit-string value. | |
22 return substrate[1:], isConstructed | |
23 | |
24 def __call__(self, bitStringValue): | |
25 return self.encode(None, bitStringValue, defMode=1, maxChunkSize=0) | |
26 | |
27 valueOnlyBitStringEncoder = ValueOnlyBitStringEncoder() | |
28 | |
29 def mkOcspRequest(issuerCert, userCert): | |
30 issuerTbsCertificate = issuerCert.getComponentByName('tbsCertificate') | |
31 issuerSubject = issuerTbsCertificate.getComponentByName('subject') | |
32 | |
33 userTbsCertificate = userCert.getComponentByName('tbsCertificate') | |
34 userIssuer = userTbsCertificate.getComponentByName('issuer') | |
35 | |
36 assert issuerSubject == userIssuer, '%s\n%s' % ( | |
37 issuerSubject.prettyPrint(), userIssuer.prettyPrint() | |
38 ) | |
39 | |
40 userIssuerHash = hashlib.sha1( | |
41 encoder.encode(userIssuer) | |
42 ).digest() | |
43 | |
44 issuerSubjectPublicKey = issuerTbsCertificate.getComponentByName('subjectPub
licKeyInfo').getComponentByName('subjectPublicKey') | |
45 | |
46 issuerKeyHash = hashlib.sha1( | |
47 valueOnlyBitStringEncoder(issuerSubjectPublicKey) | |
48 ).digest() | |
49 | |
50 userSerialNumber = userTbsCertificate.getComponentByName('serialNumber') | |
51 | |
52 # Build request object | |
53 | |
54 request = rfc2560.Request() | |
55 | |
56 reqCert = request.setComponentByName('reqCert').getComponentByName('reqCert'
) | |
57 | |
58 hashAlgorithm = reqCert.setComponentByName('hashAlgorithm').getComponentByNa
me('hashAlgorithm') | |
59 hashAlgorithm.setComponentByName('algorithm', sha1oid) | |
60 | |
61 reqCert.setComponentByName('issuerNameHash', userIssuerHash) | |
62 reqCert.setComponentByName('issuerKeyHash', issuerKeyHash) | |
63 reqCert.setComponentByName('serialNumber', userSerialNumber) | |
64 | |
65 ocspRequest = rfc2560.OCSPRequest() | |
66 | |
67 tbsRequest = ocspRequest.setComponentByName('tbsRequest').getComponentByName
('tbsRequest') | |
68 tbsRequest.setComponentByName('version', 'v1') | |
69 | |
70 requestList = tbsRequest.setComponentByName('requestList').getComponentByNam
e('requestList') | |
71 requestList.setComponentByPosition(0, request) | |
72 | |
73 return ocspRequest | |
74 | |
75 def parseOcspResponse(ocspResponse): | |
76 responseStatus = ocspResponse.getComponentByName('responseStatus') | |
77 assert responseStatus == rfc2560.OCSPResponseStatus('successful'), responseS
tatus.prettyPrint() | |
78 responseBytes = ocspResponse.getComponentByName('responseBytes') | |
79 responseType = responseBytes.getComponentByName('responseType') | |
80 assert responseType == id_pkix_ocsp_basic, responseType.prettyPrint() | |
81 | |
82 response = responseBytes.getComponentByName('response') | |
83 | |
84 basicOCSPResponse, _ = decoder.decode( | |
85 response, asn1Spec=rfc2560.BasicOCSPResponse() | |
86 ) | |
87 | |
88 tbsResponseData = basicOCSPResponse.getComponentByName('tbsResponseData') | |
89 | |
90 response0 = tbsResponseData.getComponentByName('responses').getComponentByPo
sition(0) | |
91 | |
92 return ( | |
93 tbsResponseData.getComponentByName('producedAt'), | |
94 response0.getComponentByName('certID'), | |
95 response0.getComponentByName('certStatus').getName(), | |
96 response0.getComponentByName('thisUpdate') | |
97 ) | |
98 | |
99 if len(sys.argv) != 2: | |
100 print("""Usage: | |
101 $ cat CACertificate.pem userCertificate.pem | %s <ocsp-responder-url>""" % sys.a
rgv[0]) | |
102 sys.exit(-1) | |
103 else: | |
104 ocspUrl = sys.argv[1] | |
105 | |
106 # Parse CA and user certificates | |
107 | |
108 issuerCert, _ = decoder.decode( | |
109 pem.readPemBlocksFromFile( | |
110 sys.stdin, ('-----BEGIN CERTIFICATE-----', '-----END CERTIFICATE-----') | |
111 )[1], | |
112 asn1Spec=rfc2459.Certificate() | |
113 ) | |
114 userCert, _ = decoder.decode( | |
115 pem.readPemBlocksFromFile( | |
116 sys.stdin, ('-----BEGIN CERTIFICATE-----', '-----END CERTIFICATE-----') | |
117 )[1], | |
118 asn1Spec=rfc2459.Certificate() | |
119 ) | |
120 | |
121 # Build OCSP request | |
122 | |
123 ocspReq = mkOcspRequest(issuerCert, userCert) | |
124 | |
125 # Use HTTP POST to get response (see Appendix A of RFC 2560) | |
126 # In case you need proxies, set the http_proxy env variable | |
127 | |
128 httpReq = urllib2.Request( | |
129 ocspUrl, | |
130 encoder.encode(ocspReq), | |
131 { 'Content-Type': 'application/ocsp-request' } | |
132 ) | |
133 httpRsp = urllib2.urlopen(httpReq).read() | |
134 | |
135 # Process OCSP response | |
136 | |
137 ocspRsp, _ = decoder.decode(httpRsp, asn1Spec=rfc2560.OCSPResponse()) | |
138 | |
139 producedAt, certId, certStatus, thisUpdate = parseOcspResponse(ocspRsp) | |
140 | |
141 print('Certificate ID %s is %s at %s till %s\n' % ( | |
142 certId.getComponentByName('serialNumber'), | |
143 certStatus, | |
144 producedAt, | |
145 thisUpdate)) | |
OLD | NEW |