Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(373)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_sslverify.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright 2005 Divmod, Inc. See LICENSE file for details
2 # Copyright (c) 2007 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 import itertools
6
7 try:
8 from OpenSSL import SSL
9 from OpenSSL.crypto import PKey, X509, X509Req
10 from OpenSSL.crypto import TYPE_RSA
11 from twisted.internet import _sslverify as sslverify
12 except ImportError:
13 pass
14
15 from twisted.trial import unittest
16 from twisted.internet import protocol, defer, reactor
17 from twisted.python.reflect import objgrep, isSame
18 from twisted.python import log
19
20 from twisted.internet.error import CertificateError, ConnectionLost
21 from twisted.internet import interfaces
22
23
24 # A couple of static PEM-format certificates to be used by various tests.
25 A_HOST_CERTIFICATE_PEM = """
26 -----BEGIN CERTIFICATE-----
27 MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG
28 A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
29 MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
30 dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x
31 ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw
32 OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy
33 aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4
34 IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v
35 Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ
36 KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp
37 8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi
38 KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ
39 VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj
40 JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO
41 S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls
42 fXzCWdG0O/3Lk2SRM0I=
43 -----END CERTIFICATE-----
44 """
45
46 A_PEER_CERTIFICATE_PEM = """
47 -----BEGIN CERTIFICATE-----
48 MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG
49 A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
50 MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
51 dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv
52 bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw
53 MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h
54 dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy
55 aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa
56 c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf
57 MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4
58 CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE
59 JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ
60 e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA
61 vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg
62 i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr
63 yqDtGhklsWW3ZwBzEh5VEOUp
64 -----END CERTIFICATE-----
65 """
66
67
68
69 counter = itertools.count().next
70 def makeCertificate(**kw):
71 keypair = PKey()
72 keypair.generate_key(TYPE_RSA, 512)
73
74 certificate = X509()
75 certificate.gmtime_adj_notBefore(0)
76 certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year
77 for xname in certificate.get_issuer(), certificate.get_subject():
78 for (k, v) in kw.items():
79 setattr(xname, k, v)
80
81 certificate.set_serial_number(counter())
82 certificate.set_pubkey(keypair)
83 certificate.sign(keypair, "md5")
84
85 return keypair, certificate
86
87
88
89 class DataCallbackProtocol(protocol.Protocol):
90 def dataReceived(self, data):
91 d, self.factory.onData = self.factory.onData, None
92 if d is not None:
93 d.callback(data)
94
95 def connectionLost(self, reason):
96 d, self.factory.onLost = self.factory.onLost, None
97 if d is not None:
98 d.errback(reason)
99
100 class WritingProtocol(protocol.Protocol):
101 byte = 'x'
102 def connectionMade(self):
103 self.transport.write(self.byte)
104
105 def connectionLost(self, reason):
106 self.factory.onLost.errback(reason)
107
108
109 class OpenSSLOptions(unittest.TestCase):
110 serverPort = clientConn = None
111 onServerLost = onClientLost = None
112
113 sKey = None
114 sCert = None
115 cKey = None
116 cCert = None
117
118 def setUp(self):
119 """
120 Create class variables of client and server certificates.
121 """
122 self.sKey, self.sCert = makeCertificate(
123 O="Server Test Certificate",
124 CN="server")
125 self.cKey, self.cCert = makeCertificate(
126 O="Client Test Certificate",
127 CN="client")
128
129 def tearDown(self):
130 if self.serverPort is not None:
131 self.serverPort.stopListening()
132 if self.clientConn is not None:
133 self.clientConn.disconnect()
134
135 L = []
136 if self.onServerLost is not None:
137 L.append(self.onServerLost)
138 if self.onClientLost is not None:
139 L.append(self.onClientLost)
140
141 return defer.DeferredList(L, consumeErrors=True)
142
143 def loopback(self, serverCertOpts, clientCertOpts,
144 onServerLost=None, onClientLost=None, onData=None):
145 if onServerLost is None:
146 self.onServerLost = onServerLost = defer.Deferred()
147 if onClientLost is None:
148 self.onClientLost = onClientLost = defer.Deferred()
149 if onData is None:
150 onData = defer.Deferred()
151
152 serverFactory = protocol.ServerFactory()
153 serverFactory.protocol = DataCallbackProtocol
154 serverFactory.onLost = onServerLost
155 serverFactory.onData = onData
156
157 clientFactory = protocol.ClientFactory()
158 clientFactory.protocol = WritingProtocol
159 clientFactory.onLost = onClientLost
160
161 self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
162 self.clientConn = reactor.connectSSL('127.0.0.1',
163 self.serverPort.getHost().port, clientFactory, clientCertOpts)
164
165 def test_abbreviatingDistinguishedNames(self):
166 """
167 Check that abbreviations used in certificates correctly map to
168 complete names.
169 """
170 self.assertEquals(
171 sslverify.DN(CN='a', OU='hello'),
172 sslverify.DistinguishedName(commonName='a',
173 organizationalUnitName='hello'))
174 self.assertNotEquals(
175 sslverify.DN(CN='a', OU='hello'),
176 sslverify.DN(CN='a', OU='hello', emailAddress='xxx'))
177 dn = sslverify.DN(CN='abcdefg')
178 self.assertRaises(AttributeError, setattr, dn, 'Cn', 'x')
179 self.assertEquals(dn.CN, dn.commonName)
180 dn.CN = 'bcdefga'
181 self.assertEquals(dn.CN, dn.commonName)
182
183
184 def testInspectDistinguishedName(self):
185 n = sslverify.DN(commonName='common name',
186 organizationName='organization name',
187 organizationalUnitName='organizational unit name',
188 localityName='locality name',
189 stateOrProvinceName='state or province name',
190 countryName='country name',
191 emailAddress='email address')
192 s = n.inspect()
193 for k in [
194 'common name',
195 'organization name',
196 'organizational unit name',
197 'locality name',
198 'state or province name',
199 'country name',
200 'email address']:
201 self.assertIn(k, s, "%r was not in inspect output." % (k,))
202 self.assertIn(k.title(), s, "%r was not in inspect output." % (k,))
203
204
205 def testInspectDistinguishedNameWithoutAllFields(self):
206 n = sslverify.DN(localityName='locality name')
207 s = n.inspect()
208 for k in [
209 'common name',
210 'organization name',
211 'organizational unit name',
212 'state or province name',
213 'country name',
214 'email address']:
215 self.assertNotIn(k, s, "%r was in inspect output." % (k,))
216 self.assertNotIn(k.title(), s, "%r was in inspect output." % (k,))
217 self.assertIn('locality name', s)
218 self.assertIn('Locality Name', s)
219
220
221 def test_inspectCertificate(self):
222 """
223 Test that the C{inspect} method of L{sslverify.Certificate} returns
224 a human-readable string containing some basic information about the
225 certificate.
226 """
227 c = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
228 self.assertEqual(
229 c.inspect().split('\n'),
230 ["Certificate For Subject:",
231 " Organizational Unit Name: Security",
232 " Organization Name: Twisted Matrix Labs",
233 " Common Name: example.twistedmatrix.com",
234 " State Or Province Name: Massachusetts",
235 " Country Name: US",
236 " Email Address: nobody@twistedmatrix.com",
237 " Locality Name: Boston",
238 "",
239 "Issuer:",
240 " Organizational Unit Name: Security",
241 " Organization Name: Twisted Matrix Labs",
242 " Common Name: example.twistedmatrix.com",
243 " State Or Province Name: Massachusetts",
244 " Country Name: US",
245 " Email Address: nobody@twistedmatrix.com",
246 " Locality Name: Boston",
247 "",
248 "Serial Number: 12345",
249 "Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18",
250 "Public Key with Hash: ff33994c80812aa95a79cdb85362d054"])
251
252
253 def test_certificateOptionsSerialization(self):
254 """
255 Test that __setstate__(__getstate__()) round-trips properly.
256 """
257 firstOpts = sslverify.OpenSSLCertificateOptions(
258 privateKey=self.sKey,
259 certificate=self.sCert,
260 method=SSL.SSLv3_METHOD,
261 verify=True,
262 caCerts=[self.sCert],
263 verifyDepth=2,
264 requireCertificate=False,
265 verifyOnce=False,
266 enableSingleUseKeys=False,
267 enableSessions=False,
268 fixBrokenPeers=True)
269 context = firstOpts.getContext()
270 state = firstOpts.__getstate__()
271
272 # The context shouldn't be in the state to serialize
273 self.failIf(objgrep(state, context, isSame),
274 objgrep(state, context, isSame))
275
276 opts = sslverify.OpenSSLCertificateOptions()
277 opts.__setstate__(state)
278 self.assertEqual(opts.privateKey, self.sKey)
279 self.assertEqual(opts.certificate, self.sCert)
280 self.assertEqual(opts.method, SSL.SSLv3_METHOD)
281 self.assertEqual(opts.verify, True)
282 self.assertEqual(opts.caCerts, [self.sCert])
283 self.assertEqual(opts.verifyDepth, 2)
284 self.assertEqual(opts.requireCertificate, False)
285 self.assertEqual(opts.verifyOnce, False)
286 self.assertEqual(opts.enableSingleUseKeys, False)
287 self.assertEqual(opts.enableSessions, False)
288 self.assertEqual(opts.fixBrokenPeers, True)
289
290
291 def test_allowedAnonymousClientConnection(self):
292 """
293 Check that anonymous connections are allowed when certificates aren't
294 required on the server.
295 """
296 onData = defer.Deferred()
297 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
298 certificate=self.sCert, requireCertificate=False),
299 sslverify.OpenSSLCertificateOptions(
300 requireCertificate=False),
301 onData=onData)
302
303 return onData.addCallback(
304 lambda result: self.assertEquals(result, WritingProtocol.byte))
305
306 def test_refusedAnonymousClientConnection(self):
307 """
308 Check that anonymous connections are refused when certificates are
309 required on the server.
310 """
311 onServerLost = defer.Deferred()
312 onClientLost = defer.Deferred()
313 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
314 certificate=self.sCert, verify=True,
315 caCerts=[self.sCert], requireCertificate=True),
316 sslverify.OpenSSLCertificateOptions(
317 requireCertificate=False),
318 onServerLost=onServerLost,
319 onClientLost=onClientLost)
320
321 d = defer.DeferredList([onClientLost, onServerLost],
322 consumeErrors=True)
323
324
325 def afterLost(((cSuccess, cResult), (sSuccess, sResult))):
326
327 self.failIf(cSuccess)
328 self.failIf(sSuccess)
329 # Win32 fails to report the SSL Error, and report a connection lost
330 # instead: there is a race condition so that's not totally
331 # surprising (see ticket #2877 in the tracker)
332 cResult.trap(SSL.Error, ConnectionLost)
333 sResult.trap(SSL.Error)
334
335 return d.addCallback(afterLost)
336
337 def test_failedCertificateVerification(self):
338 """
339 Check that connecting with a certificate not accepted by the server CA
340 fails.
341 """
342 onServerLost = defer.Deferred()
343 onClientLost = defer.Deferred()
344 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
345 certificate=self.sCert, verify=False,
346 requireCertificate=False),
347 sslverify.OpenSSLCertificateOptions(verify=True,
348 requireCertificate=False, caCerts=[self.cCert]),
349 onServerLost=onServerLost,
350 onClientLost=onClientLost)
351
352 d = defer.DeferredList([onClientLost, onServerLost],
353 consumeErrors=True)
354 def afterLost(((cSuccess, cResult), (sSuccess, sResult))):
355
356 self.failIf(cSuccess)
357 self.failIf(sSuccess)
358
359 return d.addCallback(afterLost)
360
361 def test_successfulCertificateVerification(self):
362 """
363 Test a successful connection with client certificate validation on
364 server side.
365 """
366 onData = defer.Deferred()
367 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
368 certificate=self.sCert, verify=False,
369 requireCertificate=False),
370 sslverify.OpenSSLCertificateOptions(verify=True,
371 requireCertificate=True, caCerts=[self.sCert]),
372 onData=onData)
373
374 return onData.addCallback(
375 lambda result: self.assertEquals(result, WritingProtocol.byte))
376
377 def test_successfulSymmetricSelfSignedCertificateVerification(self):
378 """
379 Test a successful connection with validation on both server and client
380 sides.
381 """
382 onData = defer.Deferred()
383 self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
384 certificate=self.sCert, verify=True,
385 requireCertificate=True, caCerts=[self.cCert]),
386 sslverify.OpenSSLCertificateOptions(privateKey=self.cKey,
387 certificate=self.cCert, verify=True,
388 requireCertificate=True, caCerts=[self.sCert]),
389 onData=onData)
390
391 return onData.addCallback(
392 lambda result: self.assertEquals(result, WritingProtocol.byte))
393
394 def test_verification(self):
395 """
396 Check certificates verification building custom certificates data.
397 """
398 clientDN = sslverify.DistinguishedName(commonName='client')
399 clientKey = sslverify.KeyPair.generate()
400 clientCertReq = clientKey.certificateRequest(clientDN)
401
402 serverDN = sslverify.DistinguishedName(commonName='server')
403 serverKey = sslverify.KeyPair.generate()
404 serverCertReq = serverKey.certificateRequest(serverDN)
405
406 clientSelfCertReq = clientKey.certificateRequest(clientDN)
407 clientSelfCertData = clientKey.signCertificateRequest(
408 clientDN, clientSelfCertReq, lambda dn: True, 132)
409 clientSelfCert = clientKey.newCertificate(clientSelfCertData)
410
411 serverSelfCertReq = serverKey.certificateRequest(serverDN)
412 serverSelfCertData = serverKey.signCertificateRequest(
413 serverDN, serverSelfCertReq, lambda dn: True, 516)
414 serverSelfCert = serverKey.newCertificate(serverSelfCertData)
415
416 clientCertData = serverKey.signCertificateRequest(
417 serverDN, clientCertReq, lambda dn: True, 7)
418 clientCert = clientKey.newCertificate(clientCertData)
419
420 serverCertData = clientKey.signCertificateRequest(
421 clientDN, serverCertReq, lambda dn: True, 42)
422 serverCert = serverKey.newCertificate(serverCertData)
423
424 onData = defer.Deferred()
425
426 serverOpts = serverCert.options(serverSelfCert)
427 clientOpts = clientCert.options(clientSelfCert)
428
429 self.loopback(serverOpts,
430 clientOpts,
431 onData=onData)
432
433 return onData.addCallback(
434 lambda result: self.assertEquals(result, WritingProtocol.byte))
435
436
437
438 if interfaces.IReactorSSL(reactor, None) is None:
439 OpenSSLOptions.skip = "Reactor does not support SSL, cannot run SSL tests"
440
441
442
443 class _NotSSLTransport:
444 def getHandle(self):
445 return self
446
447 class _MaybeSSLTransport:
448 def getHandle(self):
449 return self
450
451 def get_peer_certificate(self):
452 return None
453
454 def get_host_certificate(self):
455 return None
456
457
458 class _ActualSSLTransport:
459 def getHandle(self):
460 return self
461
462 def get_host_certificate(self):
463 return sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM).original
464
465 def get_peer_certificate(self):
466 return sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM).original
467
468
469 class Constructors(unittest.TestCase):
470 def test_peerFromNonSSLTransport(self):
471 """
472 Verify that peerFromTransport raises an exception if the transport
473 passed is not actually an SSL transport.
474 """
475 x = self.assertRaises(CertificateError,
476 sslverify.Certificate.peerFromTransport,
477 _NotSSLTransport())
478 self.failUnless(str(x).startswith("non-TLS"))
479
480 def test_peerFromBlankSSLTransport(self):
481 """
482 Verify that peerFromTransport raises an exception if the transport
483 passed is an SSL transport, but doesn't have a peer certificate.
484 """
485 x = self.assertRaises(CertificateError,
486 sslverify.Certificate.peerFromTransport,
487 _MaybeSSLTransport())
488 self.failUnless(str(x).startswith("TLS"))
489
490 def test_hostFromNonSSLTransport(self):
491 """
492 Verify that hostFromTransport raises an exception if the transport
493 passed is not actually an SSL transport.
494 """
495 x = self.assertRaises(CertificateError,
496 sslverify.Certificate.hostFromTransport,
497 _NotSSLTransport())
498 self.failUnless(str(x).startswith("non-TLS"))
499
500 def test_hostFromBlankSSLTransport(self):
501 """
502 Verify that hostFromTransport raises an exception if the transport
503 passed is an SSL transport, but doesn't have a host certificate.
504 """
505 x = self.assertRaises(CertificateError,
506 sslverify.Certificate.hostFromTransport,
507 _MaybeSSLTransport())
508 self.failUnless(str(x).startswith("TLS"))
509
510
511 def test_hostFromSSLTransport(self):
512 """
513 Verify that hostFromTransport successfully creates the correct
514 certificate if passed a valid SSL transport.
515 """
516 self.assertEqual(
517 sslverify.Certificate.hostFromTransport(
518 _ActualSSLTransport()).serialNumber(),
519 12345)
520
521 def test_peerFromSSLTransport(self):
522 """
523 Verify that peerFromTransport successfully creates the correct
524 certificate if passed a valid SSL transport.
525 """
526 self.assertEqual(
527 sslverify.Certificate.peerFromTransport(
528 _ActualSSLTransport()).serialNumber(),
529 12346)
530
531
532
533 if interfaces.IReactorSSL(reactor, None) is None:
534 Constructors.skip = "Reactor does not support SSL, cannot run SSL tests"
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_ssl.py ('k') | third_party/twisted_8_1/twisted/test/test_stateful.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698