| Index: third_party/tlslite/tests/tlstest.py
|
| diff --git a/third_party/tlslite/scripts/tls.py b/third_party/tlslite/tests/tlstest.py
|
| old mode 100644
|
| new mode 100755
|
| similarity index 29%
|
| copy from third_party/tlslite/scripts/tls.py
|
| copy to third_party/tlslite/tests/tlstest.py
|
| index fa2c663f3b06bff1a2567e240e0eb5a110f2416b..fa1b13fa65d3248748de662d75a136ecedf39cba
|
| --- a/third_party/tlslite/scripts/tls.py
|
| +++ b/third_party/tlslite/tests/tlstest.py
|
| @@ -1,47 +1,87 @@
|
| -#! python
|
| -
|
| +#!/usr/bin/env python
|
| +
|
| +# Authors:
|
| +# Trevor Perrin
|
| +# Kees Bos - Added tests for XML-RPC
|
| +# Dimitris Moraitis - Anon ciphersuites
|
| +# Marcelo Fernandez - Added test for NPN
|
| +# Martin von Loewis - python 3 port
|
| +
|
| +#
|
| +# See the LICENSE file for legal information regarding use of this file.
|
| +from __future__ import print_function
|
| import sys
|
| import os
|
| import os.path
|
| import socket
|
| -import thread
|
| import time
|
| -import httplib
|
| -import BaseHTTPServer
|
| -import SimpleHTTPServer
|
| -
|
| -
|
| +import getopt
|
| try:
|
| - from cryptoIDlib.api import *
|
| - cryptoIDlibLoaded = True
|
| -except:
|
| - cryptoIDlibLoaded = False
|
| -
|
| -if __name__ != "__main__":
|
| - raise "This must be run as a command, not used as a module!"
|
| -
|
| -#import tlslite
|
| -#from tlslite.constants import AlertDescription, Fault
|
| -
|
| -#from tlslite.utils.jython_compat import formatExceptionTrace
|
| -#from tlslite.X509 import X509, X509CertChain
|
| -
|
| -from tlslite.api import *
|
| -
|
| -def parsePrivateKey(s):
|
| - try:
|
| - return parsePEMKey(s, private=True)
|
| - except Exception, e:
|
| - print e
|
| - return parseXMLKey(s, private=True)
|
| + from BaseHTTPServer import HTTPServer
|
| + from SimpleHTTPServer import SimpleHTTPRequestHandler
|
| +except ImportError:
|
| + from http.server import HTTPServer, SimpleHTTPRequestHandler
|
| +
|
| +from tlslite import TLSConnection, Fault, HandshakeSettings, \
|
| + X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \
|
| + parsePEMKey, constants, \
|
| + AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \
|
| + POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \
|
| + Checker, __version__
|
| +
|
| +from tlslite.errors import *
|
| +from tlslite.utils.cryptomath import prngName
|
| +try:
|
| + import xmlrpclib
|
| +except ImportError:
|
| + # Python 3
|
| + from xmlrpc import client as xmlrpclib
|
| +from tlslite import *
|
|
|
| +try:
|
| + from tack.structures.Tack import Tack
|
| +
|
| +except ImportError:
|
| + pass
|
|
|
| -def clientTest(address, dir):
|
| +def printUsage(s=None):
|
| + if m2cryptoLoaded:
|
| + crypto = "M2Crypto/OpenSSL"
|
| + else:
|
| + crypto = "Python crypto"
|
| + if s:
|
| + print("ERROR: %s" % s)
|
| + print("""\ntls.py version %s (using %s)
|
| +
|
| +Commands:
|
| + server HOST:PORT DIRECTORY
|
| +
|
| + client HOST:PORT DIRECTORY
|
| +""" % (__version__, crypto))
|
| + sys.exit(-1)
|
| +
|
| +
|
| +def testConnClient(conn):
|
| + b1 = os.urandom(1)
|
| + b10 = os.urandom(10)
|
| + b100 = os.urandom(100)
|
| + b1000 = os.urandom(1000)
|
| + conn.write(b1)
|
| + conn.write(b10)
|
| + conn.write(b100)
|
| + conn.write(b1000)
|
| + assert(conn.read(min=1, max=1) == b1)
|
| + assert(conn.read(min=10, max=10) == b10)
|
| + assert(conn.read(min=100, max=100) == b100)
|
| + assert(conn.read(min=1000, max=1000) == b1000)
|
| +
|
| +def clientTestCmd(argv):
|
| +
|
| + address = argv[0]
|
| + dir = argv[1]
|
|
|
| #Split address into hostname/port tuple
|
| address = address.split(":")
|
| - if len(address)==1:
|
| - address.append("4443")
|
| address = ( address[0], int(address[1]) )
|
|
|
| def connect():
|
| @@ -56,142 +96,113 @@ def clientTest(address, dir):
|
|
|
| badFault = False
|
|
|
| - print "Test 1 - good shared key"
|
| + print("Test 0 - anonymous handshake")
|
| + connection = connect()
|
| + connection.handshakeClientAnonymous()
|
| + testConnClient(connection)
|
| + connection.close()
|
| +
|
| + print("Test 1 - good X509 (plus SNI)")
|
| + connection = connect()
|
| + connection.handshakeClientCert(serverName=address[0])
|
| + testConnClient(connection)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + assert(connection.session.serverName == address[0])
|
| + connection.close()
|
| +
|
| + print("Test 1.a - good X509, SSLv3")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + settings.minVersion = (3,0)
|
| + settings.maxVersion = (3,0)
|
| + connection.handshakeClientCert(settings=settings)
|
| + testConnClient(connection)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + connection.close()
|
| +
|
| + print("Test 1.b - good X509, RC4-MD5")
|
| connection = connect()
|
| - connection.handshakeClientSharedKey("shared", "key")
|
| + settings = HandshakeSettings()
|
| + settings.macNames = ["md5"]
|
| + connection.handshakeClientCert(settings=settings)
|
| + testConnClient(connection)
|
| + assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 2 - shared key faults"
|
| - for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
|
| + if tackpyLoaded:
|
| +
|
| + settings = HandshakeSettings()
|
| + settings.useExperimentalTackExtension = True
|
| +
|
| + print("Test 2.a - good X.509, TACK")
|
| + connection = connect()
|
| + connection.handshakeClientCert(settings=settings)
|
| + assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
|
| + assert(connection.session.tackExt.activation_flags == 1)
|
| + testConnClient(connection)
|
| + connection.close()
|
| +
|
| + print("Test 2.b - good X.509, TACK unrelated to cert chain")
|
| connection = connect()
|
| - connection.fault = fault
|
| try:
|
| - connection.handshakeClientSharedKey("shared", "key")
|
| - print " Good Fault %s" % (Fault.faultNames[fault])
|
| - except TLSFaultError, e:
|
| - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
|
| - badFault = True
|
| - connection.sock.close()
|
| + connection.handshakeClientCert(settings=settings)
|
| + assert(False)
|
| + except TLSLocalAlert as alert:
|
| + if alert.description != AlertDescription.illegal_parameter:
|
| + raise
|
| + connection.close()
|
|
|
| - print "Test 3 - good SRP"
|
| + print("Test 3 - good SRP")
|
| connection = connect()
|
| connection.handshakeClientSRP("test", "password")
|
| + testConnClient(connection)
|
| connection.close()
|
|
|
| - print "Test 4 - SRP faults"
|
| + print("Test 4 - SRP faults")
|
| for fault in Fault.clientSrpFaults + Fault.genericFaults:
|
| connection = connect()
|
| connection.fault = fault
|
| try:
|
| connection.handshakeClientSRP("test", "password")
|
| - print " Good Fault %s" % (Fault.faultNames[fault])
|
| - except TLSFaultError, e:
|
| - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
|
| + print(" Good Fault %s" % (Fault.faultNames[fault]))
|
| + except TLSFaultError as e:
|
| + print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
|
| badFault = True
|
| - connection.sock.close()
|
| -
|
| - print "Test 5 - good SRP: unknown_srp_username idiom"
|
| - def srpCallback():
|
| - return ("test", "password")
|
| - connection = connect()
|
| - connection.handshakeClientUnknown(srpCallback=srpCallback)
|
| - connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 6 - good SRP: with X.509 certificate"
|
| + print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
|
| + settings = HandshakeSettings()
|
| + settings.minVersion = (3,1)
|
| + settings.maxVersion = (3,1)
|
| connection = connect()
|
| - connection.handshakeClientSRP("test", "password")
|
| + connection.handshakeClientSRP("test", "password", settings=settings)
|
| assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| + testConnClient(connection)
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 7 - X.509 with SRP faults"
|
| + print("Test 7 - X.509 with SRP faults")
|
| for fault in Fault.clientSrpFaults + Fault.genericFaults:
|
| connection = connect()
|
| connection.fault = fault
|
| try:
|
| connection.handshakeClientSRP("test", "password")
|
| - print " Good Fault %s" % (Fault.faultNames[fault])
|
| - except TLSFaultError, e:
|
| - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
|
| + print(" Good Fault %s" % (Fault.faultNames[fault]))
|
| + except TLSFaultError as e:
|
| + print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
|
| badFault = True
|
| - connection.sock.close()
|
| -
|
| - if cryptoIDlibLoaded:
|
| - print "Test 8 - good SRP: with cryptoID certificate chain"
|
| - connection = connect()
|
| - connection.handshakeClientSRP("test", "password")
|
| - assert(isinstance(connection.session.serverCertChain, CertChain))
|
| - if not (connection.session.serverCertChain.validate()):
|
| - print connection.session.serverCertChain.validate(listProblems=True)
|
| -
|
| - connection.close()
|
| - connection.sock.close()
|
| -
|
| - print "Test 9 - CryptoID with SRP faults"
|
| - for fault in Fault.clientSrpFaults + Fault.genericFaults:
|
| - connection = connect()
|
| - connection.fault = fault
|
| - try:
|
| - connection.handshakeClientSRP("test", "password")
|
| - print " Good Fault %s" % (Fault.faultNames[fault])
|
| - except TLSFaultError, e:
|
| - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
|
| - badFault = True
|
| - connection.sock.close()
|
| -
|
| - print "Test 10 - good X509"
|
| - connection = connect()
|
| - connection.handshakeClientCert()
|
| - assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| - connection.close()
|
| - connection.sock.close()
|
| -
|
| - print "Test 10.a - good X509, SSLv3"
|
| - connection = connect()
|
| - settings = HandshakeSettings()
|
| - settings.minVersion = (3,0)
|
| - settings.maxVersion = (3,0)
|
| - connection.handshakeClientCert(settings=settings)
|
| - assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| - connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 11 - X.509 faults"
|
| + print("Test 11 - X.509 faults")
|
| for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
|
| connection = connect()
|
| connection.fault = fault
|
| try:
|
| connection.handshakeClientCert()
|
| - print " Good Fault %s" % (Fault.faultNames[fault])
|
| - except TLSFaultError, e:
|
| - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
|
| + print(" Good Fault %s" % (Fault.faultNames[fault]))
|
| + except TLSFaultError as e:
|
| + print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
|
| badFault = True
|
| - connection.sock.close()
|
|
|
| - if cryptoIDlibLoaded:
|
| - print "Test 12 - good cryptoID"
|
| - connection = connect()
|
| - connection.handshakeClientCert()
|
| - assert(isinstance(connection.session.serverCertChain, CertChain))
|
| - assert(connection.session.serverCertChain.validate())
|
| - connection.close()
|
| - connection.sock.close()
|
| -
|
| - print "Test 13 - cryptoID faults"
|
| - for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
|
| - connection = connect()
|
| - connection.fault = fault
|
| - try:
|
| - connection.handshakeClientCert()
|
| - print " Good Fault %s" % (Fault.faultNames[fault])
|
| - except TLSFaultError, e:
|
| - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
|
| - badFault = True
|
| - connection.sock.close()
|
| -
|
| - print "Test 14 - good mutual X509"
|
| + print("Test 14 - good mutual X509")
|
| x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())
|
| x509Chain = X509CertChain([x509Cert])
|
| s = open(os.path.join(dir, "clientX509Key.pem")).read()
|
| @@ -199,80 +210,58 @@ def clientTest(address, dir):
|
|
|
| connection = connect()
|
| connection.handshakeClientCert(x509Chain, x509Key)
|
| + testConnClient(connection)
|
| assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 14.a - good mutual X509, SSLv3"
|
| + print("Test 14.a - good mutual X509, SSLv3")
|
| connection = connect()
|
| settings = HandshakeSettings()
|
| settings.minVersion = (3,0)
|
| settings.maxVersion = (3,0)
|
| connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
|
| + testConnClient(connection)
|
| assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 15 - mutual X.509 faults"
|
| + print("Test 15 - mutual X.509 faults")
|
| for fault in Fault.clientCertFaults + Fault.genericFaults:
|
| connection = connect()
|
| connection.fault = fault
|
| try:
|
| connection.handshakeClientCert(x509Chain, x509Key)
|
| - print " Good Fault %s" % (Fault.faultNames[fault])
|
| - except TLSFaultError, e:
|
| - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
|
| + print(" Good Fault %s" % (Fault.faultNames[fault]))
|
| + except TLSFaultError as e:
|
| + print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
|
| badFault = True
|
| - connection.sock.close()
|
| -
|
| - if cryptoIDlibLoaded:
|
| - print "Test 16 - good mutual cryptoID"
|
| - cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
|
| - cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
|
| -
|
| - connection = connect()
|
| - connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
|
| - assert(isinstance(connection.session.serverCertChain, CertChain))
|
| - assert(connection.session.serverCertChain.validate())
|
| - connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 17 - mutual cryptoID faults"
|
| - for fault in Fault.clientCertFaults + Fault.genericFaults:
|
| - connection = connect()
|
| - connection.fault = fault
|
| - try:
|
| - connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
|
| - print " Good Fault %s" % (Fault.faultNames[fault])
|
| - except TLSFaultError, e:
|
| - print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
|
| - badFault = True
|
| - connection.sock.close()
|
| -
|
| - print "Test 18 - good SRP, prepare to resume..."
|
| + print("Test 18 - good SRP, prepare to resume... (plus SNI)")
|
| connection = connect()
|
| - connection.handshakeClientSRP("test", "password")
|
| + connection.handshakeClientSRP("test", "password", serverName=address[0])
|
| + testConnClient(connection)
|
| connection.close()
|
| - connection.sock.close()
|
| session = connection.session
|
|
|
| - print "Test 19 - resumption"
|
| + print("Test 19 - resumption (plus SNI)")
|
| connection = connect()
|
| - connection.handshakeClientSRP("test", "garbage", session=session)
|
| + connection.handshakeClientSRP("test", "garbage", serverName=address[0],
|
| + session=session)
|
| + testConnClient(connection)
|
| #Don't close! -- see below
|
|
|
| - print "Test 20 - invalidated resumption"
|
| + print("Test 20 - invalidated resumption (plus SNI)")
|
| connection.sock.close() #Close the socket without a close_notify!
|
| connection = connect()
|
| try:
|
| - connection.handshakeClientSRP("test", "garbage", session=session)
|
| - assert()
|
| - except TLSRemoteAlert, alert:
|
| + connection.handshakeClientSRP("test", "garbage",
|
| + serverName=address[0], session=session)
|
| + assert(False)
|
| + except TLSRemoteAlert as alert:
|
| if alert.description != AlertDescription.bad_record_mac:
|
| raise
|
| - connection.sock.close()
|
| -
|
| - print "Test 21 - HTTPS test X.509"
|
| + connection.close()
|
| +
|
| + print("Test 21 - HTTPS test X.509")
|
| address = address[0], address[1]+1
|
| if hasattr(socket, "timeout"):
|
| timeoutEx = socket.timeout
|
| @@ -281,136 +270,183 @@ def clientTest(address, dir):
|
| while 1:
|
| try:
|
| time.sleep(2)
|
| - htmlBody = open(os.path.join(dir, "index.html")).read()
|
| + htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "utf-8")
|
| fingerprint = None
|
| for y in range(2):
|
| + checker =Checker(x509Fingerprint=fingerprint)
|
| h = HTTPTLSConnection(\
|
| - address[0], address[1], x509Fingerprint=fingerprint)
|
| + address[0], address[1], checker=checker)
|
| for x in range(3):
|
| h.request("GET", "/index.html")
|
| r = h.getresponse()
|
| assert(r.status == 200)
|
| - s = r.read()
|
| - assert(s == htmlBody)
|
| + b = bytearray(r.read())
|
| + assert(b == htmlBody)
|
| fingerprint = h.tlsSession.serverCertChain.getFingerprint()
|
| assert(fingerprint)
|
| time.sleep(2)
|
| break
|
| except timeoutEx:
|
| - print "timeout, retrying..."
|
| + print("timeout, retrying...")
|
| pass
|
|
|
| - if cryptoIDlibLoaded:
|
| - print "Test 21a - HTTPS test SRP+cryptoID"
|
| - address = address[0], address[1]+1
|
| - if hasattr(socket, "timeout"):
|
| - timeoutEx = socket.timeout
|
| - else:
|
| - timeoutEx = socket.error
|
| - while 1:
|
| - try:
|
| - time.sleep(2) #Time to generate key and cryptoID
|
| - htmlBody = open(os.path.join(dir, "index.html")).read()
|
| - fingerprint = None
|
| - protocol = None
|
| - for y in range(2):
|
| - h = HTTPTLSConnection(\
|
| - address[0], address[1],
|
| - username="test", password="password",
|
| - cryptoID=fingerprint, protocol=protocol)
|
| - for x in range(3):
|
| - h.request("GET", "/index.html")
|
| - r = h.getresponse()
|
| - assert(r.status == 200)
|
| - s = r.read()
|
| - assert(s == htmlBody)
|
| - fingerprint = h.tlsSession.serverCertChain.cryptoID
|
| - assert(fingerprint)
|
| - protocol = "urn:whatever"
|
| - time.sleep(2)
|
| - break
|
| - except timeoutEx:
|
| - print "timeout, retrying..."
|
| - pass
|
| -
|
| address = address[0], address[1]+1
|
|
|
| implementations = []
|
| - if cryptlibpyLoaded:
|
| - implementations.append("cryptlib")
|
| if m2cryptoLoaded:
|
| implementations.append("openssl")
|
| if pycryptoLoaded:
|
| implementations.append("pycrypto")
|
| implementations.append("python")
|
|
|
| - print "Test 22 - different ciphers"
|
| + print("Test 22 - different ciphers, TLSv1.0")
|
| for implementation in implementations:
|
| for cipher in ["aes128", "aes256", "rc4"]:
|
|
|
| - print "Test 22:",
|
| + print("Test 22:", end=' ')
|
| connection = connect()
|
|
|
| settings = HandshakeSettings()
|
| settings.cipherNames = [cipher]
|
| settings.cipherImplementations = [implementation, "python"]
|
| - connection.handshakeClientSharedKey("shared", "key", settings=settings)
|
| - print ("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
|
| -
|
| - connection.write("hello")
|
| - h = connection.read(min=5, max=5)
|
| - assert(h == "hello")
|
| + settings.minVersion = (3,1)
|
| + settings.maxVersion = (3,1)
|
| + connection.handshakeClientCert(settings=settings)
|
| + testConnClient(connection)
|
| + print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 23 - throughput test"
|
| + print("Test 23 - throughput test")
|
| for implementation in implementations:
|
| for cipher in ["aes128", "aes256", "3des", "rc4"]:
|
| - if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
|
| + if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
|
| continue
|
|
|
| - print "Test 23:",
|
| + print("Test 23:", end=' ')
|
| connection = connect()
|
|
|
| settings = HandshakeSettings()
|
| settings.cipherNames = [cipher]
|
| settings.cipherImplementations = [implementation, "python"]
|
| - connection.handshakeClientSharedKey("shared", "key", settings=settings)
|
| - print ("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation())),
|
| + connection.handshakeClientCert(settings=settings)
|
| + print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ')
|
|
|
| startTime = time.clock()
|
| - connection.write("hello"*10000)
|
| + connection.write(b"hello"*10000)
|
| h = connection.read(min=50000, max=50000)
|
| stopTime = time.clock()
|
| - print "100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime))
|
| + if stopTime-startTime:
|
| + print("100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime)))
|
| + else:
|
| + print("100K exchanged very fast")
|
|
|
| - assert(h == "hello"*10000)
|
| + assert(h == b"hello"*10000)
|
| connection.close()
|
| - connection.sock.close()
|
| +
|
| + print("Test 24.a - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'http/1.1')
|
| + connection.close()
|
| +
|
| + print("Test 24.b - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'spdy/2')
|
| + connection.close()
|
| +
|
| + print("Test 24.c - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'spdy/2')
|
| + connection.close()
|
| +
|
| + print("Test 24.d - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'spdy/2')
|
| + connection.close()
|
| +
|
| + print("Test 24.e - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'spdy/3')
|
| + connection.close()
|
|
|
| - print "Test 24 - Internet servers test"
|
| + print("Test 24.f - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'http/1.1')
|
| + connection.close()
|
| +
|
| + print("Test 24.g - Next-Protocol Client Negotiation")
|
| + connection = connect()
|
| + connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
|
| + #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
|
| + assert(connection.next_proto == b'spdy/2')
|
| + connection.close()
|
| +
|
| + print('Test 25 - good standard XMLRPC https client')
|
| + time.sleep(2) # Hack for lack of ability to set timeout here
|
| + address = address[0], address[1]+1
|
| + server = xmlrpclib.Server('https://%s:%s' % address)
|
| + assert server.add(1,2) == 3
|
| + assert server.pow(2,4) == 16
|
| +
|
| + print('Test 26 - good tlslite XMLRPC client')
|
| + transport = XMLRPCTransport(ignoreAbruptClose=True)
|
| + server = xmlrpclib.Server('https://%s:%s' % address, transport)
|
| + assert server.add(1,2) == 3
|
| + assert server.pow(2,4) == 16
|
| +
|
| + print('Test 27 - good XMLRPC ignored protocol')
|
| + server = xmlrpclib.Server('http://%s:%s' % address, transport)
|
| + assert server.add(1,2) == 3
|
| + assert server.pow(2,4) == 16
|
| +
|
| + print("Test 28 - Internet servers test")
|
| try:
|
| i = IMAP4_TLS("cyrus.andrew.cmu.edu")
|
| i.login("anonymous", "anonymous@anonymous.net")
|
| i.logout()
|
| - print "Test 24: IMAP4 good"
|
| + print("Test 28: IMAP4 good")
|
| p = POP3_TLS("pop.gmail.com")
|
| p.quit()
|
| - print "Test 24: POP3 good"
|
| - except socket.error, e:
|
| - print "Non-critical error: socket error trying to reach internet server: ", e
|
| + print("Test 29: POP3 good")
|
| + except socket.error as e:
|
| + print("Non-critical error: socket error trying to reach internet server: ", e)
|
|
|
| if not badFault:
|
| - print "Test succeeded"
|
| + print("Test succeeded")
|
| else:
|
| - print "Test failed"
|
| + print("Test failed")
|
| +
|
| +
|
|
|
| +def testConnServer(connection):
|
| + count = 0
|
| + while 1:
|
| + s = connection.read()
|
| + count += len(s)
|
| + if len(s) == 0:
|
| + break
|
| + connection.write(s)
|
| + if count == 1111:
|
| + break
|
| +
|
| +def serverTestCmd(argv):
|
|
|
| -def serverTest(address, dir):
|
| + address = argv[0]
|
| + dir = argv[1]
|
| +
|
| #Split address into hostname/port tuple
|
| address = address.split(":")
|
| - if len(address)==1:
|
| - address.append("4443")
|
| address = ( address[0], int(address[1]) )
|
|
|
| #Connect to server
|
| @@ -421,29 +457,67 @@ def serverTest(address, dir):
|
| def connect():
|
| return TLSConnection(lsock.accept()[0])
|
|
|
| - print "Test 1 - good shared key"
|
| - sharedKeyDB = SharedKeyDB()
|
| - sharedKeyDB["shared"] = "key"
|
| - sharedKeyDB["shared2"] = "key2"
|
| + x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
|
| + x509Chain = X509CertChain([x509Cert])
|
| + s = open(os.path.join(dir, "serverX509Key.pem")).read()
|
| + x509Key = parsePEMKey(s, private=True)
|
| +
|
| + print("Test 0 - Anonymous server handshake")
|
| + connection = connect()
|
| + connection.handshakeServer(anon=True)
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 1 - good X.509")
|
| connection = connect()
|
| - connection.handshakeServer(sharedKeyDB=sharedKeyDB)
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
|
| + assert(connection.session.serverName == address[0])
|
| + testConnServer(connection)
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 2 - shared key faults"
|
| - for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
|
| + print("Test 1.a - good X.509, SSL v3")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + settings.minVersion = (3,0)
|
| + settings.maxVersion = (3,0)
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 1.b - good X.509, RC4-MD5")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + settings.macNames = ["sha", "md5"]
|
| + settings.cipherNames = ["rc4"]
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + if tackpyLoaded:
|
| + tack = Tack.createFromPem(open("./TACK1.pem", "rU").read())
|
| + tackUnrelated = Tack.createFromPem(open("./TACKunrelated.pem", "rU").read())
|
| +
|
| + settings = HandshakeSettings()
|
| + settings.useExperimentalTackExtension = True
|
| +
|
| + print("Test 2.a - good X.509, TACK")
|
| connection = connect()
|
| - connection.fault = fault
|
| - try:
|
| - connection.handshakeServer(sharedKeyDB=sharedKeyDB)
|
| - assert()
|
| - except:
|
| - pass
|
| - connection.sock.close()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + tacks=[tack], activationFlags=1, settings=settings)
|
| + testConnServer(connection)
|
| + connection.close()
|
|
|
| - print "Test 3 - good SRP"
|
| - #verifierDB = tlslite.VerifierDB(os.path.join(dir, "verifierDB"))
|
| - #verifierDB.open()
|
| + print("Test 2.b - good X.509, TACK unrelated to cert chain")
|
| + connection = connect()
|
| + try:
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + tacks=[tackUnrelated], settings=settings)
|
| + assert(False)
|
| + except TLSRemoteAlert as alert:
|
| + if alert.description != AlertDescription.illegal_parameter:
|
| + raise
|
| +
|
| + print("Test 3 - good SRP")
|
| verifierDB = VerifierDB()
|
| verifierDB.create()
|
| entry = VerifierDB.makeVerifier("test", "password", 1536)
|
| @@ -451,10 +525,10 @@ def serverTest(address, dir):
|
|
|
| connection = connect()
|
| connection.handshakeServer(verifierDB=verifierDB)
|
| + testConnServer(connection)
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 4 - SRP faults"
|
| + print("Test 4 - SRP faults")
|
| for fault in Fault.clientSrpFaults + Fault.genericFaults:
|
| connection = connect()
|
| connection.fault = fault
|
| @@ -463,27 +537,16 @@ def serverTest(address, dir):
|
| assert()
|
| except:
|
| pass
|
| - connection.sock.close()
|
| -
|
| - print "Test 5 - good SRP: unknown_srp_username idiom"
|
| - connection = connect()
|
| - connection.handshakeServer(verifierDB=verifierDB)
|
| - connection.close()
|
| - connection.sock.close()
|
| -
|
| - print "Test 6 - good SRP: with X.509 cert"
|
| - x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
|
| - x509Chain = X509CertChain([x509Cert])
|
| - s = open(os.path.join(dir, "serverX509Key.pem")).read()
|
| - x509Key = parsePEMKey(s, private=True)
|
| + connection.close()
|
|
|
| + print("Test 6 - good SRP: with X.509 cert")
|
| connection = connect()
|
| connection.handshakeServer(verifierDB=verifierDB, \
|
| certChain=x509Chain, privateKey=x509Key)
|
| + testConnServer(connection)
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 7 - X.509 with SRP faults"
|
| + print("Test 7 - X.509 with SRP faults")
|
| for fault in Fault.clientSrpFaults + Fault.genericFaults:
|
| connection = connect()
|
| connection.fault = fault
|
| @@ -493,46 +556,9 @@ def serverTest(address, dir):
|
| assert()
|
| except:
|
| pass
|
| - connection.sock.close()
|
| -
|
| - if cryptoIDlibLoaded:
|
| - print "Test 8 - good SRP: with cryptoID certs"
|
| - cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
|
| - cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
|
| - connection = connect()
|
| - connection.handshakeServer(verifierDB=verifierDB, \
|
| - certChain=cryptoIDChain, privateKey=cryptoIDKey)
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 9 - cryptoID with SRP faults"
|
| - for fault in Fault.clientSrpFaults + Fault.genericFaults:
|
| - connection = connect()
|
| - connection.fault = fault
|
| - try:
|
| - connection.handshakeServer(verifierDB=verifierDB, \
|
| - certChain=cryptoIDChain, privateKey=cryptoIDKey)
|
| - assert()
|
| - except:
|
| - pass
|
| - connection.sock.close()
|
| -
|
| - print "Test 10 - good X.509"
|
| - connection = connect()
|
| - connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
|
| - connection.close()
|
| - connection.sock.close()
|
| -
|
| - print "Test 10.a - good X.509, SSL v3"
|
| - connection = connect()
|
| - settings = HandshakeSettings()
|
| - settings.minVersion = (3,0)
|
| - settings.maxVersion = (3,0)
|
| - connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
|
| - connection.close()
|
| - connection.sock.close()
|
| -
|
| - print "Test 11 - X.509 faults"
|
| + print("Test 11 - X.509 faults")
|
| for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
|
| connection = connect()
|
| connection.fault = fault
|
| @@ -541,44 +567,26 @@ def serverTest(address, dir):
|
| assert()
|
| except:
|
| pass
|
| - connection.sock.close()
|
| -
|
| - if cryptoIDlibLoaded:
|
| - print "Test 12 - good cryptoID"
|
| - connection = connect()
|
| - connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 13 - cryptoID faults"
|
| - for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
|
| - connection = connect()
|
| - connection.fault = fault
|
| - try:
|
| - connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
|
| - assert()
|
| - except:
|
| - pass
|
| - connection.sock.close()
|
| -
|
| - print "Test 14 - good mutual X.509"
|
| + print("Test 14 - good mutual X.509")
|
| connection = connect()
|
| connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
|
| + testConnServer(connection)
|
| assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 14a - good mutual X.509, SSLv3"
|
| + print("Test 14a - good mutual X.509, SSLv3")
|
| connection = connect()
|
| settings = HandshakeSettings()
|
| settings.minVersion = (3,0)
|
| settings.maxVersion = (3,0)
|
| connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
|
| + testConnServer(connection)
|
| assert(isinstance(connection.session.serverCertChain, X509CertChain))
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 15 - mutual X.509 faults"
|
| + print("Test 15 - mutual X.509 faults")
|
| for fault in Fault.clientCertFaults + Fault.genericFaults:
|
| connection = connect()
|
| connection.fault = fault
|
| @@ -587,488 +595,199 @@ def serverTest(address, dir):
|
| assert()
|
| except:
|
| pass
|
| - connection.sock.close()
|
| -
|
| - if cryptoIDlibLoaded:
|
| - print "Test 16 - good mutual cryptoID"
|
| - connection = connect()
|
| - connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
|
| - assert(isinstance(connection.session.serverCertChain, CertChain))
|
| - assert(connection.session.serverCertChain.validate())
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 17 - mutual cryptoID faults"
|
| - for fault in Fault.clientCertFaults + Fault.genericFaults:
|
| - connection = connect()
|
| - connection.fault = fault
|
| - try:
|
| - connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
|
| - assert()
|
| - except:
|
| - pass
|
| - connection.sock.close()
|
| -
|
| - print "Test 18 - good SRP, prepare to resume"
|
| + print("Test 18 - good SRP, prepare to resume")
|
| sessionCache = SessionCache()
|
| connection = connect()
|
| connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
|
| + assert(connection.session.serverName == address[0])
|
| + testConnServer(connection)
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 19 - resumption"
|
| + print("Test 19 - resumption")
|
| connection = connect()
|
| connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
|
| + assert(connection.session.serverName == address[0])
|
| + testConnServer(connection)
|
| #Don't close! -- see next test
|
|
|
| - print "Test 20 - invalidated resumption"
|
| + print("Test 20 - invalidated resumption")
|
| try:
|
| connection.read(min=1, max=1)
|
| assert() #Client is going to close the socket without a close_notify
|
| - except TLSAbruptCloseError, e:
|
| + except TLSAbruptCloseError as e:
|
| pass
|
| connection = connect()
|
| try:
|
| connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
|
| - except TLSLocalAlert, alert:
|
| + except TLSLocalAlert as alert:
|
| if alert.description != AlertDescription.bad_record_mac:
|
| raise
|
| - connection.sock.close()
|
| + connection.close()
|
|
|
| - print "Test 21 - HTTPS test X.509"
|
| + print("Test 21 - HTTPS test X.509")
|
|
|
| #Close the current listening socket
|
| lsock.close()
|
|
|
| #Create and run an HTTP Server using TLSSocketServerMixIn
|
| class MyHTTPServer(TLSSocketServerMixIn,
|
| - BaseHTTPServer.HTTPServer):
|
| + HTTPServer):
|
| def handshake(self, tlsConnection):
|
| tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
|
| return True
|
| cd = os.getcwd()
|
| os.chdir(dir)
|
| address = address[0], address[1]+1
|
| - httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
|
| + httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
|
| for x in range(6):
|
| httpd.handle_request()
|
| httpd.server_close()
|
| cd = os.chdir(cd)
|
|
|
| - if cryptoIDlibLoaded:
|
| - print "Test 21a - HTTPS test SRP+cryptoID"
|
| -
|
| - #Create and run an HTTP Server using TLSSocketServerMixIn
|
| - class MyHTTPServer(TLSSocketServerMixIn,
|
| - BaseHTTPServer.HTTPServer):
|
| - def handshake(self, tlsConnection):
|
| - tlsConnection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey,
|
| - verifierDB=verifierDB)
|
| - return True
|
| - cd = os.getcwd()
|
| - os.chdir(dir)
|
| - address = address[0], address[1]+1
|
| - httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
|
| - for x in range(6):
|
| - httpd.handle_request()
|
| - httpd.server_close()
|
| - cd = os.chdir(cd)
|
| -
|
| #Re-connect the listening socket
|
| lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| address = address[0], address[1]+1
|
| lsock.bind(address)
|
| lsock.listen(5)
|
|
|
| - def connect():
|
| - return TLSConnection(lsock.accept()[0])
|
| -
|
| implementations = []
|
| - if cryptlibpyLoaded:
|
| - implementations.append("cryptlib")
|
| if m2cryptoLoaded:
|
| implementations.append("openssl")
|
| if pycryptoLoaded:
|
| implementations.append("pycrypto")
|
| implementations.append("python")
|
|
|
| - print "Test 22 - different ciphers"
|
| + print("Test 22 - different ciphers")
|
| for implementation in ["python"] * len(implementations):
|
| for cipher in ["aes128", "aes256", "rc4"]:
|
|
|
| - print "Test 22:",
|
| + print("Test 22:", end=' ')
|
| connection = connect()
|
|
|
| settings = HandshakeSettings()
|
| settings.cipherNames = [cipher]
|
| settings.cipherImplementations = [implementation, "python"]
|
|
|
| - connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
|
| - print connection.getCipherName(), connection.getCipherImplementation()
|
| - h = connection.read(min=5, max=5)
|
| - assert(h == "hello")
|
| - connection.write(h)
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings)
|
| + print(connection.getCipherName(), connection.getCipherImplementation())
|
| + testConnServer(connection)
|
| connection.close()
|
| - connection.sock.close()
|
|
|
| - print "Test 23 - throughput test"
|
| + print("Test 23 - throughput test")
|
| for implementation in implementations:
|
| for cipher in ["aes128", "aes256", "3des", "rc4"]:
|
| - if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
|
| + if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
|
| continue
|
|
|
| - print "Test 23:",
|
| + print("Test 23:", end=' ')
|
| connection = connect()
|
|
|
| settings = HandshakeSettings()
|
| settings.cipherNames = [cipher]
|
| settings.cipherImplementations = [implementation, "python"]
|
|
|
| - connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
|
| - print connection.getCipherName(), connection.getCipherImplementation()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings)
|
| + print(connection.getCipherName(), connection.getCipherImplementation())
|
| h = connection.read(min=50000, max=50000)
|
| - assert(h == "hello"*10000)
|
| + assert(h == b"hello"*10000)
|
| connection.write(h)
|
| connection.close()
|
| - connection.sock.close()
|
| -
|
| - print "Test succeeded"
|
| -
|
|
|
| + print("Test 24.a - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"http/1.1"])
|
| + testConnServer(connection)
|
| + connection.close()
|
|
|
| + print("Test 24.b - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 24.c - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"http/1.1", b"spdy/2"])
|
| + testConnServer(connection)
|
| + connection.close()
|
|
|
| + print("Test 24.d - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 24.e - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 24.f - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[b"spdy/3", b"spdy/2"])
|
| + testConnServer(connection)
|
| + connection.close()
|
| +
|
| + print("Test 24.g - Next-Protocol Server Negotiation")
|
| + connection = connect()
|
| + settings = HandshakeSettings()
|
| + connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
|
| + settings=settings, nextProtos=[])
|
| + testConnServer(connection)
|
| + connection.close()
|
|
|
| + print("Tests 25-27 - XMLRPXC server")
|
| + address = address[0], address[1]+1
|
| + class Server(TLSXMLRPCServer):
|
|
|
| -
|
| -
|
| -
|
| -
|
| -
|
| -
|
| -if len(sys.argv) == 1 or (len(sys.argv)==2 and sys.argv[1].lower().endswith("help")):
|
| - print ""
|
| - print "Version: 0.3.8"
|
| - print ""
|
| - print "RNG: %s" % prngName
|
| - print ""
|
| - print "Modules:"
|
| - if cryptlibpyLoaded:
|
| - print " cryptlib_py : Loaded"
|
| - else:
|
| - print " cryptlib_py : Not Loaded"
|
| - if m2cryptoLoaded:
|
| - print " M2Crypto : Loaded"
|
| - else:
|
| - print " M2Crypto : Not Loaded"
|
| - if pycryptoLoaded:
|
| - print " pycrypto : Loaded"
|
| - else:
|
| - print " pycrypto : Not Loaded"
|
| - if gmpyLoaded:
|
| - print " GMPY : Loaded"
|
| - else:
|
| - print " GMPY : Not Loaded"
|
| - if cryptoIDlibLoaded:
|
| - print " cryptoIDlib : Loaded"
|
| - else:
|
| - print " cryptoIDlib : Not Loaded"
|
| - print ""
|
| - print "Commands:"
|
| - print ""
|
| - print " clientcert <server> [<chain> <key>]"
|
| - print " clientsharedkey <server> <user> <pass>"
|
| - print " clientsrp <server> <user> <pass>"
|
| - print " clienttest <server> <dir>"
|
| - print ""
|
| - print " serversrp <server> <verifierDB>"
|
| - print " servercert <server> <chain> <key> [req]"
|
| - print " serversrpcert <server> <verifierDB> <chain> <key>"
|
| - print " serversharedkey <server> <sharedkeyDB>"
|
| - print " servertest <server> <dir>"
|
| - sys.exit()
|
| -
|
| -cmd = sys.argv[1].lower()
|
| -
|
| -class Args:
|
| - def __init__(self, argv):
|
| - self.argv = argv
|
| - def get(self, index):
|
| - if len(self.argv)<=index:
|
| - raise SyntaxError("Not enough arguments")
|
| - return self.argv[index]
|
| - def getLast(self, index):
|
| - if len(self.argv)>index+1:
|
| - raise SyntaxError("Too many arguments")
|
| - return self.get(index)
|
| -
|
| -args = Args(sys.argv)
|
| -
|
| -def reformatDocString(s):
|
| - lines = s.splitlines()
|
| - newLines = []
|
| - for line in lines:
|
| - newLines.append(" " + line.strip())
|
| - return "\n".join(newLines)
|
| -
|
| -try:
|
| - if cmd == "clienttest":
|
| - address = args.get(2)
|
| - dir = args.getLast(3)
|
| - clientTest(address, dir)
|
| - sys.exit()
|
| -
|
| - elif cmd.startswith("client"):
|
| - address = args.get(2)
|
| -
|
| - #Split address into hostname/port tuple
|
| - address = address.split(":")
|
| - if len(address)==1:
|
| - address.append("4443")
|
| - address = ( address[0], int(address[1]) )
|
| -
|
| - def connect():
|
| - #Connect to server
|
| - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| - if hasattr(sock, "settimeout"):
|
| - sock.settimeout(5)
|
| - sock.connect(address)
|
| -
|
| - #Instantiate TLSConnections
|
| - return TLSConnection(sock)
|
| -
|
| - try:
|
| - if cmd == "clientsrp":
|
| - username = args.get(3)
|
| - password = args.getLast(4)
|
| - connection = connect()
|
| - start = time.clock()
|
| - connection.handshakeClientSRP(username, password)
|
| - elif cmd == "clientsharedkey":
|
| - username = args.get(3)
|
| - password = args.getLast(4)
|
| - connection = connect()
|
| - start = time.clock()
|
| - connection.handshakeClientSharedKey(username, password)
|
| - elif cmd == "clientcert":
|
| - certChain = None
|
| - privateKey = None
|
| - if len(sys.argv) > 3:
|
| - certFilename = args.get(3)
|
| - keyFilename = args.getLast(4)
|
| -
|
| - s1 = open(certFilename, "rb").read()
|
| - s2 = open(keyFilename, "rb").read()
|
| -
|
| - #Try to create cryptoID cert chain
|
| - if cryptoIDlibLoaded:
|
| - try:
|
| - certChain = CertChain().parse(s1)
|
| - privateKey = parsePrivateKey(s2)
|
| - except:
|
| - certChain = None
|
| - privateKey = None
|
| -
|
| - #Try to create X.509 cert chain
|
| - if not certChain:
|
| - x509 = X509()
|
| - x509.parse(s1)
|
| - certChain = X509CertChain([x509])
|
| - privateKey = parsePrivateKey(s2)
|
| -
|
| - connection = connect()
|
| - start = time.clock()
|
| - connection.handshakeClientCert(certChain, privateKey)
|
| - else:
|
| - raise SyntaxError("Unknown command")
|
| -
|
| - except TLSLocalAlert, a:
|
| - if a.description == AlertDescription.bad_record_mac:
|
| - if cmd == "clientsharedkey":
|
| - print "Bad sharedkey password"
|
| - else:
|
| - raise
|
| - elif a.description == AlertDescription.user_canceled:
|
| - print str(a)
|
| - else:
|
| - raise
|
| - sys.exit()
|
| - except TLSRemoteAlert, a:
|
| - if a.description == AlertDescription.unknown_srp_username:
|
| - if cmd == "clientsrp":
|
| - print "Unknown username"
|
| - else:
|
| - raise
|
| - elif a.description == AlertDescription.bad_record_mac:
|
| - if cmd == "clientsrp":
|
| - print "Bad username or password"
|
| - else:
|
| - raise
|
| - elif a.description == AlertDescription.handshake_failure:
|
| - print "Unable to negotiate mutually acceptable parameters"
|
| - else:
|
| - raise
|
| - sys.exit()
|
| -
|
| - stop = time.clock()
|
| - print "Handshake success"
|
| - print " Handshake time: %.4f seconds" % (stop - start)
|
| - print " Version: %s.%s" % connection.version
|
| - print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
|
| - if connection.session.srpUsername:
|
| - print " Client SRP username: %s" % connection.session.srpUsername
|
| - if connection.session.sharedKeyUsername:
|
| - print " Client shared key username: %s" % connection.session.sharedKeyUsername
|
| - if connection.session.clientCertChain:
|
| - print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
|
| - if connection.session.serverCertChain:
|
| - print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
|
| - connection.close()
|
| - connection.sock.close()
|
| -
|
| - elif cmd.startswith("server"):
|
| - address = args.get(2)
|
| -
|
| - #Split address into hostname/port tuple
|
| - address = address.split(":")
|
| - if len(address)==1:
|
| - address.append("4443")
|
| - address = ( address[0], int(address[1]) )
|
| -
|
| - verifierDBFilename = None
|
| - sharedKeyDBFilename = None
|
| - certFilename = None
|
| - keyFilename = None
|
| - sharedKeyDB = None
|
| - reqCert = False
|
| -
|
| - if cmd == "serversrp":
|
| - verifierDBFilename = args.getLast(3)
|
| - elif cmd == "servercert":
|
| - certFilename = args.get(3)
|
| - keyFilename = args.get(4)
|
| - if len(sys.argv)>=6:
|
| - req = args.getLast(5)
|
| - if req.lower() != "req":
|
| - raise SyntaxError()
|
| - reqCert = True
|
| - elif cmd == "serversrpcert":
|
| - verifierDBFilename = args.get(3)
|
| - certFilename = args.get(4)
|
| - keyFilename = args.getLast(5)
|
| - elif cmd == "serversharedkey":
|
| - sharedKeyDBFilename = args.getLast(3)
|
| - elif cmd == "servertest":
|
| - address = args.get(2)
|
| - dir = args.getLast(3)
|
| - serverTest(address, dir)
|
| - sys.exit()
|
| -
|
| - verifierDB = None
|
| - if verifierDBFilename:
|
| - verifierDB = VerifierDB(verifierDBFilename)
|
| - verifierDB.open()
|
| -
|
| - sharedKeyDB = None
|
| - if sharedKeyDBFilename:
|
| - sharedKeyDB = SharedKeyDB(sharedKeyDBFilename)
|
| - sharedKeyDB.open()
|
| -
|
| - certChain = None
|
| - privateKey = None
|
| - if certFilename:
|
| - s1 = open(certFilename, "rb").read()
|
| - s2 = open(keyFilename, "rb").read()
|
| -
|
| - #Try to create cryptoID cert chain
|
| - if cryptoIDlibLoaded:
|
| - try:
|
| - certChain = CertChain().parse(s1)
|
| - privateKey = parsePrivateKey(s2)
|
| - except:
|
| - certChain = None
|
| - privateKey = None
|
| -
|
| - #Try to create X.509 cert chain
|
| - if not certChain:
|
| - x509 = X509()
|
| - x509.parse(s1)
|
| - certChain = X509CertChain([x509])
|
| - privateKey = parsePrivateKey(s2)
|
| -
|
| -
|
| -
|
| - #Create handler function - performs handshake, then echos all bytes received
|
| - def handler(sock):
|
| - try:
|
| - connection = TLSConnection(sock)
|
| - settings = HandshakeSettings()
|
| - connection.handshakeServer(sharedKeyDB=sharedKeyDB, verifierDB=verifierDB, \
|
| - certChain=certChain, privateKey=privateKey, \
|
| - reqCert=reqCert, settings=settings)
|
| - print "Handshake success"
|
| - print " Version: %s.%s" % connection.version
|
| - print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
|
| - if connection.session.srpUsername:
|
| - print " Client SRP username: %s" % connection.session.srpUsername
|
| - if connection.session.sharedKeyUsername:
|
| - print " Client shared key username: %s" % connection.session.sharedKeyUsername
|
| - if connection.session.clientCertChain:
|
| - print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
|
| - if connection.session.serverCertChain:
|
| - print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
|
| -
|
| - s = ""
|
| - while 1:
|
| - newS = connection.read()
|
| - if not newS:
|
| - break
|
| - s += newS
|
| - if s[-1]=='\n':
|
| - connection.write(s)
|
| - s = ""
|
| - except TLSLocalAlert, a:
|
| - if a.description == AlertDescription.unknown_srp_username:
|
| - print "Unknown SRP username"
|
| - elif a.description == AlertDescription.bad_record_mac:
|
| - if cmd == "serversrp" or cmd == "serversrpcert":
|
| - print "Bad SRP password for:", connection.allegedSrpUsername
|
| - else:
|
| - raise
|
| - elif a.description == AlertDescription.handshake_failure:
|
| - print "Unable to negotiate mutually acceptable parameters"
|
| - else:
|
| - raise
|
| - except TLSRemoteAlert, a:
|
| - if a.description == AlertDescription.bad_record_mac:
|
| - if cmd == "serversharedkey":
|
| - print "Bad sharedkey password for:", connection.allegedSharedKeyUsername
|
| - else:
|
| - raise
|
| - elif a.description == AlertDescription.user_canceled:
|
| - print "Handshake cancelled"
|
| - elif a.description == AlertDescription.handshake_failure:
|
| - print "Unable to negotiate mutually acceptable parameters"
|
| - elif a.description == AlertDescription.close_notify:
|
| - pass
|
| - else:
|
| - raise
|
| -
|
| - #Run multi-threaded server
|
| - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| - sock.bind(address)
|
| - sock.listen(5)
|
| - while 1:
|
| - (newsock, cliAddress) = sock.accept()
|
| - thread.start_new_thread(handler, (newsock,))
|
| -
|
| -
|
| + def handshake(self, tlsConnection):
|
| + try:
|
| + tlsConnection.handshakeServer(certChain=x509Chain,
|
| + privateKey=x509Key,
|
| + sessionCache=sessionCache)
|
| + tlsConnection.ignoreAbruptClose = True
|
| + return True
|
| + except TLSError as error:
|
| + print("Handshake failure:", str(error))
|
| + return False
|
| +
|
| + class MyFuncs:
|
| + def pow(self, x, y): return pow(x, y)
|
| + def add(self, x, y): return x + y
|
| +
|
| + server = Server(address)
|
| + server.register_instance(MyFuncs())
|
| + #sa = server.socket.getsockname()
|
| + #print "Serving HTTPS on", sa[0], "port", sa[1]
|
| + for i in range(6):
|
| + server.handle_request()
|
| +
|
| + print("Test succeeded")
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + if len(sys.argv) < 2:
|
| + printUsage("Missing command")
|
| + elif sys.argv[1] == "client"[:len(sys.argv[1])]:
|
| + clientTestCmd(sys.argv[2:])
|
| + elif sys.argv[1] == "server"[:len(sys.argv[1])]:
|
| + serverTestCmd(sys.argv[2:])
|
| else:
|
| - print "Bad command: '%s'" % cmd
|
| -except TLSRemoteAlert, a:
|
| - print str(a)
|
| - raise
|
| -
|
| -
|
| -
|
| -
|
| -
|
| + printUsage("Unknown command: %s" % sys.argv[1])
|
|
|