Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(235)

Unified Diff: third_party/twisted_8_1/twisted/test/test_ssl.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/test/test_ssl.py
diff --git a/third_party/twisted_8_1/twisted/test/test_ssl.py b/third_party/twisted_8_1/twisted/test/test_ssl.py
deleted file mode 100644
index 15892994ede6e0a967484d348044113abde1c690..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/test/test_ssl.py
+++ /dev/null
@@ -1,431 +0,0 @@
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-
-from twisted.trial import unittest, util as trial_util
-from twisted.internet import protocol, reactor, interfaces, defer
-from twisted.protocols import basic
-from twisted.python import util, log
-from twisted.python.runtime import platform
-from twisted.test.test_tcp import WriteDataTestCase, ProperlyCloseFilesMixin
-
-import os, errno
-
-try:
- from OpenSSL import SSL, crypto
- from twisted.internet import ssl
- from twisted.test.ssl_helpers import ClientTLSContext
-except ImportError:
- def _noSSL():
- # ugh, make pyflakes happy.
- global SSL
- global ssl
- SSL = ssl = None
- _noSSL()
-
-certPath = util.sibpath(__file__, "server.pem")
-
-class UnintelligentProtocol(basic.LineReceiver):
- pretext = [
- "first line",
- "last thing before tls starts",
- "STARTTLS"]
-
- posttext = [
- "first thing after tls started",
- "last thing ever"]
-
- def connectionMade(self):
- for l in self.pretext:
- self.sendLine(l)
-
- def lineReceived(self, line):
- if line == "READY":
- self.transport.startTLS(ClientTLSContext(), self.factory.client)
- for l in self.posttext:
- self.sendLine(l)
- self.transport.loseConnection()
-
-
-class LineCollector(basic.LineReceiver):
- def __init__(self, doTLS, fillBuffer=0):
- self.doTLS = doTLS
- self.fillBuffer = fillBuffer
-
- def connectionMade(self):
- self.factory.rawdata = ''
- self.factory.lines = []
-
- def lineReceived(self, line):
- self.factory.lines.append(line)
- if line == 'STARTTLS':
- if self.fillBuffer:
- for x in range(500):
- self.sendLine('X'*1000)
- self.sendLine('READY')
- if self.doTLS:
- ctx = ServerTLSContext(
- privateKeyFileName=certPath,
- certificateFileName=certPath,
- )
- self.transport.startTLS(ctx, self.factory.server)
- else:
- self.setRawMode()
-
- def rawDataReceived(self, data):
- self.factory.rawdata += data
- self.factory.done = 1
-
- def connectionLost(self, reason):
- self.factory.done = 1
-
-
-class SingleLineServerProtocol(protocol.Protocol):
- def connectionMade(self):
- self.transport.identifier = 'SERVER'
- self.transport.write("+OK <some crap>\r\n")
- self.transport.getPeerCertificate()
-
-
-class RecordingClientProtocol(protocol.Protocol):
- def connectionMade(self):
- self.transport.identifier = 'CLIENT'
- self.buffer = []
- self.transport.getPeerCertificate()
-
- def dataReceived(self, data):
- self.factory.buffer.append(data)
-
-
-class ImmediatelyDisconnectingProtocol(protocol.Protocol):
- def connectionMade(self):
- self.transport.loseConnection()
-
- def connectionLost(self, reason):
- self.factory.connectionDisconnected.callback(None)
-
-
-class AlmostImmediatelyDisconnectingProtocol(protocol.Protocol):
- def connectionMade(self):
- # Twisted's SSL support is terribly broken.
- reactor.callLater(0.1, self.transport.loseConnection)
-
- def connectionLost(self, reason):
- self.factory.connectionDisconnected.callback(reason)
-
-
-def generateCertificateObjects(organization, organizationalUnit):
- pkey = crypto.PKey()
- pkey.generate_key(crypto.TYPE_RSA, 512)
- req = crypto.X509Req()
- subject = req.get_subject()
- subject.O = organization
- subject.OU = organizationalUnit
- req.set_pubkey(pkey)
- req.sign(pkey, "md5")
-
- # Here comes the actual certificate
- cert = crypto.X509()
- cert.set_serial_number(1)
- cert.gmtime_adj_notBefore(0)
- cert.gmtime_adj_notAfter(60) # Testing certificates need not be long lived
- cert.set_issuer(req.get_subject())
- cert.set_subject(req.get_subject())
- cert.set_pubkey(req.get_pubkey())
- cert.sign(pkey, "md5")
-
- return pkey, req, cert
-
-
-def generateCertificateFiles(basename, organization, organizationalUnit):
- pkey, req, cert = generateCertificateObjects(organization, organizationalUnit)
-
- for ext, obj, dumpFunc in [
- ('key', pkey, crypto.dump_privatekey),
- ('req', req, crypto.dump_certificate_request),
- ('cert', cert, crypto.dump_certificate)]:
- fName = os.extsep.join((basename, ext))
- fObj = file(fName, 'w')
- fObj.write(dumpFunc(crypto.FILETYPE_PEM, obj))
- fObj.close()
-
-
-class ContextGeneratingMixin:
- def makeContextFactory(self, org, orgUnit, *args, **kwArgs):
- base = self.mktemp()
- generateCertificateFiles(base, org, orgUnit)
- serverCtxFactory = ssl.DefaultOpenSSLContextFactory(
- os.extsep.join((base, 'key')),
- os.extsep.join((base, 'cert')),
- *args, **kwArgs)
-
- return base, serverCtxFactory
-
- def setupServerAndClient(self, clientArgs, clientKwArgs, serverArgs, serverKwArgs):
- self.clientBase, self.clientCtxFactory = self.makeContextFactory(
- *clientArgs, **clientKwArgs)
- self.serverBase, self.serverCtxFactory = self.makeContextFactory(
- *serverArgs, **serverKwArgs)
-
-
-if SSL is not None:
- class ServerTLSContext(ssl.DefaultOpenSSLContextFactory):
- isClient = 0
- def __init__(self, *args, **kw):
- kw['sslmethod'] = SSL.TLSv1_METHOD
- ssl.DefaultOpenSSLContextFactory.__init__(self, *args, **kw)
-
-
-class StolenTCPTestCase(ProperlyCloseFilesMixin, WriteDataTestCase):
- """
- For SSL transports, test many of the same things which are tested for
- TCP transports.
- """
- def createServer(self, address, portNumber, factory):
- contextFactory = ssl.CertificateOptions()
- return reactor.listenSSL(
- portNumber, factory, contextFactory, interface=address)
-
-
- def connectClient(self, address, portNumber, clientCreator):
- contextFactory = ssl.CertificateOptions()
- return clientCreator.connectSSL(address, portNumber, contextFactory)
-
-
- def getHandleExceptionType(self):
- return SSL.SysCallError
-
-
- def getHandleErrorCode(self):
- # Windows 2000 SP 4 and Windows XP SP 2 give back WSAENOTSOCK for
- # SSL.Connection.write for some reason.
- if platform.getType() == 'win32':
- return errno.WSAENOTSOCK
- return ProperlyCloseFilesMixin.getHandleErrorCode(self)
-
-
-class TLSTestCase(unittest.TestCase):
- fillBuffer = 0
-
- port = None
- clientProto = None
- serverProto = None
-
- def tearDown(self):
- if self.clientProto is not None and self.clientProto.transport is not None:
- self.clientProto.transport.loseConnection()
- if self.serverProto is not None and self.serverProto.transport is not None:
- self.serverProto.transport.loseConnection()
-
- if self.port is not None:
- return defer.maybeDeferred(self.port.stopListening)
-
- def _runTest(self, clientProto, serverProto, clientIsServer=False):
- self.clientProto = clientProto
- cf = self.clientFactory = protocol.ClientFactory()
- cf.protocol = lambda: clientProto
- if clientIsServer:
- cf.server = 0
- else:
- cf.client = 1
-
- self.serverProto = serverProto
- sf = self.serverFactory = protocol.ServerFactory()
- sf.protocol = lambda: serverProto
- if clientIsServer:
- sf.client = 0
- else:
- sf.server = 1
-
- if clientIsServer:
- inCharge = cf
- else:
- inCharge = sf
- inCharge.done = 0
-
- port = self.port = reactor.listenTCP(0, sf, interface="127.0.0.1")
- portNo = port.getHost().port
-
- reactor.connectTCP('127.0.0.1', portNo, cf)
-
- i = 0
- while i < 1000 and not inCharge.done:
- reactor.iterate(0.01)
- i += 1
- self.failUnless(
- inCharge.done,
- "Never finished reading all lines: %s" % (inCharge.lines,))
-
-
- def testTLS(self):
- self._runTest(UnintelligentProtocol(), LineCollector(1, self.fillBuffer))
- self.assertEquals(
- self.serverFactory.lines,
- UnintelligentProtocol.pretext + UnintelligentProtocol.posttext
- )
-
-
- def testUnTLS(self):
- self._runTest(UnintelligentProtocol(), LineCollector(0, self.fillBuffer))
- self.assertEquals(
- self.serverFactory.lines,
- UnintelligentProtocol.pretext
- )
- self.failUnless(self.serverFactory.rawdata, "No encrypted bytes received")
-
-
- def testBackwardsTLS(self):
- self._runTest(LineCollector(1, self.fillBuffer), UnintelligentProtocol(), True)
- self.assertEquals(
- self.clientFactory.lines,
- UnintelligentProtocol.pretext + UnintelligentProtocol.posttext
- )
-
-
-
-_bufferedSuppression = trial_util.suppress(
- message="startTLS with unwritten buffered data currently doesn't work "
- "right. See issue #686. Closing connection.",
- category=RuntimeWarning)
-
-
-class SpammyTLSTestCase(TLSTestCase):
- """
- Test TLS features with bytes sitting in the out buffer.
- """
- fillBuffer = 1
-
- def testTLS(self):
- return TLSTestCase.testTLS(self)
- testTLS.suppress = [_bufferedSuppression]
- testTLS.todo = "startTLS doesn't empty buffer before starting TLS. :("
-
-
- def testBackwardsTLS(self):
- return TLSTestCase.testBackwardsTLS(self)
- testBackwardsTLS.suppress = [_bufferedSuppression]
- testBackwardsTLS.todo = "startTLS doesn't empty buffer before starting TLS. :("
-
-
-class BufferingTestCase(unittest.TestCase):
- port = None
- connector = None
- serverProto = None
- clientProto = None
-
- def tearDown(self):
- if self.serverProto is not None and self.serverProto.transport is not None:
- self.serverProto.transport.loseConnection()
- if self.clientProto is not None and self.clientProto.transport is not None:
- self.clientProto.transport.loseConnection()
- if self.port is not None:
- return defer.maybeDeferred(self.port.stopListening)
-
- def testOpenSSLBuffering(self):
- serverProto = self.serverProto = SingleLineServerProtocol()
- clientProto = self.clientProto = RecordingClientProtocol()
-
- server = protocol.ServerFactory()
- client = self.client = protocol.ClientFactory()
-
- server.protocol = lambda: serverProto
- client.protocol = lambda: clientProto
- client.buffer = []
-
- sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
- cCTX = ssl.ClientContextFactory()
-
- port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
- reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)
-
- i = 0
- while i < 5000 and not client.buffer:
- i += 1
- reactor.iterate()
-
- self.assertEquals(client.buffer, ["+OK <some crap>\r\n"])
-
-
-class ConnectionLostTestCase(unittest.TestCase, ContextGeneratingMixin):
-
- def testImmediateDisconnect(self):
- org = "twisted.test.test_ssl"
- self.setupServerAndClient(
- (org, org + ", client"), {},
- (org, org + ", server"), {})
-
- # Set up a server, connect to it with a client, which should work since our verifiers
- # allow anything, then disconnect.
- serverProtocolFactory = protocol.ServerFactory()
- serverProtocolFactory.protocol = protocol.Protocol
- self.serverPort = serverPort = reactor.listenSSL(0,
- serverProtocolFactory, self.serverCtxFactory)
-
- clientProtocolFactory = protocol.ClientFactory()
- clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
- clientProtocolFactory.connectionDisconnected = defer.Deferred()
- clientConnector = reactor.connectSSL('127.0.0.1',
- serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
-
- return clientProtocolFactory.connectionDisconnected.addCallback(
- lambda ignoredResult: self.serverPort.stopListening())
-
- def testFailedVerify(self):
- org = "twisted.test.test_ssl"
- self.setupServerAndClient(
- (org, org + ", client"), {},
- (org, org + ", server"), {})
-
- def verify(*a):
- return False
- self.clientCtxFactory.getContext().set_verify(SSL.VERIFY_PEER, verify)
-
- serverConnLost = defer.Deferred()
- serverProtocol = protocol.Protocol()
- serverProtocol.connectionLost = serverConnLost.callback
- serverProtocolFactory = protocol.ServerFactory()
- serverProtocolFactory.protocol = lambda: serverProtocol
- self.serverPort = serverPort = reactor.listenSSL(0,
- serverProtocolFactory, self.serverCtxFactory)
-
- clientConnLost = defer.Deferred()
- clientProtocol = protocol.Protocol()
- clientProtocol.connectionLost = clientConnLost.callback
- clientProtocolFactory = protocol.ClientFactory()
- clientProtocolFactory.protocol = lambda: clientProtocol
- clientConnector = reactor.connectSSL('127.0.0.1',
- serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
-
- dl = defer.DeferredList([serverConnLost, clientConnLost], consumeErrors=True)
- return dl.addCallback(self._cbLostConns)
-
- def _cbLostConns(self, results):
- (sSuccess, sResult), (cSuccess, cResult) = results
-
- self.failIf(sSuccess)
- self.failIf(cSuccess)
-
- acceptableErrors = [SSL.Error]
-
- # Rather than getting a verification failure on Windows, we are getting
- # a connection failure. Without something like sslverify proxying
- # in-between we can't fix up the platform's errors, so let's just
- # specifically say it is only OK in this one case to keep the tests
- # passing. Normally we'd like to be as strict as possible here, so
- # we're not going to allow this to report errors incorrectly on any
- # other platforms.
-
- if platform.isWindows():
- from twisted.internet.error import ConnectionLost
- acceptableErrors.append(ConnectionLost)
-
- sResult.trap(*acceptableErrors)
- cResult.trap(*acceptableErrors)
-
- return self.serverPort.stopListening()
-
-
-if interfaces.IReactorSSL(reactor, None) is None:
- for tCase in [StolenTCPTestCase, TLSTestCase, SpammyTLSTestCase,
- BufferingTestCase, ConnectionLostTestCase]:
- tCase.skip = "Reactor does not support SSL, cannot run SSL tests"
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_split_compat.py ('k') | third_party/twisted_8_1/twisted/test/test_sslverify.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698