Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(239)

Unified Diff: third_party/twisted_8_1/twisted/test/test_sslverify.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/test/test_sslverify.py
diff --git a/third_party/twisted_8_1/twisted/test/test_sslverify.py b/third_party/twisted_8_1/twisted/test/test_sslverify.py
deleted file mode 100644
index ad05b9fea139e6559fa2730117e66a209bdea036..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/test/test_sslverify.py
+++ /dev/null
@@ -1,534 +0,0 @@
-# Copyright 2005 Divmod, Inc. See LICENSE file for details
-# Copyright (c) 2007 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-import itertools
-
-try:
- from OpenSSL import SSL
- from OpenSSL.crypto import PKey, X509, X509Req
- from OpenSSL.crypto import TYPE_RSA
- from twisted.internet import _sslverify as sslverify
-except ImportError:
- pass
-
-from twisted.trial import unittest
-from twisted.internet import protocol, defer, reactor
-from twisted.python.reflect import objgrep, isSame
-from twisted.python import log
-
-from twisted.internet.error import CertificateError, ConnectionLost
-from twisted.internet import interfaces
-
-
-# A couple of static PEM-format certificates to be used by various tests.
-A_HOST_CERTIFICATE_PEM = """
------BEGIN CERTIFICATE-----
- MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG
- A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
- MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
- dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x
- ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw
- OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy
- aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4
- IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v
- Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ
- KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp
- 8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi
- KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ
- VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj
- JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO
- S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls
- fXzCWdG0O/3Lk2SRM0I=
------END CERTIFICATE-----
-"""
-
-A_PEER_CERTIFICATE_PEM = """
------BEGIN CERTIFICATE-----
- MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG
- A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
- MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
- dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv
- bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw
- MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h
- dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy
- aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa
- c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf
- MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4
- CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE
- JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ
- e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA
- vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg
- i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr
- yqDtGhklsWW3ZwBzEh5VEOUp
------END CERTIFICATE-----
-"""
-
-
-
-counter = itertools.count().next
-def makeCertificate(**kw):
- keypair = PKey()
- keypair.generate_key(TYPE_RSA, 512)
-
- certificate = X509()
- certificate.gmtime_adj_notBefore(0)
- certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year
- for xname in certificate.get_issuer(), certificate.get_subject():
- for (k, v) in kw.items():
- setattr(xname, k, v)
-
- certificate.set_serial_number(counter())
- certificate.set_pubkey(keypair)
- certificate.sign(keypair, "md5")
-
- return keypair, certificate
-
-
-
-class DataCallbackProtocol(protocol.Protocol):
- def dataReceived(self, data):
- d, self.factory.onData = self.factory.onData, None
- if d is not None:
- d.callback(data)
-
- def connectionLost(self, reason):
- d, self.factory.onLost = self.factory.onLost, None
- if d is not None:
- d.errback(reason)
-
-class WritingProtocol(protocol.Protocol):
- byte = 'x'
- def connectionMade(self):
- self.transport.write(self.byte)
-
- def connectionLost(self, reason):
- self.factory.onLost.errback(reason)
-
-
-class OpenSSLOptions(unittest.TestCase):
- serverPort = clientConn = None
- onServerLost = onClientLost = None
-
- sKey = None
- sCert = None
- cKey = None
- cCert = None
-
- def setUp(self):
- """
- Create class variables of client and server certificates.
- """
- self.sKey, self.sCert = makeCertificate(
- O="Server Test Certificate",
- CN="server")
- self.cKey, self.cCert = makeCertificate(
- O="Client Test Certificate",
- CN="client")
-
- def tearDown(self):
- if self.serverPort is not None:
- self.serverPort.stopListening()
- if self.clientConn is not None:
- self.clientConn.disconnect()
-
- L = []
- if self.onServerLost is not None:
- L.append(self.onServerLost)
- if self.onClientLost is not None:
- L.append(self.onClientLost)
-
- return defer.DeferredList(L, consumeErrors=True)
-
- def loopback(self, serverCertOpts, clientCertOpts,
- onServerLost=None, onClientLost=None, onData=None):
- if onServerLost is None:
- self.onServerLost = onServerLost = defer.Deferred()
- if onClientLost is None:
- self.onClientLost = onClientLost = defer.Deferred()
- if onData is None:
- onData = defer.Deferred()
-
- serverFactory = protocol.ServerFactory()
- serverFactory.protocol = DataCallbackProtocol
- serverFactory.onLost = onServerLost
- serverFactory.onData = onData
-
- clientFactory = protocol.ClientFactory()
- clientFactory.protocol = WritingProtocol
- clientFactory.onLost = onClientLost
-
- self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
- self.clientConn = reactor.connectSSL('127.0.0.1',
- self.serverPort.getHost().port, clientFactory, clientCertOpts)
-
- def test_abbreviatingDistinguishedNames(self):
- """
- Check that abbreviations used in certificates correctly map to
- complete names.
- """
- self.assertEquals(
- sslverify.DN(CN='a', OU='hello'),
- sslverify.DistinguishedName(commonName='a',
- organizationalUnitName='hello'))
- self.assertNotEquals(
- sslverify.DN(CN='a', OU='hello'),
- sslverify.DN(CN='a', OU='hello', emailAddress='xxx'))
- dn = sslverify.DN(CN='abcdefg')
- self.assertRaises(AttributeError, setattr, dn, 'Cn', 'x')
- self.assertEquals(dn.CN, dn.commonName)
- dn.CN = 'bcdefga'
- self.assertEquals(dn.CN, dn.commonName)
-
-
- def testInspectDistinguishedName(self):
- n = sslverify.DN(commonName='common name',
- organizationName='organization name',
- organizationalUnitName='organizational unit name',
- localityName='locality name',
- stateOrProvinceName='state or province name',
- countryName='country name',
- emailAddress='email address')
- s = n.inspect()
- for k in [
- 'common name',
- 'organization name',
- 'organizational unit name',
- 'locality name',
- 'state or province name',
- 'country name',
- 'email address']:
- self.assertIn(k, s, "%r was not in inspect output." % (k,))
- self.assertIn(k.title(), s, "%r was not in inspect output." % (k,))
-
-
- def testInspectDistinguishedNameWithoutAllFields(self):
- n = sslverify.DN(localityName='locality name')
- s = n.inspect()
- for k in [
- 'common name',
- 'organization name',
- 'organizational unit name',
- 'state or province name',
- 'country name',
- 'email address']:
- self.assertNotIn(k, s, "%r was in inspect output." % (k,))
- self.assertNotIn(k.title(), s, "%r was in inspect output." % (k,))
- self.assertIn('locality name', s)
- self.assertIn('Locality Name', s)
-
-
- def test_inspectCertificate(self):
- """
- Test that the C{inspect} method of L{sslverify.Certificate} returns
- a human-readable string containing some basic information about the
- certificate.
- """
- c = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
- self.assertEqual(
- c.inspect().split('\n'),
- ["Certificate For Subject:",
- " Organizational Unit Name: Security",
- " Organization Name: Twisted Matrix Labs",
- " Common Name: example.twistedmatrix.com",
- " State Or Province Name: Massachusetts",
- " Country Name: US",
- " Email Address: nobody@twistedmatrix.com",
- " Locality Name: Boston",
- "",
- "Issuer:",
- " Organizational Unit Name: Security",
- " Organization Name: Twisted Matrix Labs",
- " Common Name: example.twistedmatrix.com",
- " State Or Province Name: Massachusetts",
- " Country Name: US",
- " Email Address: nobody@twistedmatrix.com",
- " Locality Name: Boston",
- "",
- "Serial Number: 12345",
- "Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18",
- "Public Key with Hash: ff33994c80812aa95a79cdb85362d054"])
-
-
- def test_certificateOptionsSerialization(self):
- """
- Test that __setstate__(__getstate__()) round-trips properly.
- """
- firstOpts = sslverify.OpenSSLCertificateOptions(
- privateKey=self.sKey,
- certificate=self.sCert,
- method=SSL.SSLv3_METHOD,
- verify=True,
- caCerts=[self.sCert],
- verifyDepth=2,
- requireCertificate=False,
- verifyOnce=False,
- enableSingleUseKeys=False,
- enableSessions=False,
- fixBrokenPeers=True)
- context = firstOpts.getContext()
- state = firstOpts.__getstate__()
-
- # The context shouldn't be in the state to serialize
- self.failIf(objgrep(state, context, isSame),
- objgrep(state, context, isSame))
-
- opts = sslverify.OpenSSLCertificateOptions()
- opts.__setstate__(state)
- self.assertEqual(opts.privateKey, self.sKey)
- self.assertEqual(opts.certificate, self.sCert)
- self.assertEqual(opts.method, SSL.SSLv3_METHOD)
- self.assertEqual(opts.verify, True)
- self.assertEqual(opts.caCerts, [self.sCert])
- self.assertEqual(opts.verifyDepth, 2)
- self.assertEqual(opts.requireCertificate, False)
- self.assertEqual(opts.verifyOnce, False)
- self.assertEqual(opts.enableSingleUseKeys, False)
- self.assertEqual(opts.enableSessions, False)
- self.assertEqual(opts.fixBrokenPeers, True)
-
-
- def test_allowedAnonymousClientConnection(self):
- """
- Check that anonymous connections are allowed when certificates aren't
- required on the server.
- """
- onData = defer.Deferred()
- self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
- certificate=self.sCert, requireCertificate=False),
- sslverify.OpenSSLCertificateOptions(
- requireCertificate=False),
- onData=onData)
-
- return onData.addCallback(
- lambda result: self.assertEquals(result, WritingProtocol.byte))
-
- def test_refusedAnonymousClientConnection(self):
- """
- Check that anonymous connections are refused when certificates are
- required on the server.
- """
- onServerLost = defer.Deferred()
- onClientLost = defer.Deferred()
- self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
- certificate=self.sCert, verify=True,
- caCerts=[self.sCert], requireCertificate=True),
- sslverify.OpenSSLCertificateOptions(
- requireCertificate=False),
- onServerLost=onServerLost,
- onClientLost=onClientLost)
-
- d = defer.DeferredList([onClientLost, onServerLost],
- consumeErrors=True)
-
-
- def afterLost(((cSuccess, cResult), (sSuccess, sResult))):
-
- self.failIf(cSuccess)
- self.failIf(sSuccess)
- # Win32 fails to report the SSL Error, and report a connection lost
- # instead: there is a race condition so that's not totally
- # surprising (see ticket #2877 in the tracker)
- cResult.trap(SSL.Error, ConnectionLost)
- sResult.trap(SSL.Error)
-
- return d.addCallback(afterLost)
-
- def test_failedCertificateVerification(self):
- """
- Check that connecting with a certificate not accepted by the server CA
- fails.
- """
- onServerLost = defer.Deferred()
- onClientLost = defer.Deferred()
- self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
- certificate=self.sCert, verify=False,
- requireCertificate=False),
- sslverify.OpenSSLCertificateOptions(verify=True,
- requireCertificate=False, caCerts=[self.cCert]),
- onServerLost=onServerLost,
- onClientLost=onClientLost)
-
- d = defer.DeferredList([onClientLost, onServerLost],
- consumeErrors=True)
- def afterLost(((cSuccess, cResult), (sSuccess, sResult))):
-
- self.failIf(cSuccess)
- self.failIf(sSuccess)
-
- return d.addCallback(afterLost)
-
- def test_successfulCertificateVerification(self):
- """
- Test a successful connection with client certificate validation on
- server side.
- """
- onData = defer.Deferred()
- self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
- certificate=self.sCert, verify=False,
- requireCertificate=False),
- sslverify.OpenSSLCertificateOptions(verify=True,
- requireCertificate=True, caCerts=[self.sCert]),
- onData=onData)
-
- return onData.addCallback(
- lambda result: self.assertEquals(result, WritingProtocol.byte))
-
- def test_successfulSymmetricSelfSignedCertificateVerification(self):
- """
- Test a successful connection with validation on both server and client
- sides.
- """
- onData = defer.Deferred()
- self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
- certificate=self.sCert, verify=True,
- requireCertificate=True, caCerts=[self.cCert]),
- sslverify.OpenSSLCertificateOptions(privateKey=self.cKey,
- certificate=self.cCert, verify=True,
- requireCertificate=True, caCerts=[self.sCert]),
- onData=onData)
-
- return onData.addCallback(
- lambda result: self.assertEquals(result, WritingProtocol.byte))
-
- def test_verification(self):
- """
- Check certificates verification building custom certificates data.
- """
- clientDN = sslverify.DistinguishedName(commonName='client')
- clientKey = sslverify.KeyPair.generate()
- clientCertReq = clientKey.certificateRequest(clientDN)
-
- serverDN = sslverify.DistinguishedName(commonName='server')
- serverKey = sslverify.KeyPair.generate()
- serverCertReq = serverKey.certificateRequest(serverDN)
-
- clientSelfCertReq = clientKey.certificateRequest(clientDN)
- clientSelfCertData = clientKey.signCertificateRequest(
- clientDN, clientSelfCertReq, lambda dn: True, 132)
- clientSelfCert = clientKey.newCertificate(clientSelfCertData)
-
- serverSelfCertReq = serverKey.certificateRequest(serverDN)
- serverSelfCertData = serverKey.signCertificateRequest(
- serverDN, serverSelfCertReq, lambda dn: True, 516)
- serverSelfCert = serverKey.newCertificate(serverSelfCertData)
-
- clientCertData = serverKey.signCertificateRequest(
- serverDN, clientCertReq, lambda dn: True, 7)
- clientCert = clientKey.newCertificate(clientCertData)
-
- serverCertData = clientKey.signCertificateRequest(
- clientDN, serverCertReq, lambda dn: True, 42)
- serverCert = serverKey.newCertificate(serverCertData)
-
- onData = defer.Deferred()
-
- serverOpts = serverCert.options(serverSelfCert)
- clientOpts = clientCert.options(clientSelfCert)
-
- self.loopback(serverOpts,
- clientOpts,
- onData=onData)
-
- return onData.addCallback(
- lambda result: self.assertEquals(result, WritingProtocol.byte))
-
-
-
-if interfaces.IReactorSSL(reactor, None) is None:
- OpenSSLOptions.skip = "Reactor does not support SSL, cannot run SSL tests"
-
-
-
-class _NotSSLTransport:
- def getHandle(self):
- return self
-
-class _MaybeSSLTransport:
- def getHandle(self):
- return self
-
- def get_peer_certificate(self):
- return None
-
- def get_host_certificate(self):
- return None
-
-
-class _ActualSSLTransport:
- def getHandle(self):
- return self
-
- def get_host_certificate(self):
- return sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM).original
-
- def get_peer_certificate(self):
- return sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM).original
-
-
-class Constructors(unittest.TestCase):
- def test_peerFromNonSSLTransport(self):
- """
- Verify that peerFromTransport raises an exception if the transport
- passed is not actually an SSL transport.
- """
- x = self.assertRaises(CertificateError,
- sslverify.Certificate.peerFromTransport,
- _NotSSLTransport())
- self.failUnless(str(x).startswith("non-TLS"))
-
- def test_peerFromBlankSSLTransport(self):
- """
- Verify that peerFromTransport raises an exception if the transport
- passed is an SSL transport, but doesn't have a peer certificate.
- """
- x = self.assertRaises(CertificateError,
- sslverify.Certificate.peerFromTransport,
- _MaybeSSLTransport())
- self.failUnless(str(x).startswith("TLS"))
-
- def test_hostFromNonSSLTransport(self):
- """
- Verify that hostFromTransport raises an exception if the transport
- passed is not actually an SSL transport.
- """
- x = self.assertRaises(CertificateError,
- sslverify.Certificate.hostFromTransport,
- _NotSSLTransport())
- self.failUnless(str(x).startswith("non-TLS"))
-
- def test_hostFromBlankSSLTransport(self):
- """
- Verify that hostFromTransport raises an exception if the transport
- passed is an SSL transport, but doesn't have a host certificate.
- """
- x = self.assertRaises(CertificateError,
- sslverify.Certificate.hostFromTransport,
- _MaybeSSLTransport())
- self.failUnless(str(x).startswith("TLS"))
-
-
- def test_hostFromSSLTransport(self):
- """
- Verify that hostFromTransport successfully creates the correct
- certificate if passed a valid SSL transport.
- """
- self.assertEqual(
- sslverify.Certificate.hostFromTransport(
- _ActualSSLTransport()).serialNumber(),
- 12345)
-
- def test_peerFromSSLTransport(self):
- """
- Verify that peerFromTransport successfully creates the correct
- certificate if passed a valid SSL transport.
- """
- self.assertEqual(
- sslverify.Certificate.peerFromTransport(
- _ActualSSLTransport()).serialNumber(),
- 12346)
-
-
-
-if interfaces.IReactorSSL(reactor, None) is None:
- Constructors.skip = "Reactor does not support SSL, cannot run SSL tests"
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_ssl.py ('k') | third_party/twisted_8_1/twisted/test/test_stateful.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698