| OLD | NEW |
| (Empty) |
| 1 # Copyright 2005 Divmod, Inc. See LICENSE file for details | |
| 2 # Copyright (c) 2007 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 import itertools | |
| 6 | |
| 7 try: | |
| 8 from OpenSSL import SSL | |
| 9 from OpenSSL.crypto import PKey, X509, X509Req | |
| 10 from OpenSSL.crypto import TYPE_RSA | |
| 11 from twisted.internet import _sslverify as sslverify | |
| 12 except ImportError: | |
| 13 pass | |
| 14 | |
| 15 from twisted.trial import unittest | |
| 16 from twisted.internet import protocol, defer, reactor | |
| 17 from twisted.python.reflect import objgrep, isSame | |
| 18 from twisted.python import log | |
| 19 | |
| 20 from twisted.internet.error import CertificateError, ConnectionLost | |
| 21 from twisted.internet import interfaces | |
| 22 | |
| 23 | |
| 24 # A couple of static PEM-format certificates to be used by various tests. | |
| 25 A_HOST_CERTIFICATE_PEM = """ | |
| 26 -----BEGIN CERTIFICATE----- | |
| 27 MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG | |
| 28 A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u | |
| 29 MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo | |
| 30 dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x | |
| 31 ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw | |
| 32 OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy | |
| 33 aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4 | |
| 34 IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v | |
| 35 Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ | |
| 36 KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp | |
| 37 8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi | |
| 38 KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ | |
| 39 VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj | |
| 40 JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO | |
| 41 S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls | |
| 42 fXzCWdG0O/3Lk2SRM0I= | |
| 43 -----END CERTIFICATE----- | |
| 44 """ | |
| 45 | |
| 46 A_PEER_CERTIFICATE_PEM = """ | |
| 47 -----BEGIN CERTIFICATE----- | |
| 48 MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG | |
| 49 A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u | |
| 50 MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo | |
| 51 dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv | |
| 52 bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw | |
| 53 MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h | |
| 54 dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy | |
| 55 aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa | |
| 56 c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf | |
| 57 MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4 | |
| 58 CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE | |
| 59 JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ | |
| 60 e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA | |
| 61 vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg | |
| 62 i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr | |
| 63 yqDtGhklsWW3ZwBzEh5VEOUp | |
| 64 -----END CERTIFICATE----- | |
| 65 """ | |
| 66 | |
| 67 | |
| 68 | |
| 69 counter = itertools.count().next | |
| 70 def makeCertificate(**kw): | |
| 71 keypair = PKey() | |
| 72 keypair.generate_key(TYPE_RSA, 512) | |
| 73 | |
| 74 certificate = X509() | |
| 75 certificate.gmtime_adj_notBefore(0) | |
| 76 certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year | |
| 77 for xname in certificate.get_issuer(), certificate.get_subject(): | |
| 78 for (k, v) in kw.items(): | |
| 79 setattr(xname, k, v) | |
| 80 | |
| 81 certificate.set_serial_number(counter()) | |
| 82 certificate.set_pubkey(keypair) | |
| 83 certificate.sign(keypair, "md5") | |
| 84 | |
| 85 return keypair, certificate | |
| 86 | |
| 87 | |
| 88 | |
| 89 class DataCallbackProtocol(protocol.Protocol): | |
| 90 def dataReceived(self, data): | |
| 91 d, self.factory.onData = self.factory.onData, None | |
| 92 if d is not None: | |
| 93 d.callback(data) | |
| 94 | |
| 95 def connectionLost(self, reason): | |
| 96 d, self.factory.onLost = self.factory.onLost, None | |
| 97 if d is not None: | |
| 98 d.errback(reason) | |
| 99 | |
| 100 class WritingProtocol(protocol.Protocol): | |
| 101 byte = 'x' | |
| 102 def connectionMade(self): | |
| 103 self.transport.write(self.byte) | |
| 104 | |
| 105 def connectionLost(self, reason): | |
| 106 self.factory.onLost.errback(reason) | |
| 107 | |
| 108 | |
| 109 class OpenSSLOptions(unittest.TestCase): | |
| 110 serverPort = clientConn = None | |
| 111 onServerLost = onClientLost = None | |
| 112 | |
| 113 sKey = None | |
| 114 sCert = None | |
| 115 cKey = None | |
| 116 cCert = None | |
| 117 | |
| 118 def setUp(self): | |
| 119 """ | |
| 120 Create class variables of client and server certificates. | |
| 121 """ | |
| 122 self.sKey, self.sCert = makeCertificate( | |
| 123 O="Server Test Certificate", | |
| 124 CN="server") | |
| 125 self.cKey, self.cCert = makeCertificate( | |
| 126 O="Client Test Certificate", | |
| 127 CN="client") | |
| 128 | |
| 129 def tearDown(self): | |
| 130 if self.serverPort is not None: | |
| 131 self.serverPort.stopListening() | |
| 132 if self.clientConn is not None: | |
| 133 self.clientConn.disconnect() | |
| 134 | |
| 135 L = [] | |
| 136 if self.onServerLost is not None: | |
| 137 L.append(self.onServerLost) | |
| 138 if self.onClientLost is not None: | |
| 139 L.append(self.onClientLost) | |
| 140 | |
| 141 return defer.DeferredList(L, consumeErrors=True) | |
| 142 | |
| 143 def loopback(self, serverCertOpts, clientCertOpts, | |
| 144 onServerLost=None, onClientLost=None, onData=None): | |
| 145 if onServerLost is None: | |
| 146 self.onServerLost = onServerLost = defer.Deferred() | |
| 147 if onClientLost is None: | |
| 148 self.onClientLost = onClientLost = defer.Deferred() | |
| 149 if onData is None: | |
| 150 onData = defer.Deferred() | |
| 151 | |
| 152 serverFactory = protocol.ServerFactory() | |
| 153 serverFactory.protocol = DataCallbackProtocol | |
| 154 serverFactory.onLost = onServerLost | |
| 155 serverFactory.onData = onData | |
| 156 | |
| 157 clientFactory = protocol.ClientFactory() | |
| 158 clientFactory.protocol = WritingProtocol | |
| 159 clientFactory.onLost = onClientLost | |
| 160 | |
| 161 self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts) | |
| 162 self.clientConn = reactor.connectSSL('127.0.0.1', | |
| 163 self.serverPort.getHost().port, clientFactory, clientCertOpts) | |
| 164 | |
| 165 def test_abbreviatingDistinguishedNames(self): | |
| 166 """ | |
| 167 Check that abbreviations used in certificates correctly map to | |
| 168 complete names. | |
| 169 """ | |
| 170 self.assertEquals( | |
| 171 sslverify.DN(CN='a', OU='hello'), | |
| 172 sslverify.DistinguishedName(commonName='a', | |
| 173 organizationalUnitName='hello')) | |
| 174 self.assertNotEquals( | |
| 175 sslverify.DN(CN='a', OU='hello'), | |
| 176 sslverify.DN(CN='a', OU='hello', emailAddress='xxx')) | |
| 177 dn = sslverify.DN(CN='abcdefg') | |
| 178 self.assertRaises(AttributeError, setattr, dn, 'Cn', 'x') | |
| 179 self.assertEquals(dn.CN, dn.commonName) | |
| 180 dn.CN = 'bcdefga' | |
| 181 self.assertEquals(dn.CN, dn.commonName) | |
| 182 | |
| 183 | |
| 184 def testInspectDistinguishedName(self): | |
| 185 n = sslverify.DN(commonName='common name', | |
| 186 organizationName='organization name', | |
| 187 organizationalUnitName='organizational unit name', | |
| 188 localityName='locality name', | |
| 189 stateOrProvinceName='state or province name', | |
| 190 countryName='country name', | |
| 191 emailAddress='email address') | |
| 192 s = n.inspect() | |
| 193 for k in [ | |
| 194 'common name', | |
| 195 'organization name', | |
| 196 'organizational unit name', | |
| 197 'locality name', | |
| 198 'state or province name', | |
| 199 'country name', | |
| 200 'email address']: | |
| 201 self.assertIn(k, s, "%r was not in inspect output." % (k,)) | |
| 202 self.assertIn(k.title(), s, "%r was not in inspect output." % (k,)) | |
| 203 | |
| 204 | |
| 205 def testInspectDistinguishedNameWithoutAllFields(self): | |
| 206 n = sslverify.DN(localityName='locality name') | |
| 207 s = n.inspect() | |
| 208 for k in [ | |
| 209 'common name', | |
| 210 'organization name', | |
| 211 'organizational unit name', | |
| 212 'state or province name', | |
| 213 'country name', | |
| 214 'email address']: | |
| 215 self.assertNotIn(k, s, "%r was in inspect output." % (k,)) | |
| 216 self.assertNotIn(k.title(), s, "%r was in inspect output." % (k,)) | |
| 217 self.assertIn('locality name', s) | |
| 218 self.assertIn('Locality Name', s) | |
| 219 | |
| 220 | |
| 221 def test_inspectCertificate(self): | |
| 222 """ | |
| 223 Test that the C{inspect} method of L{sslverify.Certificate} returns | |
| 224 a human-readable string containing some basic information about the | |
| 225 certificate. | |
| 226 """ | |
| 227 c = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM) | |
| 228 self.assertEqual( | |
| 229 c.inspect().split('\n'), | |
| 230 ["Certificate For Subject:", | |
| 231 " Organizational Unit Name: Security", | |
| 232 " Organization Name: Twisted Matrix Labs", | |
| 233 " Common Name: example.twistedmatrix.com", | |
| 234 " State Or Province Name: Massachusetts", | |
| 235 " Country Name: US", | |
| 236 " Email Address: nobody@twistedmatrix.com", | |
| 237 " Locality Name: Boston", | |
| 238 "", | |
| 239 "Issuer:", | |
| 240 " Organizational Unit Name: Security", | |
| 241 " Organization Name: Twisted Matrix Labs", | |
| 242 " Common Name: example.twistedmatrix.com", | |
| 243 " State Or Province Name: Massachusetts", | |
| 244 " Country Name: US", | |
| 245 " Email Address: nobody@twistedmatrix.com", | |
| 246 " Locality Name: Boston", | |
| 247 "", | |
| 248 "Serial Number: 12345", | |
| 249 "Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18", | |
| 250 "Public Key with Hash: ff33994c80812aa95a79cdb85362d054"]) | |
| 251 | |
| 252 | |
| 253 def test_certificateOptionsSerialization(self): | |
| 254 """ | |
| 255 Test that __setstate__(__getstate__()) round-trips properly. | |
| 256 """ | |
| 257 firstOpts = sslverify.OpenSSLCertificateOptions( | |
| 258 privateKey=self.sKey, | |
| 259 certificate=self.sCert, | |
| 260 method=SSL.SSLv3_METHOD, | |
| 261 verify=True, | |
| 262 caCerts=[self.sCert], | |
| 263 verifyDepth=2, | |
| 264 requireCertificate=False, | |
| 265 verifyOnce=False, | |
| 266 enableSingleUseKeys=False, | |
| 267 enableSessions=False, | |
| 268 fixBrokenPeers=True) | |
| 269 context = firstOpts.getContext() | |
| 270 state = firstOpts.__getstate__() | |
| 271 | |
| 272 # The context shouldn't be in the state to serialize | |
| 273 self.failIf(objgrep(state, context, isSame), | |
| 274 objgrep(state, context, isSame)) | |
| 275 | |
| 276 opts = sslverify.OpenSSLCertificateOptions() | |
| 277 opts.__setstate__(state) | |
| 278 self.assertEqual(opts.privateKey, self.sKey) | |
| 279 self.assertEqual(opts.certificate, self.sCert) | |
| 280 self.assertEqual(opts.method, SSL.SSLv3_METHOD) | |
| 281 self.assertEqual(opts.verify, True) | |
| 282 self.assertEqual(opts.caCerts, [self.sCert]) | |
| 283 self.assertEqual(opts.verifyDepth, 2) | |
| 284 self.assertEqual(opts.requireCertificate, False) | |
| 285 self.assertEqual(opts.verifyOnce, False) | |
| 286 self.assertEqual(opts.enableSingleUseKeys, False) | |
| 287 self.assertEqual(opts.enableSessions, False) | |
| 288 self.assertEqual(opts.fixBrokenPeers, True) | |
| 289 | |
| 290 | |
| 291 def test_allowedAnonymousClientConnection(self): | |
| 292 """ | |
| 293 Check that anonymous connections are allowed when certificates aren't | |
| 294 required on the server. | |
| 295 """ | |
| 296 onData = defer.Deferred() | |
| 297 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, | |
| 298 certificate=self.sCert, requireCertificate=False), | |
| 299 sslverify.OpenSSLCertificateOptions( | |
| 300 requireCertificate=False), | |
| 301 onData=onData) | |
| 302 | |
| 303 return onData.addCallback( | |
| 304 lambda result: self.assertEquals(result, WritingProtocol.byte)) | |
| 305 | |
| 306 def test_refusedAnonymousClientConnection(self): | |
| 307 """ | |
| 308 Check that anonymous connections are refused when certificates are | |
| 309 required on the server. | |
| 310 """ | |
| 311 onServerLost = defer.Deferred() | |
| 312 onClientLost = defer.Deferred() | |
| 313 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, | |
| 314 certificate=self.sCert, verify=True, | |
| 315 caCerts=[self.sCert], requireCertificate=True), | |
| 316 sslverify.OpenSSLCertificateOptions( | |
| 317 requireCertificate=False), | |
| 318 onServerLost=onServerLost, | |
| 319 onClientLost=onClientLost) | |
| 320 | |
| 321 d = defer.DeferredList([onClientLost, onServerLost], | |
| 322 consumeErrors=True) | |
| 323 | |
| 324 | |
| 325 def afterLost(((cSuccess, cResult), (sSuccess, sResult))): | |
| 326 | |
| 327 self.failIf(cSuccess) | |
| 328 self.failIf(sSuccess) | |
| 329 # Win32 fails to report the SSL Error, and report a connection lost | |
| 330 # instead: there is a race condition so that's not totally | |
| 331 # surprising (see ticket #2877 in the tracker) | |
| 332 cResult.trap(SSL.Error, ConnectionLost) | |
| 333 sResult.trap(SSL.Error) | |
| 334 | |
| 335 return d.addCallback(afterLost) | |
| 336 | |
| 337 def test_failedCertificateVerification(self): | |
| 338 """ | |
| 339 Check that connecting with a certificate not accepted by the server CA | |
| 340 fails. | |
| 341 """ | |
| 342 onServerLost = defer.Deferred() | |
| 343 onClientLost = defer.Deferred() | |
| 344 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, | |
| 345 certificate=self.sCert, verify=False, | |
| 346 requireCertificate=False), | |
| 347 sslverify.OpenSSLCertificateOptions(verify=True, | |
| 348 requireCertificate=False, caCerts=[self.cCert]), | |
| 349 onServerLost=onServerLost, | |
| 350 onClientLost=onClientLost) | |
| 351 | |
| 352 d = defer.DeferredList([onClientLost, onServerLost], | |
| 353 consumeErrors=True) | |
| 354 def afterLost(((cSuccess, cResult), (sSuccess, sResult))): | |
| 355 | |
| 356 self.failIf(cSuccess) | |
| 357 self.failIf(sSuccess) | |
| 358 | |
| 359 return d.addCallback(afterLost) | |
| 360 | |
| 361 def test_successfulCertificateVerification(self): | |
| 362 """ | |
| 363 Test a successful connection with client certificate validation on | |
| 364 server side. | |
| 365 """ | |
| 366 onData = defer.Deferred() | |
| 367 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, | |
| 368 certificate=self.sCert, verify=False, | |
| 369 requireCertificate=False), | |
| 370 sslverify.OpenSSLCertificateOptions(verify=True, | |
| 371 requireCertificate=True, caCerts=[self.sCert]), | |
| 372 onData=onData) | |
| 373 | |
| 374 return onData.addCallback( | |
| 375 lambda result: self.assertEquals(result, WritingProtocol.byte)) | |
| 376 | |
| 377 def test_successfulSymmetricSelfSignedCertificateVerification(self): | |
| 378 """ | |
| 379 Test a successful connection with validation on both server and client | |
| 380 sides. | |
| 381 """ | |
| 382 onData = defer.Deferred() | |
| 383 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, | |
| 384 certificate=self.sCert, verify=True, | |
| 385 requireCertificate=True, caCerts=[self.cCert]), | |
| 386 sslverify.OpenSSLCertificateOptions(privateKey=self.cKey, | |
| 387 certificate=self.cCert, verify=True, | |
| 388 requireCertificate=True, caCerts=[self.sCert]), | |
| 389 onData=onData) | |
| 390 | |
| 391 return onData.addCallback( | |
| 392 lambda result: self.assertEquals(result, WritingProtocol.byte)) | |
| 393 | |
| 394 def test_verification(self): | |
| 395 """ | |
| 396 Check certificates verification building custom certificates data. | |
| 397 """ | |
| 398 clientDN = sslverify.DistinguishedName(commonName='client') | |
| 399 clientKey = sslverify.KeyPair.generate() | |
| 400 clientCertReq = clientKey.certificateRequest(clientDN) | |
| 401 | |
| 402 serverDN = sslverify.DistinguishedName(commonName='server') | |
| 403 serverKey = sslverify.KeyPair.generate() | |
| 404 serverCertReq = serverKey.certificateRequest(serverDN) | |
| 405 | |
| 406 clientSelfCertReq = clientKey.certificateRequest(clientDN) | |
| 407 clientSelfCertData = clientKey.signCertificateRequest( | |
| 408 clientDN, clientSelfCertReq, lambda dn: True, 132) | |
| 409 clientSelfCert = clientKey.newCertificate(clientSelfCertData) | |
| 410 | |
| 411 serverSelfCertReq = serverKey.certificateRequest(serverDN) | |
| 412 serverSelfCertData = serverKey.signCertificateRequest( | |
| 413 serverDN, serverSelfCertReq, lambda dn: True, 516) | |
| 414 serverSelfCert = serverKey.newCertificate(serverSelfCertData) | |
| 415 | |
| 416 clientCertData = serverKey.signCertificateRequest( | |
| 417 serverDN, clientCertReq, lambda dn: True, 7) | |
| 418 clientCert = clientKey.newCertificate(clientCertData) | |
| 419 | |
| 420 serverCertData = clientKey.signCertificateRequest( | |
| 421 clientDN, serverCertReq, lambda dn: True, 42) | |
| 422 serverCert = serverKey.newCertificate(serverCertData) | |
| 423 | |
| 424 onData = defer.Deferred() | |
| 425 | |
| 426 serverOpts = serverCert.options(serverSelfCert) | |
| 427 clientOpts = clientCert.options(clientSelfCert) | |
| 428 | |
| 429 self.loopback(serverOpts, | |
| 430 clientOpts, | |
| 431 onData=onData) | |
| 432 | |
| 433 return onData.addCallback( | |
| 434 lambda result: self.assertEquals(result, WritingProtocol.byte)) | |
| 435 | |
| 436 | |
| 437 | |
| 438 if interfaces.IReactorSSL(reactor, None) is None: | |
| 439 OpenSSLOptions.skip = "Reactor does not support SSL, cannot run SSL tests" | |
| 440 | |
| 441 | |
| 442 | |
| 443 class _NotSSLTransport: | |
| 444 def getHandle(self): | |
| 445 return self | |
| 446 | |
| 447 class _MaybeSSLTransport: | |
| 448 def getHandle(self): | |
| 449 return self | |
| 450 | |
| 451 def get_peer_certificate(self): | |
| 452 return None | |
| 453 | |
| 454 def get_host_certificate(self): | |
| 455 return None | |
| 456 | |
| 457 | |
| 458 class _ActualSSLTransport: | |
| 459 def getHandle(self): | |
| 460 return self | |
| 461 | |
| 462 def get_host_certificate(self): | |
| 463 return sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM).original | |
| 464 | |
| 465 def get_peer_certificate(self): | |
| 466 return sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM).original | |
| 467 | |
| 468 | |
| 469 class Constructors(unittest.TestCase): | |
| 470 def test_peerFromNonSSLTransport(self): | |
| 471 """ | |
| 472 Verify that peerFromTransport raises an exception if the transport | |
| 473 passed is not actually an SSL transport. | |
| 474 """ | |
| 475 x = self.assertRaises(CertificateError, | |
| 476 sslverify.Certificate.peerFromTransport, | |
| 477 _NotSSLTransport()) | |
| 478 self.failUnless(str(x).startswith("non-TLS")) | |
| 479 | |
| 480 def test_peerFromBlankSSLTransport(self): | |
| 481 """ | |
| 482 Verify that peerFromTransport raises an exception if the transport | |
| 483 passed is an SSL transport, but doesn't have a peer certificate. | |
| 484 """ | |
| 485 x = self.assertRaises(CertificateError, | |
| 486 sslverify.Certificate.peerFromTransport, | |
| 487 _MaybeSSLTransport()) | |
| 488 self.failUnless(str(x).startswith("TLS")) | |
| 489 | |
| 490 def test_hostFromNonSSLTransport(self): | |
| 491 """ | |
| 492 Verify that hostFromTransport raises an exception if the transport | |
| 493 passed is not actually an SSL transport. | |
| 494 """ | |
| 495 x = self.assertRaises(CertificateError, | |
| 496 sslverify.Certificate.hostFromTransport, | |
| 497 _NotSSLTransport()) | |
| 498 self.failUnless(str(x).startswith("non-TLS")) | |
| 499 | |
| 500 def test_hostFromBlankSSLTransport(self): | |
| 501 """ | |
| 502 Verify that hostFromTransport raises an exception if the transport | |
| 503 passed is an SSL transport, but doesn't have a host certificate. | |
| 504 """ | |
| 505 x = self.assertRaises(CertificateError, | |
| 506 sslverify.Certificate.hostFromTransport, | |
| 507 _MaybeSSLTransport()) | |
| 508 self.failUnless(str(x).startswith("TLS")) | |
| 509 | |
| 510 | |
| 511 def test_hostFromSSLTransport(self): | |
| 512 """ | |
| 513 Verify that hostFromTransport successfully creates the correct | |
| 514 certificate if passed a valid SSL transport. | |
| 515 """ | |
| 516 self.assertEqual( | |
| 517 sslverify.Certificate.hostFromTransport( | |
| 518 _ActualSSLTransport()).serialNumber(), | |
| 519 12345) | |
| 520 | |
| 521 def test_peerFromSSLTransport(self): | |
| 522 """ | |
| 523 Verify that peerFromTransport successfully creates the correct | |
| 524 certificate if passed a valid SSL transport. | |
| 525 """ | |
| 526 self.assertEqual( | |
| 527 sslverify.Certificate.peerFromTransport( | |
| 528 _ActualSSLTransport()).serialNumber(), | |
| 529 12346) | |
| 530 | |
| 531 | |
| 532 | |
| 533 if interfaces.IReactorSSL(reactor, None) is None: | |
| 534 Constructors.skip = "Reactor does not support SSL, cannot run SSL tests" | |
| OLD | NEW |