OLD | NEW |
| (Empty) |
1 # Copyright 2005 Divmod, Inc. See LICENSE file for details | |
2 # Copyright (c) 2007 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 import itertools | |
6 | |
7 try: | |
8 from OpenSSL import SSL | |
9 from OpenSSL.crypto import PKey, X509, X509Req | |
10 from OpenSSL.crypto import TYPE_RSA | |
11 from twisted.internet import _sslverify as sslverify | |
12 except ImportError: | |
13 pass | |
14 | |
15 from twisted.trial import unittest | |
16 from twisted.internet import protocol, defer, reactor | |
17 from twisted.python.reflect import objgrep, isSame | |
18 from twisted.python import log | |
19 | |
20 from twisted.internet.error import CertificateError, ConnectionLost | |
21 from twisted.internet import interfaces | |
22 | |
23 | |
24 # A couple of static PEM-format certificates to be used by various tests. | |
25 A_HOST_CERTIFICATE_PEM = """ | |
26 -----BEGIN CERTIFICATE----- | |
27 MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG | |
28 A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u | |
29 MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo | |
30 dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x | |
31 ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw | |
32 OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy | |
33 aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4 | |
34 IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v | |
35 Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ | |
36 KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp | |
37 8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi | |
38 KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ | |
39 VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj | |
40 JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO | |
41 S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls | |
42 fXzCWdG0O/3Lk2SRM0I= | |
43 -----END CERTIFICATE----- | |
44 """ | |
45 | |
46 A_PEER_CERTIFICATE_PEM = """ | |
47 -----BEGIN CERTIFICATE----- | |
48 MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG | |
49 A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u | |
50 MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo | |
51 dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv | |
52 bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw | |
53 MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h | |
54 dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy | |
55 aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa | |
56 c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf | |
57 MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4 | |
58 CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE | |
59 JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ | |
60 e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA | |
61 vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg | |
62 i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr | |
63 yqDtGhklsWW3ZwBzEh5VEOUp | |
64 -----END CERTIFICATE----- | |
65 """ | |
66 | |
67 | |
68 | |
69 counter = itertools.count().next | |
70 def makeCertificate(**kw): | |
71 keypair = PKey() | |
72 keypair.generate_key(TYPE_RSA, 512) | |
73 | |
74 certificate = X509() | |
75 certificate.gmtime_adj_notBefore(0) | |
76 certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year | |
77 for xname in certificate.get_issuer(), certificate.get_subject(): | |
78 for (k, v) in kw.items(): | |
79 setattr(xname, k, v) | |
80 | |
81 certificate.set_serial_number(counter()) | |
82 certificate.set_pubkey(keypair) | |
83 certificate.sign(keypair, "md5") | |
84 | |
85 return keypair, certificate | |
86 | |
87 | |
88 | |
89 class DataCallbackProtocol(protocol.Protocol): | |
90 def dataReceived(self, data): | |
91 d, self.factory.onData = self.factory.onData, None | |
92 if d is not None: | |
93 d.callback(data) | |
94 | |
95 def connectionLost(self, reason): | |
96 d, self.factory.onLost = self.factory.onLost, None | |
97 if d is not None: | |
98 d.errback(reason) | |
99 | |
100 class WritingProtocol(protocol.Protocol): | |
101 byte = 'x' | |
102 def connectionMade(self): | |
103 self.transport.write(self.byte) | |
104 | |
105 def connectionLost(self, reason): | |
106 self.factory.onLost.errback(reason) | |
107 | |
108 | |
109 class OpenSSLOptions(unittest.TestCase): | |
110 serverPort = clientConn = None | |
111 onServerLost = onClientLost = None | |
112 | |
113 sKey = None | |
114 sCert = None | |
115 cKey = None | |
116 cCert = None | |
117 | |
118 def setUp(self): | |
119 """ | |
120 Create class variables of client and server certificates. | |
121 """ | |
122 self.sKey, self.sCert = makeCertificate( | |
123 O="Server Test Certificate", | |
124 CN="server") | |
125 self.cKey, self.cCert = makeCertificate( | |
126 O="Client Test Certificate", | |
127 CN="client") | |
128 | |
129 def tearDown(self): | |
130 if self.serverPort is not None: | |
131 self.serverPort.stopListening() | |
132 if self.clientConn is not None: | |
133 self.clientConn.disconnect() | |
134 | |
135 L = [] | |
136 if self.onServerLost is not None: | |
137 L.append(self.onServerLost) | |
138 if self.onClientLost is not None: | |
139 L.append(self.onClientLost) | |
140 | |
141 return defer.DeferredList(L, consumeErrors=True) | |
142 | |
143 def loopback(self, serverCertOpts, clientCertOpts, | |
144 onServerLost=None, onClientLost=None, onData=None): | |
145 if onServerLost is None: | |
146 self.onServerLost = onServerLost = defer.Deferred() | |
147 if onClientLost is None: | |
148 self.onClientLost = onClientLost = defer.Deferred() | |
149 if onData is None: | |
150 onData = defer.Deferred() | |
151 | |
152 serverFactory = protocol.ServerFactory() | |
153 serverFactory.protocol = DataCallbackProtocol | |
154 serverFactory.onLost = onServerLost | |
155 serverFactory.onData = onData | |
156 | |
157 clientFactory = protocol.ClientFactory() | |
158 clientFactory.protocol = WritingProtocol | |
159 clientFactory.onLost = onClientLost | |
160 | |
161 self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts) | |
162 self.clientConn = reactor.connectSSL('127.0.0.1', | |
163 self.serverPort.getHost().port, clientFactory, clientCertOpts) | |
164 | |
165 def test_abbreviatingDistinguishedNames(self): | |
166 """ | |
167 Check that abbreviations used in certificates correctly map to | |
168 complete names. | |
169 """ | |
170 self.assertEquals( | |
171 sslverify.DN(CN='a', OU='hello'), | |
172 sslverify.DistinguishedName(commonName='a', | |
173 organizationalUnitName='hello')) | |
174 self.assertNotEquals( | |
175 sslverify.DN(CN='a', OU='hello'), | |
176 sslverify.DN(CN='a', OU='hello', emailAddress='xxx')) | |
177 dn = sslverify.DN(CN='abcdefg') | |
178 self.assertRaises(AttributeError, setattr, dn, 'Cn', 'x') | |
179 self.assertEquals(dn.CN, dn.commonName) | |
180 dn.CN = 'bcdefga' | |
181 self.assertEquals(dn.CN, dn.commonName) | |
182 | |
183 | |
184 def testInspectDistinguishedName(self): | |
185 n = sslverify.DN(commonName='common name', | |
186 organizationName='organization name', | |
187 organizationalUnitName='organizational unit name', | |
188 localityName='locality name', | |
189 stateOrProvinceName='state or province name', | |
190 countryName='country name', | |
191 emailAddress='email address') | |
192 s = n.inspect() | |
193 for k in [ | |
194 'common name', | |
195 'organization name', | |
196 'organizational unit name', | |
197 'locality name', | |
198 'state or province name', | |
199 'country name', | |
200 'email address']: | |
201 self.assertIn(k, s, "%r was not in inspect output." % (k,)) | |
202 self.assertIn(k.title(), s, "%r was not in inspect output." % (k,)) | |
203 | |
204 | |
205 def testInspectDistinguishedNameWithoutAllFields(self): | |
206 n = sslverify.DN(localityName='locality name') | |
207 s = n.inspect() | |
208 for k in [ | |
209 'common name', | |
210 'organization name', | |
211 'organizational unit name', | |
212 'state or province name', | |
213 'country name', | |
214 'email address']: | |
215 self.assertNotIn(k, s, "%r was in inspect output." % (k,)) | |
216 self.assertNotIn(k.title(), s, "%r was in inspect output." % (k,)) | |
217 self.assertIn('locality name', s) | |
218 self.assertIn('Locality Name', s) | |
219 | |
220 | |
221 def test_inspectCertificate(self): | |
222 """ | |
223 Test that the C{inspect} method of L{sslverify.Certificate} returns | |
224 a human-readable string containing some basic information about the | |
225 certificate. | |
226 """ | |
227 c = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM) | |
228 self.assertEqual( | |
229 c.inspect().split('\n'), | |
230 ["Certificate For Subject:", | |
231 " Organizational Unit Name: Security", | |
232 " Organization Name: Twisted Matrix Labs", | |
233 " Common Name: example.twistedmatrix.com", | |
234 " State Or Province Name: Massachusetts", | |
235 " Country Name: US", | |
236 " Email Address: nobody@twistedmatrix.com", | |
237 " Locality Name: Boston", | |
238 "", | |
239 "Issuer:", | |
240 " Organizational Unit Name: Security", | |
241 " Organization Name: Twisted Matrix Labs", | |
242 " Common Name: example.twistedmatrix.com", | |
243 " State Or Province Name: Massachusetts", | |
244 " Country Name: US", | |
245 " Email Address: nobody@twistedmatrix.com", | |
246 " Locality Name: Boston", | |
247 "", | |
248 "Serial Number: 12345", | |
249 "Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18", | |
250 "Public Key with Hash: ff33994c80812aa95a79cdb85362d054"]) | |
251 | |
252 | |
253 def test_certificateOptionsSerialization(self): | |
254 """ | |
255 Test that __setstate__(__getstate__()) round-trips properly. | |
256 """ | |
257 firstOpts = sslverify.OpenSSLCertificateOptions( | |
258 privateKey=self.sKey, | |
259 certificate=self.sCert, | |
260 method=SSL.SSLv3_METHOD, | |
261 verify=True, | |
262 caCerts=[self.sCert], | |
263 verifyDepth=2, | |
264 requireCertificate=False, | |
265 verifyOnce=False, | |
266 enableSingleUseKeys=False, | |
267 enableSessions=False, | |
268 fixBrokenPeers=True) | |
269 context = firstOpts.getContext() | |
270 state = firstOpts.__getstate__() | |
271 | |
272 # The context shouldn't be in the state to serialize | |
273 self.failIf(objgrep(state, context, isSame), | |
274 objgrep(state, context, isSame)) | |
275 | |
276 opts = sslverify.OpenSSLCertificateOptions() | |
277 opts.__setstate__(state) | |
278 self.assertEqual(opts.privateKey, self.sKey) | |
279 self.assertEqual(opts.certificate, self.sCert) | |
280 self.assertEqual(opts.method, SSL.SSLv3_METHOD) | |
281 self.assertEqual(opts.verify, True) | |
282 self.assertEqual(opts.caCerts, [self.sCert]) | |
283 self.assertEqual(opts.verifyDepth, 2) | |
284 self.assertEqual(opts.requireCertificate, False) | |
285 self.assertEqual(opts.verifyOnce, False) | |
286 self.assertEqual(opts.enableSingleUseKeys, False) | |
287 self.assertEqual(opts.enableSessions, False) | |
288 self.assertEqual(opts.fixBrokenPeers, True) | |
289 | |
290 | |
291 def test_allowedAnonymousClientConnection(self): | |
292 """ | |
293 Check that anonymous connections are allowed when certificates aren't | |
294 required on the server. | |
295 """ | |
296 onData = defer.Deferred() | |
297 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, | |
298 certificate=self.sCert, requireCertificate=False), | |
299 sslverify.OpenSSLCertificateOptions( | |
300 requireCertificate=False), | |
301 onData=onData) | |
302 | |
303 return onData.addCallback( | |
304 lambda result: self.assertEquals(result, WritingProtocol.byte)) | |
305 | |
306 def test_refusedAnonymousClientConnection(self): | |
307 """ | |
308 Check that anonymous connections are refused when certificates are | |
309 required on the server. | |
310 """ | |
311 onServerLost = defer.Deferred() | |
312 onClientLost = defer.Deferred() | |
313 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, | |
314 certificate=self.sCert, verify=True, | |
315 caCerts=[self.sCert], requireCertificate=True), | |
316 sslverify.OpenSSLCertificateOptions( | |
317 requireCertificate=False), | |
318 onServerLost=onServerLost, | |
319 onClientLost=onClientLost) | |
320 | |
321 d = defer.DeferredList([onClientLost, onServerLost], | |
322 consumeErrors=True) | |
323 | |
324 | |
325 def afterLost(((cSuccess, cResult), (sSuccess, sResult))): | |
326 | |
327 self.failIf(cSuccess) | |
328 self.failIf(sSuccess) | |
329 # Win32 fails to report the SSL Error, and report a connection lost | |
330 # instead: there is a race condition so that's not totally | |
331 # surprising (see ticket #2877 in the tracker) | |
332 cResult.trap(SSL.Error, ConnectionLost) | |
333 sResult.trap(SSL.Error) | |
334 | |
335 return d.addCallback(afterLost) | |
336 | |
337 def test_failedCertificateVerification(self): | |
338 """ | |
339 Check that connecting with a certificate not accepted by the server CA | |
340 fails. | |
341 """ | |
342 onServerLost = defer.Deferred() | |
343 onClientLost = defer.Deferred() | |
344 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, | |
345 certificate=self.sCert, verify=False, | |
346 requireCertificate=False), | |
347 sslverify.OpenSSLCertificateOptions(verify=True, | |
348 requireCertificate=False, caCerts=[self.cCert]), | |
349 onServerLost=onServerLost, | |
350 onClientLost=onClientLost) | |
351 | |
352 d = defer.DeferredList([onClientLost, onServerLost], | |
353 consumeErrors=True) | |
354 def afterLost(((cSuccess, cResult), (sSuccess, sResult))): | |
355 | |
356 self.failIf(cSuccess) | |
357 self.failIf(sSuccess) | |
358 | |
359 return d.addCallback(afterLost) | |
360 | |
361 def test_successfulCertificateVerification(self): | |
362 """ | |
363 Test a successful connection with client certificate validation on | |
364 server side. | |
365 """ | |
366 onData = defer.Deferred() | |
367 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, | |
368 certificate=self.sCert, verify=False, | |
369 requireCertificate=False), | |
370 sslverify.OpenSSLCertificateOptions(verify=True, | |
371 requireCertificate=True, caCerts=[self.sCert]), | |
372 onData=onData) | |
373 | |
374 return onData.addCallback( | |
375 lambda result: self.assertEquals(result, WritingProtocol.byte)) | |
376 | |
377 def test_successfulSymmetricSelfSignedCertificateVerification(self): | |
378 """ | |
379 Test a successful connection with validation on both server and client | |
380 sides. | |
381 """ | |
382 onData = defer.Deferred() | |
383 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, | |
384 certificate=self.sCert, verify=True, | |
385 requireCertificate=True, caCerts=[self.cCert]), | |
386 sslverify.OpenSSLCertificateOptions(privateKey=self.cKey, | |
387 certificate=self.cCert, verify=True, | |
388 requireCertificate=True, caCerts=[self.sCert]), | |
389 onData=onData) | |
390 | |
391 return onData.addCallback( | |
392 lambda result: self.assertEquals(result, WritingProtocol.byte)) | |
393 | |
394 def test_verification(self): | |
395 """ | |
396 Check certificates verification building custom certificates data. | |
397 """ | |
398 clientDN = sslverify.DistinguishedName(commonName='client') | |
399 clientKey = sslverify.KeyPair.generate() | |
400 clientCertReq = clientKey.certificateRequest(clientDN) | |
401 | |
402 serverDN = sslverify.DistinguishedName(commonName='server') | |
403 serverKey = sslverify.KeyPair.generate() | |
404 serverCertReq = serverKey.certificateRequest(serverDN) | |
405 | |
406 clientSelfCertReq = clientKey.certificateRequest(clientDN) | |
407 clientSelfCertData = clientKey.signCertificateRequest( | |
408 clientDN, clientSelfCertReq, lambda dn: True, 132) | |
409 clientSelfCert = clientKey.newCertificate(clientSelfCertData) | |
410 | |
411 serverSelfCertReq = serverKey.certificateRequest(serverDN) | |
412 serverSelfCertData = serverKey.signCertificateRequest( | |
413 serverDN, serverSelfCertReq, lambda dn: True, 516) | |
414 serverSelfCert = serverKey.newCertificate(serverSelfCertData) | |
415 | |
416 clientCertData = serverKey.signCertificateRequest( | |
417 serverDN, clientCertReq, lambda dn: True, 7) | |
418 clientCert = clientKey.newCertificate(clientCertData) | |
419 | |
420 serverCertData = clientKey.signCertificateRequest( | |
421 clientDN, serverCertReq, lambda dn: True, 42) | |
422 serverCert = serverKey.newCertificate(serverCertData) | |
423 | |
424 onData = defer.Deferred() | |
425 | |
426 serverOpts = serverCert.options(serverSelfCert) | |
427 clientOpts = clientCert.options(clientSelfCert) | |
428 | |
429 self.loopback(serverOpts, | |
430 clientOpts, | |
431 onData=onData) | |
432 | |
433 return onData.addCallback( | |
434 lambda result: self.assertEquals(result, WritingProtocol.byte)) | |
435 | |
436 | |
437 | |
438 if interfaces.IReactorSSL(reactor, None) is None: | |
439 OpenSSLOptions.skip = "Reactor does not support SSL, cannot run SSL tests" | |
440 | |
441 | |
442 | |
443 class _NotSSLTransport: | |
444 def getHandle(self): | |
445 return self | |
446 | |
447 class _MaybeSSLTransport: | |
448 def getHandle(self): | |
449 return self | |
450 | |
451 def get_peer_certificate(self): | |
452 return None | |
453 | |
454 def get_host_certificate(self): | |
455 return None | |
456 | |
457 | |
458 class _ActualSSLTransport: | |
459 def getHandle(self): | |
460 return self | |
461 | |
462 def get_host_certificate(self): | |
463 return sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM).original | |
464 | |
465 def get_peer_certificate(self): | |
466 return sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM).original | |
467 | |
468 | |
469 class Constructors(unittest.TestCase): | |
470 def test_peerFromNonSSLTransport(self): | |
471 """ | |
472 Verify that peerFromTransport raises an exception if the transport | |
473 passed is not actually an SSL transport. | |
474 """ | |
475 x = self.assertRaises(CertificateError, | |
476 sslverify.Certificate.peerFromTransport, | |
477 _NotSSLTransport()) | |
478 self.failUnless(str(x).startswith("non-TLS")) | |
479 | |
480 def test_peerFromBlankSSLTransport(self): | |
481 """ | |
482 Verify that peerFromTransport raises an exception if the transport | |
483 passed is an SSL transport, but doesn't have a peer certificate. | |
484 """ | |
485 x = self.assertRaises(CertificateError, | |
486 sslverify.Certificate.peerFromTransport, | |
487 _MaybeSSLTransport()) | |
488 self.failUnless(str(x).startswith("TLS")) | |
489 | |
490 def test_hostFromNonSSLTransport(self): | |
491 """ | |
492 Verify that hostFromTransport raises an exception if the transport | |
493 passed is not actually an SSL transport. | |
494 """ | |
495 x = self.assertRaises(CertificateError, | |
496 sslverify.Certificate.hostFromTransport, | |
497 _NotSSLTransport()) | |
498 self.failUnless(str(x).startswith("non-TLS")) | |
499 | |
500 def test_hostFromBlankSSLTransport(self): | |
501 """ | |
502 Verify that hostFromTransport raises an exception if the transport | |
503 passed is an SSL transport, but doesn't have a host certificate. | |
504 """ | |
505 x = self.assertRaises(CertificateError, | |
506 sslverify.Certificate.hostFromTransport, | |
507 _MaybeSSLTransport()) | |
508 self.failUnless(str(x).startswith("TLS")) | |
509 | |
510 | |
511 def test_hostFromSSLTransport(self): | |
512 """ | |
513 Verify that hostFromTransport successfully creates the correct | |
514 certificate if passed a valid SSL transport. | |
515 """ | |
516 self.assertEqual( | |
517 sslverify.Certificate.hostFromTransport( | |
518 _ActualSSLTransport()).serialNumber(), | |
519 12345) | |
520 | |
521 def test_peerFromSSLTransport(self): | |
522 """ | |
523 Verify that peerFromTransport successfully creates the correct | |
524 certificate if passed a valid SSL transport. | |
525 """ | |
526 self.assertEqual( | |
527 sslverify.Certificate.peerFromTransport( | |
528 _ActualSSLTransport()).serialNumber(), | |
529 12346) | |
530 | |
531 | |
532 | |
533 if interfaces.IReactorSSL(reactor, None) is None: | |
534 Constructors.skip = "Reactor does not support SSL, cannot run SSL tests" | |
OLD | NEW |