Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(83)

Unified Diff: third_party/tlslite/tests/tlstest.py

Issue 210323002: Update tlslite to 0.4.6. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Executable bit and --similarity=80 Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « third_party/tlslite/tests/serverX509Key.pem ('k') | third_party/tlslite/tests/verifierDB » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: third_party/tlslite/tests/tlstest.py
diff --git a/third_party/tlslite/tests/tlstest.py b/third_party/tlslite/tests/tlstest.py
new file mode 100755
index 0000000000000000000000000000000000000000..fa1b13fa65d3248748de662d75a136ecedf39cba
--- /dev/null
+++ b/third_party/tlslite/tests/tlstest.py
@@ -0,0 +1,793 @@
+#!/usr/bin/env python
+
+# Authors:
+# Trevor Perrin
+# Kees Bos - Added tests for XML-RPC
+# Dimitris Moraitis - Anon ciphersuites
+# Marcelo Fernandez - Added test for NPN
+# Martin von Loewis - python 3 port
+
+#
+# See the LICENSE file for legal information regarding use of this file.
+from __future__ import print_function
+import sys
+import os
+import os.path
+import socket
+import time
+import getopt
+try:
+ from BaseHTTPServer import HTTPServer
+ from SimpleHTTPServer import SimpleHTTPRequestHandler
+except ImportError:
+ from http.server import HTTPServer, SimpleHTTPRequestHandler
+
+from tlslite import TLSConnection, Fault, HandshakeSettings, \
+ X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \
+ parsePEMKey, constants, \
+ AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \
+ POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \
+ Checker, __version__
+
+from tlslite.errors import *
+from tlslite.utils.cryptomath import prngName
+try:
+ import xmlrpclib
+except ImportError:
+ # Python 3
+ from xmlrpc import client as xmlrpclib
+from tlslite import *
+
+try:
+ from tack.structures.Tack import Tack
+
+except ImportError:
+ pass
+
+def printUsage(s=None):
+ if m2cryptoLoaded:
+ crypto = "M2Crypto/OpenSSL"
+ else:
+ crypto = "Python crypto"
+ if s:
+ print("ERROR: %s" % s)
+ print("""\ntls.py version %s (using %s)
+
+Commands:
+ server HOST:PORT DIRECTORY
+
+ client HOST:PORT DIRECTORY
+""" % (__version__, crypto))
+ sys.exit(-1)
+
+
+def testConnClient(conn):
+ b1 = os.urandom(1)
+ b10 = os.urandom(10)
+ b100 = os.urandom(100)
+ b1000 = os.urandom(1000)
+ conn.write(b1)
+ conn.write(b10)
+ conn.write(b100)
+ conn.write(b1000)
+ assert(conn.read(min=1, max=1) == b1)
+ assert(conn.read(min=10, max=10) == b10)
+ assert(conn.read(min=100, max=100) == b100)
+ assert(conn.read(min=1000, max=1000) == b1000)
+
+def clientTestCmd(argv):
+
+ address = argv[0]
+ dir = argv[1]
+
+ #Split address into hostname/port tuple
+ address = address.split(":")
+ address = ( address[0], int(address[1]) )
+
+ def connect():
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ if hasattr(sock, 'settimeout'): #It's a python 2.3 feature
+ sock.settimeout(5)
+ sock.connect(address)
+ c = TLSConnection(sock)
+ return c
+
+ test = 0
+
+ badFault = False
+
+ print("Test 0 - anonymous handshake")
+ connection = connect()
+ connection.handshakeClientAnonymous()
+ testConnClient(connection)
+ connection.close()
+
+ print("Test 1 - good X509 (plus SNI)")
+ connection = connect()
+ connection.handshakeClientCert(serverName=address[0])
+ testConnClient(connection)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ assert(connection.session.serverName == address[0])
+ connection.close()
+
+ print("Test 1.a - good X509, SSLv3")
+ connection = connect()
+ settings = HandshakeSettings()
+ settings.minVersion = (3,0)
+ settings.maxVersion = (3,0)
+ connection.handshakeClientCert(settings=settings)
+ testConnClient(connection)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ connection.close()
+
+ print("Test 1.b - good X509, RC4-MD5")
+ connection = connect()
+ settings = HandshakeSettings()
+ settings.macNames = ["md5"]
+ connection.handshakeClientCert(settings=settings)
+ testConnClient(connection)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)
+ connection.close()
+
+ if tackpyLoaded:
+
+ settings = HandshakeSettings()
+ settings.useExperimentalTackExtension = True
+
+ print("Test 2.a - good X.509, TACK")
+ connection = connect()
+ connection.handshakeClientCert(settings=settings)
+ assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
+ assert(connection.session.tackExt.activation_flags == 1)
+ testConnClient(connection)
+ connection.close()
+
+ print("Test 2.b - good X.509, TACK unrelated to cert chain")
+ connection = connect()
+ try:
+ connection.handshakeClientCert(settings=settings)
+ assert(False)
+ except TLSLocalAlert as alert:
+ if alert.description != AlertDescription.illegal_parameter:
+ raise
+ connection.close()
+
+ print("Test 3 - good SRP")
+ connection = connect()
+ connection.handshakeClientSRP("test", "password")
+ testConnClient(connection)
+ connection.close()
+
+ print("Test 4 - SRP faults")
+ for fault in Fault.clientSrpFaults + Fault.genericFaults:
+ connection = connect()
+ connection.fault = fault
+ try:
+ connection.handshakeClientSRP("test", "password")
+ print(" Good Fault %s" % (Fault.faultNames[fault]))
+ except TLSFaultError as e:
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
+ badFault = True
+
+ print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
+ settings = HandshakeSettings()
+ settings.minVersion = (3,1)
+ settings.maxVersion = (3,1)
+ connection = connect()
+ connection.handshakeClientSRP("test", "password", settings=settings)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ testConnClient(connection)
+ connection.close()
+
+ print("Test 7 - X.509 with SRP faults")
+ for fault in Fault.clientSrpFaults + Fault.genericFaults:
+ connection = connect()
+ connection.fault = fault
+ try:
+ connection.handshakeClientSRP("test", "password")
+ print(" Good Fault %s" % (Fault.faultNames[fault]))
+ except TLSFaultError as e:
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
+ badFault = True
+
+ print("Test 11 - X.509 faults")
+ for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
+ connection = connect()
+ connection.fault = fault
+ try:
+ connection.handshakeClientCert()
+ print(" Good Fault %s" % (Fault.faultNames[fault]))
+ except TLSFaultError as e:
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
+ badFault = True
+
+ print("Test 14 - good mutual X509")
+ x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())
+ x509Chain = X509CertChain([x509Cert])
+ s = open(os.path.join(dir, "clientX509Key.pem")).read()
+ x509Key = parsePEMKey(s, private=True)
+
+ connection = connect()
+ connection.handshakeClientCert(x509Chain, x509Key)
+ testConnClient(connection)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ connection.close()
+
+ print("Test 14.a - good mutual X509, SSLv3")
+ connection = connect()
+ settings = HandshakeSettings()
+ settings.minVersion = (3,0)
+ settings.maxVersion = (3,0)
+ connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
+ testConnClient(connection)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ connection.close()
+
+ print("Test 15 - mutual X.509 faults")
+ for fault in Fault.clientCertFaults + Fault.genericFaults:
+ connection = connect()
+ connection.fault = fault
+ try:
+ connection.handshakeClientCert(x509Chain, x509Key)
+ print(" Good Fault %s" % (Fault.faultNames[fault]))
+ except TLSFaultError as e:
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
+ badFault = True
+
+ print("Test 18 - good SRP, prepare to resume... (plus SNI)")
+ connection = connect()
+ connection.handshakeClientSRP("test", "password", serverName=address[0])
+ testConnClient(connection)
+ connection.close()
+ session = connection.session
+
+ print("Test 19 - resumption (plus SNI)")
+ connection = connect()
+ connection.handshakeClientSRP("test", "garbage", serverName=address[0],
+ session=session)
+ testConnClient(connection)
+ #Don't close! -- see below
+
+ print("Test 20 - invalidated resumption (plus SNI)")
+ connection.sock.close() #Close the socket without a close_notify!
+ connection = connect()
+ try:
+ connection.handshakeClientSRP("test", "garbage",
+ serverName=address[0], session=session)
+ assert(False)
+ except TLSRemoteAlert as alert:
+ if alert.description != AlertDescription.bad_record_mac:
+ raise
+ connection.close()
+
+ print("Test 21 - HTTPS test X.509")
+ address = address[0], address[1]+1
+ if hasattr(socket, "timeout"):
+ timeoutEx = socket.timeout
+ else:
+ timeoutEx = socket.error
+ while 1:
+ try:
+ time.sleep(2)
+ htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "utf-8")
+ fingerprint = None
+ for y in range(2):
+ checker =Checker(x509Fingerprint=fingerprint)
+ h = HTTPTLSConnection(\
+ address[0], address[1], checker=checker)
+ for x in range(3):
+ h.request("GET", "/index.html")
+ r = h.getresponse()
+ assert(r.status == 200)
+ b = bytearray(r.read())
+ assert(b == htmlBody)
+ fingerprint = h.tlsSession.serverCertChain.getFingerprint()
+ assert(fingerprint)
+ time.sleep(2)
+ break
+ except timeoutEx:
+ print("timeout, retrying...")
+ pass
+
+ address = address[0], address[1]+1
+
+ implementations = []
+ if m2cryptoLoaded:
+ implementations.append("openssl")
+ if pycryptoLoaded:
+ implementations.append("pycrypto")
+ implementations.append("python")
+
+ print("Test 22 - different ciphers, TLSv1.0")
+ for implementation in implementations:
+ for cipher in ["aes128", "aes256", "rc4"]:
+
+ print("Test 22:", end=' ')
+ connection = connect()
+
+ settings = HandshakeSettings()
+ settings.cipherNames = [cipher]
+ settings.cipherImplementations = [implementation, "python"]
+ settings.minVersion = (3,1)
+ settings.maxVersion = (3,1)
+ connection.handshakeClientCert(settings=settings)
+ testConnClient(connection)
+ print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
+ connection.close()
+
+ print("Test 23 - throughput test")
+ for implementation in implementations:
+ for cipher in ["aes128", "aes256", "3des", "rc4"]:
+ if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
+ continue
+
+ print("Test 23:", end=' ')
+ connection = connect()
+
+ settings = HandshakeSettings()
+ settings.cipherNames = [cipher]
+ settings.cipherImplementations = [implementation, "python"]
+ connection.handshakeClientCert(settings=settings)
+ print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ')
+
+ startTime = time.clock()
+ connection.write(b"hello"*10000)
+ h = connection.read(min=50000, max=50000)
+ stopTime = time.clock()
+ if stopTime-startTime:
+ print("100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime)))
+ else:
+ print("100K exchanged very fast")
+
+ assert(h == b"hello"*10000)
+ connection.close()
+
+ print("Test 24.a - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'http/1.1')
+ connection.close()
+
+ print("Test 24.b - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'spdy/2')
+ connection.close()
+
+ print("Test 24.c - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'spdy/2')
+ connection.close()
+
+ print("Test 24.d - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'spdy/2')
+ connection.close()
+
+ print("Test 24.e - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'spdy/3')
+ connection.close()
+
+ print("Test 24.f - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'http/1.1')
+ connection.close()
+
+ print("Test 24.g - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'spdy/2')
+ connection.close()
+
+ print('Test 25 - good standard XMLRPC https client')
+ time.sleep(2) # Hack for lack of ability to set timeout here
+ address = address[0], address[1]+1
+ server = xmlrpclib.Server('https://%s:%s' % address)
+ assert server.add(1,2) == 3
+ assert server.pow(2,4) == 16
+
+ print('Test 26 - good tlslite XMLRPC client')
+ transport = XMLRPCTransport(ignoreAbruptClose=True)
+ server = xmlrpclib.Server('https://%s:%s' % address, transport)
+ assert server.add(1,2) == 3
+ assert server.pow(2,4) == 16
+
+ print('Test 27 - good XMLRPC ignored protocol')
+ server = xmlrpclib.Server('http://%s:%s' % address, transport)
+ assert server.add(1,2) == 3
+ assert server.pow(2,4) == 16
+
+ print("Test 28 - Internet servers test")
+ try:
+ i = IMAP4_TLS("cyrus.andrew.cmu.edu")
+ i.login("anonymous", "anonymous@anonymous.net")
+ i.logout()
+ print("Test 28: IMAP4 good")
+ p = POP3_TLS("pop.gmail.com")
+ p.quit()
+ print("Test 29: POP3 good")
+ except socket.error as e:
+ print("Non-critical error: socket error trying to reach internet server: ", e)
+
+ if not badFault:
+ print("Test succeeded")
+ else:
+ print("Test failed")
+
+
+
+def testConnServer(connection):
+ count = 0
+ while 1:
+ s = connection.read()
+ count += len(s)
+ if len(s) == 0:
+ break
+ connection.write(s)
+ if count == 1111:
+ break
+
+def serverTestCmd(argv):
+
+ address = argv[0]
+ dir = argv[1]
+
+ #Split address into hostname/port tuple
+ address = address.split(":")
+ address = ( address[0], int(address[1]) )
+
+ #Connect to server
+ lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ lsock.bind(address)
+ lsock.listen(5)
+
+ def connect():
+ return TLSConnection(lsock.accept()[0])
+
+ x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
+ x509Chain = X509CertChain([x509Cert])
+ s = open(os.path.join(dir, "serverX509Key.pem")).read()
+ x509Key = parsePEMKey(s, private=True)
+
+ print("Test 0 - Anonymous server handshake")
+ connection = connect()
+ connection.handshakeServer(anon=True)
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 1 - good X.509")
+ connection = connect()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
+ assert(connection.session.serverName == address[0])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 1.a - good X.509, SSL v3")
+ connection = connect()
+ settings = HandshakeSettings()
+ settings.minVersion = (3,0)
+ settings.maxVersion = (3,0)
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 1.b - good X.509, RC4-MD5")
+ connection = connect()
+ settings = HandshakeSettings()
+ settings.macNames = ["sha", "md5"]
+ settings.cipherNames = ["rc4"]
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
+ testConnServer(connection)
+ connection.close()
+
+ if tackpyLoaded:
+ tack = Tack.createFromPem(open("./TACK1.pem", "rU").read())
+ tackUnrelated = Tack.createFromPem(open("./TACKunrelated.pem", "rU").read())
+
+ settings = HandshakeSettings()
+ settings.useExperimentalTackExtension = True
+
+ print("Test 2.a - good X.509, TACK")
+ connection = connect()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ tacks=[tack], activationFlags=1, settings=settings)
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 2.b - good X.509, TACK unrelated to cert chain")
+ connection = connect()
+ try:
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ tacks=[tackUnrelated], settings=settings)
+ assert(False)
+ except TLSRemoteAlert as alert:
+ if alert.description != AlertDescription.illegal_parameter:
+ raise
+
+ print("Test 3 - good SRP")
+ verifierDB = VerifierDB()
+ verifierDB.create()
+ entry = VerifierDB.makeVerifier("test", "password", 1536)
+ verifierDB["test"] = entry
+
+ connection = connect()
+ connection.handshakeServer(verifierDB=verifierDB)
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 4 - SRP faults")
+ for fault in Fault.clientSrpFaults + Fault.genericFaults:
+ connection = connect()
+ connection.fault = fault
+ try:
+ connection.handshakeServer(verifierDB=verifierDB)
+ assert()
+ except:
+ pass
+ connection.close()
+
+ print("Test 6 - good SRP: with X.509 cert")
+ connection = connect()
+ connection.handshakeServer(verifierDB=verifierDB, \
+ certChain=x509Chain, privateKey=x509Key)
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 7 - X.509 with SRP faults")
+ for fault in Fault.clientSrpFaults + Fault.genericFaults:
+ connection = connect()
+ connection.fault = fault
+ try:
+ connection.handshakeServer(verifierDB=verifierDB, \
+ certChain=x509Chain, privateKey=x509Key)
+ assert()
+ except:
+ pass
+ connection.close()
+
+ print("Test 11 - X.509 faults")
+ for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
+ connection = connect()
+ connection.fault = fault
+ try:
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
+ assert()
+ except:
+ pass
+ connection.close()
+
+ print("Test 14 - good mutual X.509")
+ connection = connect()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
+ testConnServer(connection)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ connection.close()
+
+ print("Test 14a - good mutual X.509, SSLv3")
+ connection = connect()
+ settings = HandshakeSettings()
+ settings.minVersion = (3,0)
+ settings.maxVersion = (3,0)
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
+ testConnServer(connection)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ connection.close()
+
+ print("Test 15 - mutual X.509 faults")
+ for fault in Fault.clientCertFaults + Fault.genericFaults:
+ connection = connect()
+ connection.fault = fault
+ try:
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
+ assert()
+ except:
+ pass
+ connection.close()
+
+ print("Test 18 - good SRP, prepare to resume")
+ sessionCache = SessionCache()
+ connection = connect()
+ connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
+ assert(connection.session.serverName == address[0])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 19 - resumption")
+ connection = connect()
+ connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
+ assert(connection.session.serverName == address[0])
+ testConnServer(connection)
+ #Don't close! -- see next test
+
+ print("Test 20 - invalidated resumption")
+ try:
+ connection.read(min=1, max=1)
+ assert() #Client is going to close the socket without a close_notify
+ except TLSAbruptCloseError as e:
+ pass
+ connection = connect()
+ try:
+ connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
+ except TLSLocalAlert as alert:
+ if alert.description != AlertDescription.bad_record_mac:
+ raise
+ connection.close()
+
+ print("Test 21 - HTTPS test X.509")
+
+ #Close the current listening socket
+ lsock.close()
+
+ #Create and run an HTTP Server using TLSSocketServerMixIn
+ class MyHTTPServer(TLSSocketServerMixIn,
+ HTTPServer):
+ def handshake(self, tlsConnection):
+ tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
+ return True
+ cd = os.getcwd()
+ os.chdir(dir)
+ address = address[0], address[1]+1
+ httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
+ for x in range(6):
+ httpd.handle_request()
+ httpd.server_close()
+ cd = os.chdir(cd)
+
+ #Re-connect the listening socket
+ lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ address = address[0], address[1]+1
+ lsock.bind(address)
+ lsock.listen(5)
+
+ implementations = []
+ if m2cryptoLoaded:
+ implementations.append("openssl")
+ if pycryptoLoaded:
+ implementations.append("pycrypto")
+ implementations.append("python")
+
+ print("Test 22 - different ciphers")
+ for implementation in ["python"] * len(implementations):
+ for cipher in ["aes128", "aes256", "rc4"]:
+
+ print("Test 22:", end=' ')
+ connection = connect()
+
+ settings = HandshakeSettings()
+ settings.cipherNames = [cipher]
+ settings.cipherImplementations = [implementation, "python"]
+
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings)
+ print(connection.getCipherName(), connection.getCipherImplementation())
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 23 - throughput test")
+ for implementation in implementations:
+ for cipher in ["aes128", "aes256", "3des", "rc4"]:
+ if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
+ continue
+
+ print("Test 23:", end=' ')
+ connection = connect()
+
+ settings = HandshakeSettings()
+ settings.cipherNames = [cipher]
+ settings.cipherImplementations = [implementation, "python"]
+
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings)
+ print(connection.getCipherName(), connection.getCipherImplementation())
+ h = connection.read(min=50000, max=50000)
+ assert(h == b"hello"*10000)
+ connection.write(h)
+ connection.close()
+
+ print("Test 24.a - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"http/1.1"])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 24.b - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 24.c - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"http/1.1", b"spdy/2"])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 24.d - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 24.e - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 24.f - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"spdy/3", b"spdy/2"])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 24.g - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[])
+ testConnServer(connection)
+ connection.close()
+
+ print("Tests 25-27 - XMLRPXC server")
+ address = address[0], address[1]+1
+ class Server(TLSXMLRPCServer):
+
+ def handshake(self, tlsConnection):
+ try:
+ tlsConnection.handshakeServer(certChain=x509Chain,
+ privateKey=x509Key,
+ sessionCache=sessionCache)
+ tlsConnection.ignoreAbruptClose = True
+ return True
+ except TLSError as error:
+ print("Handshake failure:", str(error))
+ return False
+
+ class MyFuncs:
+ def pow(self, x, y): return pow(x, y)
+ def add(self, x, y): return x + y
+
+ server = Server(address)
+ server.register_instance(MyFuncs())
+ #sa = server.socket.getsockname()
+ #print "Serving HTTPS on", sa[0], "port", sa[1]
+ for i in range(6):
+ server.handle_request()
+
+ print("Test succeeded")
+
+
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ printUsage("Missing command")
+ elif sys.argv[1] == "client"[:len(sys.argv[1])]:
+ clientTestCmd(sys.argv[2:])
+ elif sys.argv[1] == "server"[:len(sys.argv[1])]:
+ serverTestCmd(sys.argv[2:])
+ else:
+ printUsage("Unknown command: %s" % sys.argv[1])
« no previous file with comments | « third_party/tlslite/tests/serverX509Key.pem ('k') | third_party/tlslite/tests/verifierDB » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698