| Index: third_party/tlslite/tests/tlstest.py
|
| diff --git a/third_party/tlslite/tests/tlstest.py b/third_party/tlslite/tests/tlstest.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..fa1b13fa65d3248748de662d75a136ecedf39cba
|
| --- /dev/null
|
| +++ b/third_party/tlslite/tests/tlstest.py
|
| @@ -0,0 +1,793 @@
|
| +#!/usr/bin/env python
|
| +
|
| +# Authors:
|
| +# Trevor Perrin
|
| +# Kees Bos - Added tests for XML-RPC
|
| +# Dimitris Moraitis - Anon ciphersuites
|
| +# Marcelo Fernandez - Added test for NPN
|
| +# Martin von Loewis - python 3 port
|
| +
|
| +#
|
| +# See the LICENSE file for legal information regarding use of this file.
|
| +from __future__ import print_function
|
| +import sys
|
| +import os
|
| +import os.path
|
| +import socket
|
| +import time
|
| +import getopt
|
| +try:
|
| + from BaseHTTPServer import HTTPServer
|
| + from SimpleHTTPServer import SimpleHTTPRequestHandler
|
| +except ImportError:
|
| + from http.server import HTTPServer, SimpleHTTPRequestHandler
|
| +
|
| +from tlslite import TLSConnection, Fault, HandshakeSettings, \
|
| + X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \
|
| + parsePEMKey, constants, \
|
| + AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \
|
| + POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \
|
| + Checker, __version__
|
| +
|
| +from tlslite.errors import *
|
| +from tlslite.utils.cryptomath import prngName
|
| +try:
|
| + import xmlrpclib
|
| +except ImportError:
|
| + # Python 3
|
| + from xmlrpc import client as xmlrpclib
|
| +from tlslite import *
|
| +
|
| +try:
|
| + from tack.structures.Tack import Tack
|
| +
|
| +except ImportError:
|
| + pass
|
| +
|
| +def printUsage(s=None):
|
| + if m2cryptoLoaded:
|
| + crypto = "M2Crypto/OpenSSL"
|
| + else:
|
| + crypto = "Python crypto"
|
| + if s:
|
| + print("ERROR: %s" % s)
|
| + print("""\ntls.py version %s (using %s)
|
| +
|
| +Commands:
|
| + server HOST:PORT DIRECTORY
|
| +
|
| + client HOST:PORT DIRECTORY
|
| +""" % (__version__, crypto))
|
| + sys.exit(-1)
|
| +
|
| +
|
| +def testConnClient(conn):
|
| + b1 = os.urandom(1)
|
| + b10 = os.urandom(10)
|
| + b100 = os.urandom(100)
|
| + b1000 = os.urandom(1000)
|
| + conn.write(b1)
|
| + conn.write(b10)
|
| + conn.write(b100)
|
| + conn.write(b1000)
|
| + assert(conn.read(min=1, max=1) == b1)
|
| + assert(conn.read(min=10, max=10) == b10)
|
| + assert(conn.read(min=100, max=100) == b100)
|
| + assert(conn.read(min=1000, max=1000) == b1000)
|
| +
|
| +def clientTestCmd(argv):
|
| +
|
| + address = argv[0]
|
| + dir = argv[1]
|
| +
|
| + #Split address into hostname/port tuple
|
| + address = address.split(":")
|
| + address = ( address[0], int(address[1]) )
|
| +
|
| + def connect():
|
| + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| + if hasattr(sock, 'settimeout'): #It's a python 2.3 feature
|
| + sock.settimeout(5)
|
| + sock.connect(address)
|
| + c = TLSConnection(sock)
|
| + return c
|
| +
|
| + test = 0
|
| +
|
| + badFault = False
|
| +
|
| + print("Test 0 - anonymous handshake")
|
| + connection = connect()
|
| + connection.handshakeClientAnonymous()
|
| + testConnClient(connection)
|
| + connection.close()
|
| +
|
| + print("Test 1 - good X509 (plus SNI)")
|
| + connection = connect()
|
| + connection.handshakeClientCert(serverName=address[0])
|
| + testConnClient(connection)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + assert(connection.session.serverName == address[0])
|
| + connection.close()
|
| +
|
| + print("Test 1.a - good X509, SSLv3")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + settings.minVersion = (3,0)
|
| + settings.maxVersion = (3,0)
|
| + connection.handshakeClientCert(settings=settings)
|
| + testConnClient(connection)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + connection.close()
|
| +
|
| + print("Test 1.b - good X509, RC4-MD5")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + settings.macNames = ["md5"]
|
| + connection.handshakeClientCert(settings=settings)
|
| + testConnClient(connection)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)
|
| + connection.close()
|
| +
|
| + if tackpyLoaded:
|
| +
|
| + settings = HandshakeSettings()
|
| + settings.useExperimentalTackExtension = True
|
| +
|
| + print("Test 2.a - good X.509, TACK")
|
| + connection = connect()
|
| + connection.handshakeClientCert(settings=settings)
|
| + assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
|
| + assert(connection.session.tackExt.activation_flags == 1)
|
| + testConnClient(connection)
|
| + connection.close()
|
| +
|
| + print("Test 2.b - good X.509, TACK unrelated to cert chain")
|
| + connection = connect()
|
| + try:
|
| + connection.handshakeClientCert(settings=settings)
|
| + assert(False)
|
| + except TLSLocalAlert as alert:
|
| + if alert.description != AlertDescription.illegal_parameter:
|
| + raise
|
| + connection.close()
|
| +
|
| + print("Test 3 - good SRP")
|
| + connection = connect()
|
| + connection.handshakeClientSRP("test", "password")
|
| + testConnClient(connection)
|
| + connection.close()
|
| +
|
| + print("Test 4 - SRP faults")
|
| + for fault in Fault.clientSrpFaults + Fault.genericFaults:
|
| + connection = connect()
|
| + connection.fault = fault
|
| + try:
|
| + connection.handshakeClientSRP("test", "password")
|
| + print(" Good Fault %s" % (Fault.faultNames[fault]))
|
| + except TLSFaultError as e:
|
| + print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
|
| + badFault = True
|
| +
|
| + print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
|
| + settings = HandshakeSettings()
|
| + settings.minVersion = (3,1)
|
| + settings.maxVersion = (3,1)
|
| + connection = connect()
|
| + connection.handshakeClientSRP("test", "password", settings=settings)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + testConnClient(connection)
|
| + connection.close()
|
| +
|
| + print("Test 7 - X.509 with SRP faults")
|
| + for fault in Fault.clientSrpFaults + Fault.genericFaults:
|
| + connection = connect()
|
| + connection.fault = fault
|
| + try:
|
| + connection.handshakeClientSRP("test", "password")
|
| + print(" Good Fault %s" % (Fault.faultNames[fault]))
|
| + except TLSFaultError as e:
|
| + print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
|
| + badFault = True
|
| +
|
| + print("Test 11 - X.509 faults")
|
| + for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
|
| + connection = connect()
|
| + connection.fault = fault
|
| + try:
|
| + connection.handshakeClientCert()
|
| + print(" Good Fault %s" % (Fault.faultNames[fault]))
|
| + except TLSFaultError as e:
|
| + print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
|
| + badFault = True
|
| +
|
| + print("Test 14 - good mutual X509")
|
| + x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())
|
| + x509Chain = X509CertChain([x509Cert])
|
| + s = open(os.path.join(dir, "clientX509Key.pem")).read()
|
| + x509Key = parsePEMKey(s, private=True)
|
| +
|
| + connection = connect()
|
| + connection.handshakeClientCert(x509Chain, x509Key)
|
| + testConnClient(connection)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + connection.close()
|
| +
|
| + print("Test 14.a - good mutual X509, SSLv3")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + settings.minVersion = (3,0)
|
| + settings.maxVersion = (3,0)
|
| + connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
|
| + testConnClient(connection)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + connection.close()
|
| +
|
| + print("Test 15 - mutual X.509 faults")
|
| + for fault in Fault.clientCertFaults + Fault.genericFaults:
|
| + connection = connect()
|
| + connection.fault = fault
|
| + try:
|
| + connection.handshakeClientCert(x509Chain, x509Key)
|
| + print(" Good Fault %s" % (Fault.faultNames[fault]))
|
| + except TLSFaultError as e:
|
| + print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
|
| + badFault = True
|
| +
|
| + print("Test 18 - good SRP, prepare to resume... (plus SNI)")
|
| + connection = connect()
|
| + connection.handshakeClientSRP("test", "password", serverName=address[0])
|
| + testConnClient(connection)
|
| + connection.close()
|
| + session = connection.session
|
| +
|
| + print("Test 19 - resumption (plus SNI)")
|
| + connection = connect()
|
| + connection.handshakeClientSRP("test", "garbage", serverName=address[0],
|
| + session=session)
|
| + testConnClient(connection)
|
| + #Don't close! -- see below
|
| +
|
| + print("Test 20 - invalidated resumption (plus SNI)")
|
| + connection.sock.close() #Close the socket without a close_notify!
|
| + connection = connect()
|
| + try:
|
| + connection.handshakeClientSRP("test", "garbage",
|
| + serverName=address[0], session=session)
|
| + assert(False)
|
| + except TLSRemoteAlert as alert:
|
| + if alert.description != AlertDescription.bad_record_mac:
|
| + raise
|
| + connection.close()
|
| +
|
| + print("Test 21 - HTTPS test X.509")
|
| + address = address[0], address[1]+1
|
| + if hasattr(socket, "timeout"):
|
| + timeoutEx = socket.timeout
|
| + else:
|
| + timeoutEx = socket.error
|
| + while 1:
|
| + try:
|
| + time.sleep(2)
|
| + htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "utf-8")
|
| + fingerprint = None
|
| + for y in range(2):
|
| + checker =Checker(x509Fingerprint=fingerprint)
|
| + h = HTTPTLSConnection(\
|
| + address[0], address[1], checker=checker)
|
| + for x in range(3):
|
| + h.request("GET", "/index.html")
|
| + r = h.getresponse()
|
| + assert(r.status == 200)
|
| + b = bytearray(r.read())
|
| + assert(b == htmlBody)
|
| + fingerprint = h.tlsSession.serverCertChain.getFingerprint()
|
| + assert(fingerprint)
|
| + time.sleep(2)
|
| + break
|
| + except timeoutEx:
|
| + print("timeout, retrying...")
|
| + pass
|
| +
|
| + address = address[0], address[1]+1
|
| +
|
| + implementations = []
|
| + if m2cryptoLoaded:
|
| + implementations.append("openssl")
|
| + if pycryptoLoaded:
|
| + implementations.append("pycrypto")
|
| + implementations.append("python")
|
| +
|
| + print("Test 22 - different ciphers, TLSv1.0")
|
| + for implementation in implementations:
|
| + for cipher in ["aes128", "aes256", "rc4"]:
|
| +
|
| + print("Test 22:", end=' ')
|
| + connection = connect()
|
| +
|
| + settings = HandshakeSettings()
|
| + settings.cipherNames = [cipher]
|
| + settings.cipherImplementations = [implementation, "python"]
|
| + settings.minVersion = (3,1)
|
| + settings.maxVersion = (3,1)
|
| + connection.handshakeClientCert(settings=settings)
|
| + testConnClient(connection)
|
| + print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
|
| + connection.close()
|
| +
|
| + print("Test 23 - throughput test")
|
| + for implementation in implementations:
|
| + for cipher in ["aes128", "aes256", "3des", "rc4"]:
|
| + if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
|
| + continue
|
| +
|
| + print("Test 23:", end=' ')
|
| + connection = connect()
|
| +
|
| + settings = HandshakeSettings()
|
| + settings.cipherNames = [cipher]
|
| + settings.cipherImplementations = [implementation, "python"]
|
| + connection.handshakeClientCert(settings=settings)
|
| + print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ')
|
| +
|
| + startTime = time.clock()
|
| + connection.write(b"hello"*10000)
|
| + h = connection.read(min=50000, max=50000)
|
| + stopTime = time.clock()
|
| + if stopTime-startTime:
|
| + print("100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime)))
|
| + else:
|
| + print("100K exchanged very fast")
|
| +
|
| + assert(h == b"hello"*10000)
|
| + connection.close()
|
| +
|
| + print("Test 24.a - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'http/1.1')
|
| + connection.close()
|
| +
|
| + print("Test 24.b - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'spdy/2')
|
| + connection.close()
|
| +
|
| + print("Test 24.c - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'spdy/2')
|
| + connection.close()
|
| +
|
| + print("Test 24.d - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'spdy/2')
|
| + connection.close()
|
| +
|
| + print("Test 24.e - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'spdy/3')
|
| + connection.close()
|
| +
|
| + print("Test 24.f - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'http/1.1')
|
| + connection.close()
|
| +
|
| + print("Test 24.g - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'spdy/2')
|
| + connection.close()
|
| +
|
| + print('Test 25 - good standard XMLRPC https client')
|
| + time.sleep(2) # Hack for lack of ability to set timeout here
|
| + address = address[0], address[1]+1
|
| + server = xmlrpclib.Server('https://%s:%s' % address)
|
| + assert server.add(1,2) == 3
|
| + assert server.pow(2,4) == 16
|
| +
|
| + print('Test 26 - good tlslite XMLRPC client')
|
| + transport = XMLRPCTransport(ignoreAbruptClose=True)
|
| + server = xmlrpclib.Server('https://%s:%s' % address, transport)
|
| + assert server.add(1,2) == 3
|
| + assert server.pow(2,4) == 16
|
| +
|
| + print('Test 27 - good XMLRPC ignored protocol')
|
| + server = xmlrpclib.Server('http://%s:%s' % address, transport)
|
| + assert server.add(1,2) == 3
|
| + assert server.pow(2,4) == 16
|
| +
|
| + print("Test 28 - Internet servers test")
|
| + try:
|
| + i = IMAP4_TLS("cyrus.andrew.cmu.edu")
|
| + i.login("anonymous", "anonymous@anonymous.net")
|
| + i.logout()
|
| + print("Test 28: IMAP4 good")
|
| + p = POP3_TLS("pop.gmail.com")
|
| + p.quit()
|
| + print("Test 29: POP3 good")
|
| + except socket.error as e:
|
| + print("Non-critical error: socket error trying to reach internet server: ", e)
|
| +
|
| + if not badFault:
|
| + print("Test succeeded")
|
| + else:
|
| + print("Test failed")
|
| +
|
| +
|
| +
|
| +def testConnServer(connection):
|
| + count = 0
|
| + while 1:
|
| + s = connection.read()
|
| + count += len(s)
|
| + if len(s) == 0:
|
| + break
|
| + connection.write(s)
|
| + if count == 1111:
|
| + break
|
| +
|
| +def serverTestCmd(argv):
|
| +
|
| + address = argv[0]
|
| + dir = argv[1]
|
| +
|
| + #Split address into hostname/port tuple
|
| + address = address.split(":")
|
| + address = ( address[0], int(address[1]) )
|
| +
|
| + #Connect to server
|
| + lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| + lsock.bind(address)
|
| + lsock.listen(5)
|
| +
|
| + def connect():
|
| + return TLSConnection(lsock.accept()[0])
|
| +
|
| + x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
|
| + x509Chain = X509CertChain([x509Cert])
|
| + s = open(os.path.join(dir, "serverX509Key.pem")).read()
|
| + x509Key = parsePEMKey(s, private=True)
|
| +
|
| + print("Test 0 - Anonymous server handshake")
|
| + connection = connect()
|
| + connection.handshakeServer(anon=True)
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 1 - good X.509")
|
| + connection = connect()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
|
| + assert(connection.session.serverName == address[0])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 1.a - good X.509, SSL v3")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + settings.minVersion = (3,0)
|
| + settings.maxVersion = (3,0)
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 1.b - good X.509, RC4-MD5")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + settings.macNames = ["sha", "md5"]
|
| + settings.cipherNames = ["rc4"]
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + if tackpyLoaded:
|
| + tack = Tack.createFromPem(open("./TACK1.pem", "rU").read())
|
| + tackUnrelated = Tack.createFromPem(open("./TACKunrelated.pem", "rU").read())
|
| +
|
| + settings = HandshakeSettings()
|
| + settings.useExperimentalTackExtension = True
|
| +
|
| + print("Test 2.a - good X.509, TACK")
|
| + connection = connect()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + tacks=[tack], activationFlags=1, settings=settings)
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 2.b - good X.509, TACK unrelated to cert chain")
|
| + connection = connect()
|
| + try:
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + tacks=[tackUnrelated], settings=settings)
|
| + assert(False)
|
| + except TLSRemoteAlert as alert:
|
| + if alert.description != AlertDescription.illegal_parameter:
|
| + raise
|
| +
|
| + print("Test 3 - good SRP")
|
| + verifierDB = VerifierDB()
|
| + verifierDB.create()
|
| + entry = VerifierDB.makeVerifier("test", "password", 1536)
|
| + verifierDB["test"] = entry
|
| +
|
| + connection = connect()
|
| + connection.handshakeServer(verifierDB=verifierDB)
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 4 - SRP faults")
|
| + for fault in Fault.clientSrpFaults + Fault.genericFaults:
|
| + connection = connect()
|
| + connection.fault = fault
|
| + try:
|
| + connection.handshakeServer(verifierDB=verifierDB)
|
| + assert()
|
| + except:
|
| + pass
|
| + connection.close()
|
| +
|
| + print("Test 6 - good SRP: with X.509 cert")
|
| + connection = connect()
|
| + connection.handshakeServer(verifierDB=verifierDB, \
|
| + certChain=x509Chain, privateKey=x509Key)
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 7 - X.509 with SRP faults")
|
| + for fault in Fault.clientSrpFaults + Fault.genericFaults:
|
| + connection = connect()
|
| + connection.fault = fault
|
| + try:
|
| + connection.handshakeServer(verifierDB=verifierDB, \
|
| + certChain=x509Chain, privateKey=x509Key)
|
| + assert()
|
| + except:
|
| + pass
|
| + connection.close()
|
| +
|
| + print("Test 11 - X.509 faults")
|
| + for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
|
| + connection = connect()
|
| + connection.fault = fault
|
| + try:
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
|
| + assert()
|
| + except:
|
| + pass
|
| + connection.close()
|
| +
|
| + print("Test 14 - good mutual X.509")
|
| + connection = connect()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
|
| + testConnServer(connection)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + connection.close()
|
| +
|
| + print("Test 14a - good mutual X.509, SSLv3")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + settings.minVersion = (3,0)
|
| + settings.maxVersion = (3,0)
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
|
| + testConnServer(connection)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + connection.close()
|
| +
|
| + print("Test 15 - mutual X.509 faults")
|
| + for fault in Fault.clientCertFaults + Fault.genericFaults:
|
| + connection = connect()
|
| + connection.fault = fault
|
| + try:
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
|
| + assert()
|
| + except:
|
| + pass
|
| + connection.close()
|
| +
|
| + print("Test 18 - good SRP, prepare to resume")
|
| + sessionCache = SessionCache()
|
| + connection = connect()
|
| + connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
|
| + assert(connection.session.serverName == address[0])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 19 - resumption")
|
| + connection = connect()
|
| + connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
|
| + assert(connection.session.serverName == address[0])
|
| + testConnServer(connection)
|
| + #Don't close! -- see next test
|
| +
|
| + print("Test 20 - invalidated resumption")
|
| + try:
|
| + connection.read(min=1, max=1)
|
| + assert() #Client is going to close the socket without a close_notify
|
| + except TLSAbruptCloseError as e:
|
| + pass
|
| + connection = connect()
|
| + try:
|
| + connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
|
| + except TLSLocalAlert as alert:
|
| + if alert.description != AlertDescription.bad_record_mac:
|
| + raise
|
| + connection.close()
|
| +
|
| + print("Test 21 - HTTPS test X.509")
|
| +
|
| + #Close the current listening socket
|
| + lsock.close()
|
| +
|
| + #Create and run an HTTP Server using TLSSocketServerMixIn
|
| + class MyHTTPServer(TLSSocketServerMixIn,
|
| + HTTPServer):
|
| + def handshake(self, tlsConnection):
|
| + tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
|
| + return True
|
| + cd = os.getcwd()
|
| + os.chdir(dir)
|
| + address = address[0], address[1]+1
|
| + httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
|
| + for x in range(6):
|
| + httpd.handle_request()
|
| + httpd.server_close()
|
| + cd = os.chdir(cd)
|
| +
|
| + #Re-connect the listening socket
|
| + lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| + address = address[0], address[1]+1
|
| + lsock.bind(address)
|
| + lsock.listen(5)
|
| +
|
| + implementations = []
|
| + if m2cryptoLoaded:
|
| + implementations.append("openssl")
|
| + if pycryptoLoaded:
|
| + implementations.append("pycrypto")
|
| + implementations.append("python")
|
| +
|
| + print("Test 22 - different ciphers")
|
| + for implementation in ["python"] * len(implementations):
|
| + for cipher in ["aes128", "aes256", "rc4"]:
|
| +
|
| + print("Test 22:", end=' ')
|
| + connection = connect()
|
| +
|
| + settings = HandshakeSettings()
|
| + settings.cipherNames = [cipher]
|
| + settings.cipherImplementations = [implementation, "python"]
|
| +
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings)
|
| + print(connection.getCipherName(), connection.getCipherImplementation())
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 23 - throughput test")
|
| + for implementation in implementations:
|
| + for cipher in ["aes128", "aes256", "3des", "rc4"]:
|
| + if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
|
| + continue
|
| +
|
| + print("Test 23:", end=' ')
|
| + connection = connect()
|
| +
|
| + settings = HandshakeSettings()
|
| + settings.cipherNames = [cipher]
|
| + settings.cipherImplementations = [implementation, "python"]
|
| +
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings)
|
| + print(connection.getCipherName(), connection.getCipherImplementation())
|
| + h = connection.read(min=50000, max=50000)
|
| + assert(h == b"hello"*10000)
|
| + connection.write(h)
|
| + connection.close()
|
| +
|
| + print("Test 24.a - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"http/1.1"])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 24.b - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 24.c - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"http/1.1", b"spdy/2"])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 24.d - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 24.e - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 24.f - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"spdy/3", b"spdy/2"])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 24.g - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Tests 25-27 - XMLRPXC server")
|
| + address = address[0], address[1]+1
|
| + class Server(TLSXMLRPCServer):
|
| +
|
| + def handshake(self, tlsConnection):
|
| + try:
|
| + tlsConnection.handshakeServer(certChain=x509Chain,
|
| + privateKey=x509Key,
|
| + sessionCache=sessionCache)
|
| + tlsConnection.ignoreAbruptClose = True
|
| + return True
|
| + except TLSError as error:
|
| + print("Handshake failure:", str(error))
|
| + return False
|
| +
|
| + class MyFuncs:
|
| + def pow(self, x, y): return pow(x, y)
|
| + def add(self, x, y): return x + y
|
| +
|
| + server = Server(address)
|
| + server.register_instance(MyFuncs())
|
| + #sa = server.socket.getsockname()
|
| + #print "Serving HTTPS on", sa[0], "port", sa[1]
|
| + for i in range(6):
|
| + server.handle_request()
|
| +
|
| + print("Test succeeded")
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + if len(sys.argv) < 2:
|
| + printUsage("Missing command")
|
| + elif sys.argv[1] == "client"[:len(sys.argv[1])]:
|
| + clientTestCmd(sys.argv[2:])
|
| + elif sys.argv[1] == "server"[:len(sys.argv[1])]:
|
| + serverTestCmd(sys.argv[2:])
|
| + else:
|
| + printUsage("Unknown command: %s" % sys.argv[1])
|
|
|