Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(136)

Unified Diff: third_party/tlslite/tests/tlstest.py

Issue 210323002: Update tlslite to 0.4.6. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Rietveld, please behave Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/tlslite/tests/tlstest.py
diff --git a/third_party/tlslite/scripts/tls.py b/third_party/tlslite/tests/tlstest.py
old mode 100644
new mode 100755
similarity index 29%
copy from third_party/tlslite/scripts/tls.py
copy to third_party/tlslite/tests/tlstest.py
index fa2c663f3b06bff1a2567e240e0eb5a110f2416b..fa1b13fa65d3248748de662d75a136ecedf39cba
--- a/third_party/tlslite/scripts/tls.py
+++ b/third_party/tlslite/tests/tlstest.py
@@ -1,47 +1,87 @@
-#! python
-
+#!/usr/bin/env python
+
+# Authors:
+# Trevor Perrin
+# Kees Bos - Added tests for XML-RPC
+# Dimitris Moraitis - Anon ciphersuites
+# Marcelo Fernandez - Added test for NPN
+# Martin von Loewis - python 3 port
+
+#
+# See the LICENSE file for legal information regarding use of this file.
+from __future__ import print_function
import sys
import os
import os.path
import socket
-import thread
import time
-import httplib
-import BaseHTTPServer
-import SimpleHTTPServer
-
-
+import getopt
try:
- from cryptoIDlib.api import *
- cryptoIDlibLoaded = True
-except:
- cryptoIDlibLoaded = False
-
-if __name__ != "__main__":
- raise "This must be run as a command, not used as a module!"
-
-#import tlslite
-#from tlslite.constants import AlertDescription, Fault
-
-#from tlslite.utils.jython_compat import formatExceptionTrace
-#from tlslite.X509 import X509, X509CertChain
-
-from tlslite.api import *
-
-def parsePrivateKey(s):
- try:
- return parsePEMKey(s, private=True)
- except Exception, e:
- print e
- return parseXMLKey(s, private=True)
+ from BaseHTTPServer import HTTPServer
+ from SimpleHTTPServer import SimpleHTTPRequestHandler
+except ImportError:
+ from http.server import HTTPServer, SimpleHTTPRequestHandler
+
+from tlslite import TLSConnection, Fault, HandshakeSettings, \
+ X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \
+ parsePEMKey, constants, \
+ AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \
+ POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \
+ Checker, __version__
+
+from tlslite.errors import *
+from tlslite.utils.cryptomath import prngName
+try:
+ import xmlrpclib
+except ImportError:
+ # Python 3
+ from xmlrpc import client as xmlrpclib
+from tlslite import *
+try:
+ from tack.structures.Tack import Tack
+
+except ImportError:
+ pass
-def clientTest(address, dir):
+def printUsage(s=None):
+ if m2cryptoLoaded:
+ crypto = "M2Crypto/OpenSSL"
+ else:
+ crypto = "Python crypto"
+ if s:
+ print("ERROR: %s" % s)
+ print("""\ntls.py version %s (using %s)
+
+Commands:
+ server HOST:PORT DIRECTORY
+
+ client HOST:PORT DIRECTORY
+""" % (__version__, crypto))
+ sys.exit(-1)
+
+
+def testConnClient(conn):
+ b1 = os.urandom(1)
+ b10 = os.urandom(10)
+ b100 = os.urandom(100)
+ b1000 = os.urandom(1000)
+ conn.write(b1)
+ conn.write(b10)
+ conn.write(b100)
+ conn.write(b1000)
+ assert(conn.read(min=1, max=1) == b1)
+ assert(conn.read(min=10, max=10) == b10)
+ assert(conn.read(min=100, max=100) == b100)
+ assert(conn.read(min=1000, max=1000) == b1000)
+
+def clientTestCmd(argv):
+
+ address = argv[0]
+ dir = argv[1]
#Split address into hostname/port tuple
address = address.split(":")
- if len(address)==1:
- address.append("4443")
address = ( address[0], int(address[1]) )
def connect():
@@ -56,142 +96,113 @@ def clientTest(address, dir):
badFault = False
- print "Test 1 - good shared key"
+ print("Test 0 - anonymous handshake")
+ connection = connect()
+ connection.handshakeClientAnonymous()
+ testConnClient(connection)
+ connection.close()
+
+ print("Test 1 - good X509 (plus SNI)")
+ connection = connect()
+ connection.handshakeClientCert(serverName=address[0])
+ testConnClient(connection)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ assert(connection.session.serverName == address[0])
+ connection.close()
+
+ print("Test 1.a - good X509, SSLv3")
+ connection = connect()
+ settings = HandshakeSettings()
+ settings.minVersion = (3,0)
+ settings.maxVersion = (3,0)
+ connection.handshakeClientCert(settings=settings)
+ testConnClient(connection)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ connection.close()
+
+ print("Test 1.b - good X509, RC4-MD5")
connection = connect()
- connection.handshakeClientSharedKey("shared", "key")
+ settings = HandshakeSettings()
+ settings.macNames = ["md5"]
+ connection.handshakeClientCert(settings=settings)
+ testConnClient(connection)
+ assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)
connection.close()
- connection.sock.close()
- print "Test 2 - shared key faults"
- for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
+ if tackpyLoaded:
+
+ settings = HandshakeSettings()
+ settings.useExperimentalTackExtension = True
+
+ print("Test 2.a - good X.509, TACK")
+ connection = connect()
+ connection.handshakeClientCert(settings=settings)
+ assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
+ assert(connection.session.tackExt.activation_flags == 1)
+ testConnClient(connection)
+ connection.close()
+
+ print("Test 2.b - good X.509, TACK unrelated to cert chain")
connection = connect()
- connection.fault = fault
try:
- connection.handshakeClientSharedKey("shared", "key")
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
+ connection.handshakeClientCert(settings=settings)
+ assert(False)
+ except TLSLocalAlert as alert:
+ if alert.description != AlertDescription.illegal_parameter:
+ raise
+ connection.close()
- print "Test 3 - good SRP"
+ print("Test 3 - good SRP")
connection = connect()
connection.handshakeClientSRP("test", "password")
+ testConnClient(connection)
connection.close()
- print "Test 4 - SRP faults"
+ print("Test 4 - SRP faults")
for fault in Fault.clientSrpFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientSRP("test", "password")
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
+ print(" Good Fault %s" % (Fault.faultNames[fault]))
+ except TLSFaultError as e:
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
badFault = True
- connection.sock.close()
-
- print "Test 5 - good SRP: unknown_srp_username idiom"
- def srpCallback():
- return ("test", "password")
- connection = connect()
- connection.handshakeClientUnknown(srpCallback=srpCallback)
- connection.close()
- connection.sock.close()
- print "Test 6 - good SRP: with X.509 certificate"
+ print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
+ settings = HandshakeSettings()
+ settings.minVersion = (3,1)
+ settings.maxVersion = (3,1)
connection = connect()
- connection.handshakeClientSRP("test", "password")
+ connection.handshakeClientSRP("test", "password", settings=settings)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
+ testConnClient(connection)
connection.close()
- connection.sock.close()
- print "Test 7 - X.509 with SRP faults"
+ print("Test 7 - X.509 with SRP faults")
for fault in Fault.clientSrpFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientSRP("test", "password")
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
+ print(" Good Fault %s" % (Fault.faultNames[fault]))
+ except TLSFaultError as e:
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
badFault = True
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 8 - good SRP: with cryptoID certificate chain"
- connection = connect()
- connection.handshakeClientSRP("test", "password")
- assert(isinstance(connection.session.serverCertChain, CertChain))
- if not (connection.session.serverCertChain.validate()):
- print connection.session.serverCertChain.validate(listProblems=True)
-
- connection.close()
- connection.sock.close()
-
- print "Test 9 - CryptoID with SRP faults"
- for fault in Fault.clientSrpFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientSRP("test", "password")
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- print "Test 10 - good X509"
- connection = connect()
- connection.handshakeClientCert()
- assert(isinstance(connection.session.serverCertChain, X509CertChain))
- connection.close()
- connection.sock.close()
-
- print "Test 10.a - good X509, SSLv3"
- connection = connect()
- settings = HandshakeSettings()
- settings.minVersion = (3,0)
- settings.maxVersion = (3,0)
- connection.handshakeClientCert(settings=settings)
- assert(isinstance(connection.session.serverCertChain, X509CertChain))
- connection.close()
- connection.sock.close()
- print "Test 11 - X.509 faults"
+ print("Test 11 - X.509 faults")
for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientCert()
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
+ print(" Good Fault %s" % (Fault.faultNames[fault]))
+ except TLSFaultError as e:
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
badFault = True
- connection.sock.close()
- if cryptoIDlibLoaded:
- print "Test 12 - good cryptoID"
- connection = connect()
- connection.handshakeClientCert()
- assert(isinstance(connection.session.serverCertChain, CertChain))
- assert(connection.session.serverCertChain.validate())
- connection.close()
- connection.sock.close()
-
- print "Test 13 - cryptoID faults"
- for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientCert()
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- print "Test 14 - good mutual X509"
+ print("Test 14 - good mutual X509")
x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())
x509Chain = X509CertChain([x509Cert])
s = open(os.path.join(dir, "clientX509Key.pem")).read()
@@ -199,80 +210,58 @@ def clientTest(address, dir):
connection = connect()
connection.handshakeClientCert(x509Chain, x509Key)
+ testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
- connection.sock.close()
- print "Test 14.a - good mutual X509, SSLv3"
+ print("Test 14.a - good mutual X509, SSLv3")
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,0)
settings.maxVersion = (3,0)
connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
+ testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
- connection.sock.close()
- print "Test 15 - mutual X.509 faults"
+ print("Test 15 - mutual X.509 faults")
for fault in Fault.clientCertFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
try:
connection.handshakeClientCert(x509Chain, x509Key)
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
+ print(" Good Fault %s" % (Fault.faultNames[fault]))
+ except TLSFaultError as e:
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
badFault = True
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 16 - good mutual cryptoID"
- cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
- cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
-
- connection = connect()
- connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
- assert(isinstance(connection.session.serverCertChain, CertChain))
- assert(connection.session.serverCertChain.validate())
- connection.close()
- connection.sock.close()
- print "Test 17 - mutual cryptoID faults"
- for fault in Fault.clientCertFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
- print " Good Fault %s" % (Fault.faultNames[fault])
- except TLSFaultError, e:
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
- badFault = True
- connection.sock.close()
-
- print "Test 18 - good SRP, prepare to resume..."
+ print("Test 18 - good SRP, prepare to resume... (plus SNI)")
connection = connect()
- connection.handshakeClientSRP("test", "password")
+ connection.handshakeClientSRP("test", "password", serverName=address[0])
+ testConnClient(connection)
connection.close()
- connection.sock.close()
session = connection.session
- print "Test 19 - resumption"
+ print("Test 19 - resumption (plus SNI)")
connection = connect()
- connection.handshakeClientSRP("test", "garbage", session=session)
+ connection.handshakeClientSRP("test", "garbage", serverName=address[0],
+ session=session)
+ testConnClient(connection)
#Don't close! -- see below
- print "Test 20 - invalidated resumption"
+ print("Test 20 - invalidated resumption (plus SNI)")
connection.sock.close() #Close the socket without a close_notify!
connection = connect()
try:
- connection.handshakeClientSRP("test", "garbage", session=session)
- assert()
- except TLSRemoteAlert, alert:
+ connection.handshakeClientSRP("test", "garbage",
+ serverName=address[0], session=session)
+ assert(False)
+ except TLSRemoteAlert as alert:
if alert.description != AlertDescription.bad_record_mac:
raise
- connection.sock.close()
-
- print "Test 21 - HTTPS test X.509"
+ connection.close()
+
+ print("Test 21 - HTTPS test X.509")
address = address[0], address[1]+1
if hasattr(socket, "timeout"):
timeoutEx = socket.timeout
@@ -281,136 +270,183 @@ def clientTest(address, dir):
while 1:
try:
time.sleep(2)
- htmlBody = open(os.path.join(dir, "index.html")).read()
+ htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "utf-8")
fingerprint = None
for y in range(2):
+ checker =Checker(x509Fingerprint=fingerprint)
h = HTTPTLSConnection(\
- address[0], address[1], x509Fingerprint=fingerprint)
+ address[0], address[1], checker=checker)
for x in range(3):
h.request("GET", "/index.html")
r = h.getresponse()
assert(r.status == 200)
- s = r.read()
- assert(s == htmlBody)
+ b = bytearray(r.read())
+ assert(b == htmlBody)
fingerprint = h.tlsSession.serverCertChain.getFingerprint()
assert(fingerprint)
time.sleep(2)
break
except timeoutEx:
- print "timeout, retrying..."
+ print("timeout, retrying...")
pass
- if cryptoIDlibLoaded:
- print "Test 21a - HTTPS test SRP+cryptoID"
- address = address[0], address[1]+1
- if hasattr(socket, "timeout"):
- timeoutEx = socket.timeout
- else:
- timeoutEx = socket.error
- while 1:
- try:
- time.sleep(2) #Time to generate key and cryptoID
- htmlBody = open(os.path.join(dir, "index.html")).read()
- fingerprint = None
- protocol = None
- for y in range(2):
- h = HTTPTLSConnection(\
- address[0], address[1],
- username="test", password="password",
- cryptoID=fingerprint, protocol=protocol)
- for x in range(3):
- h.request("GET", "/index.html")
- r = h.getresponse()
- assert(r.status == 200)
- s = r.read()
- assert(s == htmlBody)
- fingerprint = h.tlsSession.serverCertChain.cryptoID
- assert(fingerprint)
- protocol = "urn:whatever"
- time.sleep(2)
- break
- except timeoutEx:
- print "timeout, retrying..."
- pass
-
address = address[0], address[1]+1
implementations = []
- if cryptlibpyLoaded:
- implementations.append("cryptlib")
if m2cryptoLoaded:
implementations.append("openssl")
if pycryptoLoaded:
implementations.append("pycrypto")
implementations.append("python")
- print "Test 22 - different ciphers"
+ print("Test 22 - different ciphers, TLSv1.0")
for implementation in implementations:
for cipher in ["aes128", "aes256", "rc4"]:
- print "Test 22:",
+ print("Test 22:", end=' ')
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
- connection.handshakeClientSharedKey("shared", "key", settings=settings)
- print ("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
-
- connection.write("hello")
- h = connection.read(min=5, max=5)
- assert(h == "hello")
+ settings.minVersion = (3,1)
+ settings.maxVersion = (3,1)
+ connection.handshakeClientCert(settings=settings)
+ testConnClient(connection)
+ print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
connection.close()
- connection.sock.close()
- print "Test 23 - throughput test"
+ print("Test 23 - throughput test")
for implementation in implementations:
for cipher in ["aes128", "aes256", "3des", "rc4"]:
- if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
+ if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
continue
- print "Test 23:",
+ print("Test 23:", end=' ')
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
- connection.handshakeClientSharedKey("shared", "key", settings=settings)
- print ("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation())),
+ connection.handshakeClientCert(settings=settings)
+ print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ')
startTime = time.clock()
- connection.write("hello"*10000)
+ connection.write(b"hello"*10000)
h = connection.read(min=50000, max=50000)
stopTime = time.clock()
- print "100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime))
+ if stopTime-startTime:
+ print("100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime)))
+ else:
+ print("100K exchanged very fast")
- assert(h == "hello"*10000)
+ assert(h == b"hello"*10000)
connection.close()
- connection.sock.close()
+
+ print("Test 24.a - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'http/1.1')
+ connection.close()
+
+ print("Test 24.b - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'spdy/2')
+ connection.close()
+
+ print("Test 24.c - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'spdy/2')
+ connection.close()
+
+ print("Test 24.d - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'spdy/2')
+ connection.close()
+
+ print("Test 24.e - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'spdy/3')
+ connection.close()
- print "Test 24 - Internet servers test"
+ print("Test 24.f - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'http/1.1')
+ connection.close()
+
+ print("Test 24.g - Next-Protocol Client Negotiation")
+ connection = connect()
+ connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto)
+ assert(connection.next_proto == b'spdy/2')
+ connection.close()
+
+ print('Test 25 - good standard XMLRPC https client')
+ time.sleep(2) # Hack for lack of ability to set timeout here
+ address = address[0], address[1]+1
+ server = xmlrpclib.Server('https://%s:%s' % address)
+ assert server.add(1,2) == 3
+ assert server.pow(2,4) == 16
+
+ print('Test 26 - good tlslite XMLRPC client')
+ transport = XMLRPCTransport(ignoreAbruptClose=True)
+ server = xmlrpclib.Server('https://%s:%s' % address, transport)
+ assert server.add(1,2) == 3
+ assert server.pow(2,4) == 16
+
+ print('Test 27 - good XMLRPC ignored protocol')
+ server = xmlrpclib.Server('http://%s:%s' % address, transport)
+ assert server.add(1,2) == 3
+ assert server.pow(2,4) == 16
+
+ print("Test 28 - Internet servers test")
try:
i = IMAP4_TLS("cyrus.andrew.cmu.edu")
i.login("anonymous", "anonymous@anonymous.net")
i.logout()
- print "Test 24: IMAP4 good"
+ print("Test 28: IMAP4 good")
p = POP3_TLS("pop.gmail.com")
p.quit()
- print "Test 24: POP3 good"
- except socket.error, e:
- print "Non-critical error: socket error trying to reach internet server: ", e
+ print("Test 29: POP3 good")
+ except socket.error as e:
+ print("Non-critical error: socket error trying to reach internet server: ", e)
if not badFault:
- print "Test succeeded"
+ print("Test succeeded")
else:
- print "Test failed"
+ print("Test failed")
+
+
+def testConnServer(connection):
+ count = 0
+ while 1:
+ s = connection.read()
+ count += len(s)
+ if len(s) == 0:
+ break
+ connection.write(s)
+ if count == 1111:
+ break
+
+def serverTestCmd(argv):
-def serverTest(address, dir):
+ address = argv[0]
+ dir = argv[1]
+
#Split address into hostname/port tuple
address = address.split(":")
- if len(address)==1:
- address.append("4443")
address = ( address[0], int(address[1]) )
#Connect to server
@@ -421,29 +457,67 @@ def serverTest(address, dir):
def connect():
return TLSConnection(lsock.accept()[0])
- print "Test 1 - good shared key"
- sharedKeyDB = SharedKeyDB()
- sharedKeyDB["shared"] = "key"
- sharedKeyDB["shared2"] = "key2"
+ x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
+ x509Chain = X509CertChain([x509Cert])
+ s = open(os.path.join(dir, "serverX509Key.pem")).read()
+ x509Key = parsePEMKey(s, private=True)
+
+ print("Test 0 - Anonymous server handshake")
+ connection = connect()
+ connection.handshakeServer(anon=True)
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 1 - good X.509")
connection = connect()
- connection.handshakeServer(sharedKeyDB=sharedKeyDB)
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
+ assert(connection.session.serverName == address[0])
+ testConnServer(connection)
connection.close()
- connection.sock.close()
- print "Test 2 - shared key faults"
- for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
+ print("Test 1.a - good X.509, SSL v3")
+ connection = connect()
+ settings = HandshakeSettings()
+ settings.minVersion = (3,0)
+ settings.maxVersion = (3,0)
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 1.b - good X.509, RC4-MD5")
+ connection = connect()
+ settings = HandshakeSettings()
+ settings.macNames = ["sha", "md5"]
+ settings.cipherNames = ["rc4"]
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
+ testConnServer(connection)
+ connection.close()
+
+ if tackpyLoaded:
+ tack = Tack.createFromPem(open("./TACK1.pem", "rU").read())
+ tackUnrelated = Tack.createFromPem(open("./TACKunrelated.pem", "rU").read())
+
+ settings = HandshakeSettings()
+ settings.useExperimentalTackExtension = True
+
+ print("Test 2.a - good X.509, TACK")
connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(sharedKeyDB=sharedKeyDB)
- assert()
- except:
- pass
- connection.sock.close()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ tacks=[tack], activationFlags=1, settings=settings)
+ testConnServer(connection)
+ connection.close()
- print "Test 3 - good SRP"
- #verifierDB = tlslite.VerifierDB(os.path.join(dir, "verifierDB"))
- #verifierDB.open()
+ print("Test 2.b - good X.509, TACK unrelated to cert chain")
+ connection = connect()
+ try:
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ tacks=[tackUnrelated], settings=settings)
+ assert(False)
+ except TLSRemoteAlert as alert:
+ if alert.description != AlertDescription.illegal_parameter:
+ raise
+
+ print("Test 3 - good SRP")
verifierDB = VerifierDB()
verifierDB.create()
entry = VerifierDB.makeVerifier("test", "password", 1536)
@@ -451,10 +525,10 @@ def serverTest(address, dir):
connection = connect()
connection.handshakeServer(verifierDB=verifierDB)
+ testConnServer(connection)
connection.close()
- connection.sock.close()
- print "Test 4 - SRP faults"
+ print("Test 4 - SRP faults")
for fault in Fault.clientSrpFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
@@ -463,27 +537,16 @@ def serverTest(address, dir):
assert()
except:
pass
- connection.sock.close()
-
- print "Test 5 - good SRP: unknown_srp_username idiom"
- connection = connect()
- connection.handshakeServer(verifierDB=verifierDB)
- connection.close()
- connection.sock.close()
-
- print "Test 6 - good SRP: with X.509 cert"
- x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
- x509Chain = X509CertChain([x509Cert])
- s = open(os.path.join(dir, "serverX509Key.pem")).read()
- x509Key = parsePEMKey(s, private=True)
+ connection.close()
+ print("Test 6 - good SRP: with X.509 cert")
connection = connect()
connection.handshakeServer(verifierDB=verifierDB, \
certChain=x509Chain, privateKey=x509Key)
+ testConnServer(connection)
connection.close()
- connection.sock.close()
- print "Test 7 - X.509 with SRP faults"
+ print("Test 7 - X.509 with SRP faults")
for fault in Fault.clientSrpFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
@@ -493,46 +556,9 @@ def serverTest(address, dir):
assert()
except:
pass
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 8 - good SRP: with cryptoID certs"
- cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
- cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
- connection = connect()
- connection.handshakeServer(verifierDB=verifierDB, \
- certChain=cryptoIDChain, privateKey=cryptoIDKey)
connection.close()
- connection.sock.close()
- print "Test 9 - cryptoID with SRP faults"
- for fault in Fault.clientSrpFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(verifierDB=verifierDB, \
- certChain=cryptoIDChain, privateKey=cryptoIDKey)
- assert()
- except:
- pass
- connection.sock.close()
-
- print "Test 10 - good X.509"
- connection = connect()
- connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
- connection.close()
- connection.sock.close()
-
- print "Test 10.a - good X.509, SSL v3"
- connection = connect()
- settings = HandshakeSettings()
- settings.minVersion = (3,0)
- settings.maxVersion = (3,0)
- connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
- connection.close()
- connection.sock.close()
-
- print "Test 11 - X.509 faults"
+ print("Test 11 - X.509 faults")
for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
@@ -541,44 +567,26 @@ def serverTest(address, dir):
assert()
except:
pass
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 12 - good cryptoID"
- connection = connect()
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
connection.close()
- connection.sock.close()
- print "Test 13 - cryptoID faults"
- for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
- assert()
- except:
- pass
- connection.sock.close()
-
- print "Test 14 - good mutual X.509"
+ print("Test 14 - good mutual X.509")
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
+ testConnServer(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
- connection.sock.close()
- print "Test 14a - good mutual X.509, SSLv3"
+ print("Test 14a - good mutual X.509, SSLv3")
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,0)
settings.maxVersion = (3,0)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
+ testConnServer(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
- connection.sock.close()
- print "Test 15 - mutual X.509 faults"
+ print("Test 15 - mutual X.509 faults")
for fault in Fault.clientCertFaults + Fault.genericFaults:
connection = connect()
connection.fault = fault
@@ -587,488 +595,199 @@ def serverTest(address, dir):
assert()
except:
pass
- connection.sock.close()
-
- if cryptoIDlibLoaded:
- print "Test 16 - good mutual cryptoID"
- connection = connect()
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
- assert(isinstance(connection.session.serverCertChain, CertChain))
- assert(connection.session.serverCertChain.validate())
connection.close()
- connection.sock.close()
- print "Test 17 - mutual cryptoID faults"
- for fault in Fault.clientCertFaults + Fault.genericFaults:
- connection = connect()
- connection.fault = fault
- try:
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
- assert()
- except:
- pass
- connection.sock.close()
-
- print "Test 18 - good SRP, prepare to resume"
+ print("Test 18 - good SRP, prepare to resume")
sessionCache = SessionCache()
connection = connect()
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
+ assert(connection.session.serverName == address[0])
+ testConnServer(connection)
connection.close()
- connection.sock.close()
- print "Test 19 - resumption"
+ print("Test 19 - resumption")
connection = connect()
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
+ assert(connection.session.serverName == address[0])
+ testConnServer(connection)
#Don't close! -- see next test
- print "Test 20 - invalidated resumption"
+ print("Test 20 - invalidated resumption")
try:
connection.read(min=1, max=1)
assert() #Client is going to close the socket without a close_notify
- except TLSAbruptCloseError, e:
+ except TLSAbruptCloseError as e:
pass
connection = connect()
try:
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
- except TLSLocalAlert, alert:
+ except TLSLocalAlert as alert:
if alert.description != AlertDescription.bad_record_mac:
raise
- connection.sock.close()
+ connection.close()
- print "Test 21 - HTTPS test X.509"
+ print("Test 21 - HTTPS test X.509")
#Close the current listening socket
lsock.close()
#Create and run an HTTP Server using TLSSocketServerMixIn
class MyHTTPServer(TLSSocketServerMixIn,
- BaseHTTPServer.HTTPServer):
+ HTTPServer):
def handshake(self, tlsConnection):
tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
return True
cd = os.getcwd()
os.chdir(dir)
address = address[0], address[1]+1
- httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
+ httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
for x in range(6):
httpd.handle_request()
httpd.server_close()
cd = os.chdir(cd)
- if cryptoIDlibLoaded:
- print "Test 21a - HTTPS test SRP+cryptoID"
-
- #Create and run an HTTP Server using TLSSocketServerMixIn
- class MyHTTPServer(TLSSocketServerMixIn,
- BaseHTTPServer.HTTPServer):
- def handshake(self, tlsConnection):
- tlsConnection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey,
- verifierDB=verifierDB)
- return True
- cd = os.getcwd()
- os.chdir(dir)
- address = address[0], address[1]+1
- httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
- for x in range(6):
- httpd.handle_request()
- httpd.server_close()
- cd = os.chdir(cd)
-
#Re-connect the listening socket
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
address = address[0], address[1]+1
lsock.bind(address)
lsock.listen(5)
- def connect():
- return TLSConnection(lsock.accept()[0])
-
implementations = []
- if cryptlibpyLoaded:
- implementations.append("cryptlib")
if m2cryptoLoaded:
implementations.append("openssl")
if pycryptoLoaded:
implementations.append("pycrypto")
implementations.append("python")
- print "Test 22 - different ciphers"
+ print("Test 22 - different ciphers")
for implementation in ["python"] * len(implementations):
for cipher in ["aes128", "aes256", "rc4"]:
- print "Test 22:",
+ print("Test 22:", end=' ')
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
- connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
- print connection.getCipherName(), connection.getCipherImplementation()
- h = connection.read(min=5, max=5)
- assert(h == "hello")
- connection.write(h)
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings)
+ print(connection.getCipherName(), connection.getCipherImplementation())
+ testConnServer(connection)
connection.close()
- connection.sock.close()
- print "Test 23 - throughput test"
+ print("Test 23 - throughput test")
for implementation in implementations:
for cipher in ["aes128", "aes256", "3des", "rc4"]:
- if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
+ if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
continue
- print "Test 23:",
+ print("Test 23:", end=' ')
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
- connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
- print connection.getCipherName(), connection.getCipherImplementation()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings)
+ print(connection.getCipherName(), connection.getCipherImplementation())
h = connection.read(min=50000, max=50000)
- assert(h == "hello"*10000)
+ assert(h == b"hello"*10000)
connection.write(h)
connection.close()
- connection.sock.close()
-
- print "Test succeeded"
-
+ print("Test 24.a - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"http/1.1"])
+ testConnServer(connection)
+ connection.close()
+ print("Test 24.b - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 24.c - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"http/1.1", b"spdy/2"])
+ testConnServer(connection)
+ connection.close()
+ print("Test 24.d - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 24.e - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 24.f - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[b"spdy/3", b"spdy/2"])
+ testConnServer(connection)
+ connection.close()
+
+ print("Test 24.g - Next-Protocol Server Negotiation")
+ connection = connect()
+ settings = HandshakeSettings()
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
+ settings=settings, nextProtos=[])
+ testConnServer(connection)
+ connection.close()
+ print("Tests 25-27 - XMLRPXC server")
+ address = address[0], address[1]+1
+ class Server(TLSXMLRPCServer):
-
-
-
-
-
-
-if len(sys.argv) == 1 or (len(sys.argv)==2 and sys.argv[1].lower().endswith("help")):
- print ""
- print "Version: 0.3.8"
- print ""
- print "RNG: %s" % prngName
- print ""
- print "Modules:"
- if cryptlibpyLoaded:
- print " cryptlib_py : Loaded"
- else:
- print " cryptlib_py : Not Loaded"
- if m2cryptoLoaded:
- print " M2Crypto : Loaded"
- else:
- print " M2Crypto : Not Loaded"
- if pycryptoLoaded:
- print " pycrypto : Loaded"
- else:
- print " pycrypto : Not Loaded"
- if gmpyLoaded:
- print " GMPY : Loaded"
- else:
- print " GMPY : Not Loaded"
- if cryptoIDlibLoaded:
- print " cryptoIDlib : Loaded"
- else:
- print " cryptoIDlib : Not Loaded"
- print ""
- print "Commands:"
- print ""
- print " clientcert <server> [<chain> <key>]"
- print " clientsharedkey <server> <user> <pass>"
- print " clientsrp <server> <user> <pass>"
- print " clienttest <server> <dir>"
- print ""
- print " serversrp <server> <verifierDB>"
- print " servercert <server> <chain> <key> [req]"
- print " serversrpcert <server> <verifierDB> <chain> <key>"
- print " serversharedkey <server> <sharedkeyDB>"
- print " servertest <server> <dir>"
- sys.exit()
-
-cmd = sys.argv[1].lower()
-
-class Args:
- def __init__(self, argv):
- self.argv = argv
- def get(self, index):
- if len(self.argv)<=index:
- raise SyntaxError("Not enough arguments")
- return self.argv[index]
- def getLast(self, index):
- if len(self.argv)>index+1:
- raise SyntaxError("Too many arguments")
- return self.get(index)
-
-args = Args(sys.argv)
-
-def reformatDocString(s):
- lines = s.splitlines()
- newLines = []
- for line in lines:
- newLines.append(" " + line.strip())
- return "\n".join(newLines)
-
-try:
- if cmd == "clienttest":
- address = args.get(2)
- dir = args.getLast(3)
- clientTest(address, dir)
- sys.exit()
-
- elif cmd.startswith("client"):
- address = args.get(2)
-
- #Split address into hostname/port tuple
- address = address.split(":")
- if len(address)==1:
- address.append("4443")
- address = ( address[0], int(address[1]) )
-
- def connect():
- #Connect to server
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if hasattr(sock, "settimeout"):
- sock.settimeout(5)
- sock.connect(address)
-
- #Instantiate TLSConnections
- return TLSConnection(sock)
-
- try:
- if cmd == "clientsrp":
- username = args.get(3)
- password = args.getLast(4)
- connection = connect()
- start = time.clock()
- connection.handshakeClientSRP(username, password)
- elif cmd == "clientsharedkey":
- username = args.get(3)
- password = args.getLast(4)
- connection = connect()
- start = time.clock()
- connection.handshakeClientSharedKey(username, password)
- elif cmd == "clientcert":
- certChain = None
- privateKey = None
- if len(sys.argv) > 3:
- certFilename = args.get(3)
- keyFilename = args.getLast(4)
-
- s1 = open(certFilename, "rb").read()
- s2 = open(keyFilename, "rb").read()
-
- #Try to create cryptoID cert chain
- if cryptoIDlibLoaded:
- try:
- certChain = CertChain().parse(s1)
- privateKey = parsePrivateKey(s2)
- except:
- certChain = None
- privateKey = None
-
- #Try to create X.509 cert chain
- if not certChain:
- x509 = X509()
- x509.parse(s1)
- certChain = X509CertChain([x509])
- privateKey = parsePrivateKey(s2)
-
- connection = connect()
- start = time.clock()
- connection.handshakeClientCert(certChain, privateKey)
- else:
- raise SyntaxError("Unknown command")
-
- except TLSLocalAlert, a:
- if a.description == AlertDescription.bad_record_mac:
- if cmd == "clientsharedkey":
- print "Bad sharedkey password"
- else:
- raise
- elif a.description == AlertDescription.user_canceled:
- print str(a)
- else:
- raise
- sys.exit()
- except TLSRemoteAlert, a:
- if a.description == AlertDescription.unknown_srp_username:
- if cmd == "clientsrp":
- print "Unknown username"
- else:
- raise
- elif a.description == AlertDescription.bad_record_mac:
- if cmd == "clientsrp":
- print "Bad username or password"
- else:
- raise
- elif a.description == AlertDescription.handshake_failure:
- print "Unable to negotiate mutually acceptable parameters"
- else:
- raise
- sys.exit()
-
- stop = time.clock()
- print "Handshake success"
- print " Handshake time: %.4f seconds" % (stop - start)
- print " Version: %s.%s" % connection.version
- print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
- if connection.session.srpUsername:
- print " Client SRP username: %s" % connection.session.srpUsername
- if connection.session.sharedKeyUsername:
- print " Client shared key username: %s" % connection.session.sharedKeyUsername
- if connection.session.clientCertChain:
- print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
- if connection.session.serverCertChain:
- print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
- connection.close()
- connection.sock.close()
-
- elif cmd.startswith("server"):
- address = args.get(2)
-
- #Split address into hostname/port tuple
- address = address.split(":")
- if len(address)==1:
- address.append("4443")
- address = ( address[0], int(address[1]) )
-
- verifierDBFilename = None
- sharedKeyDBFilename = None
- certFilename = None
- keyFilename = None
- sharedKeyDB = None
- reqCert = False
-
- if cmd == "serversrp":
- verifierDBFilename = args.getLast(3)
- elif cmd == "servercert":
- certFilename = args.get(3)
- keyFilename = args.get(4)
- if len(sys.argv)>=6:
- req = args.getLast(5)
- if req.lower() != "req":
- raise SyntaxError()
- reqCert = True
- elif cmd == "serversrpcert":
- verifierDBFilename = args.get(3)
- certFilename = args.get(4)
- keyFilename = args.getLast(5)
- elif cmd == "serversharedkey":
- sharedKeyDBFilename = args.getLast(3)
- elif cmd == "servertest":
- address = args.get(2)
- dir = args.getLast(3)
- serverTest(address, dir)
- sys.exit()
-
- verifierDB = None
- if verifierDBFilename:
- verifierDB = VerifierDB(verifierDBFilename)
- verifierDB.open()
-
- sharedKeyDB = None
- if sharedKeyDBFilename:
- sharedKeyDB = SharedKeyDB(sharedKeyDBFilename)
- sharedKeyDB.open()
-
- certChain = None
- privateKey = None
- if certFilename:
- s1 = open(certFilename, "rb").read()
- s2 = open(keyFilename, "rb").read()
-
- #Try to create cryptoID cert chain
- if cryptoIDlibLoaded:
- try:
- certChain = CertChain().parse(s1)
- privateKey = parsePrivateKey(s2)
- except:
- certChain = None
- privateKey = None
-
- #Try to create X.509 cert chain
- if not certChain:
- x509 = X509()
- x509.parse(s1)
- certChain = X509CertChain([x509])
- privateKey = parsePrivateKey(s2)
-
-
-
- #Create handler function - performs handshake, then echos all bytes received
- def handler(sock):
- try:
- connection = TLSConnection(sock)
- settings = HandshakeSettings()
- connection.handshakeServer(sharedKeyDB=sharedKeyDB, verifierDB=verifierDB, \
- certChain=certChain, privateKey=privateKey, \
- reqCert=reqCert, settings=settings)
- print "Handshake success"
- print " Version: %s.%s" % connection.version
- print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
- if connection.session.srpUsername:
- print " Client SRP username: %s" % connection.session.srpUsername
- if connection.session.sharedKeyUsername:
- print " Client shared key username: %s" % connection.session.sharedKeyUsername
- if connection.session.clientCertChain:
- print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
- if connection.session.serverCertChain:
- print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
-
- s = ""
- while 1:
- newS = connection.read()
- if not newS:
- break
- s += newS
- if s[-1]=='\n':
- connection.write(s)
- s = ""
- except TLSLocalAlert, a:
- if a.description == AlertDescription.unknown_srp_username:
- print "Unknown SRP username"
- elif a.description == AlertDescription.bad_record_mac:
- if cmd == "serversrp" or cmd == "serversrpcert":
- print "Bad SRP password for:", connection.allegedSrpUsername
- else:
- raise
- elif a.description == AlertDescription.handshake_failure:
- print "Unable to negotiate mutually acceptable parameters"
- else:
- raise
- except TLSRemoteAlert, a:
- if a.description == AlertDescription.bad_record_mac:
- if cmd == "serversharedkey":
- print "Bad sharedkey password for:", connection.allegedSharedKeyUsername
- else:
- raise
- elif a.description == AlertDescription.user_canceled:
- print "Handshake cancelled"
- elif a.description == AlertDescription.handshake_failure:
- print "Unable to negotiate mutually acceptable parameters"
- elif a.description == AlertDescription.close_notify:
- pass
- else:
- raise
-
- #Run multi-threaded server
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(address)
- sock.listen(5)
- while 1:
- (newsock, cliAddress) = sock.accept()
- thread.start_new_thread(handler, (newsock,))
-
-
+ def handshake(self, tlsConnection):
+ try:
+ tlsConnection.handshakeServer(certChain=x509Chain,
+ privateKey=x509Key,
+ sessionCache=sessionCache)
+ tlsConnection.ignoreAbruptClose = True
+ return True
+ except TLSError as error:
+ print("Handshake failure:", str(error))
+ return False
+
+ class MyFuncs:
+ def pow(self, x, y): return pow(x, y)
+ def add(self, x, y): return x + y
+
+ server = Server(address)
+ server.register_instance(MyFuncs())
+ #sa = server.socket.getsockname()
+ #print "Serving HTTPS on", sa[0], "port", sa[1]
+ for i in range(6):
+ server.handle_request()
+
+ print("Test succeeded")
+
+
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ printUsage("Missing command")
+ elif sys.argv[1] == "client"[:len(sys.argv[1])]:
+ clientTestCmd(sys.argv[2:])
+ elif sys.argv[1] == "server"[:len(sys.argv[1])]:
+ serverTestCmd(sys.argv[2:])
else:
- print "Bad command: '%s'" % cmd
-except TLSRemoteAlert, a:
- print str(a)
- raise
-
-
-
-
-
+ printUsage("Unknown command: %s" % sys.argv[1])

Powered by Google App Engine
This is Rietveld 408576698