Index: third_party/tlslite/tests/tlstest.py |
diff --git a/third_party/tlslite/scripts/tls.py b/third_party/tlslite/tests/tlstest.py |
old mode 100644 |
new mode 100755 |
similarity index 29% |
copy from third_party/tlslite/scripts/tls.py |
copy to third_party/tlslite/tests/tlstest.py |
index fa2c663f3b06bff1a2567e240e0eb5a110f2416b..fa1b13fa65d3248748de662d75a136ecedf39cba |
--- a/third_party/tlslite/scripts/tls.py |
+++ b/third_party/tlslite/tests/tlstest.py |
@@ -1,47 +1,87 @@ |
-#! python |
- |
+#!/usr/bin/env python |
+ |
+# Authors: |
+# Trevor Perrin |
+# Kees Bos - Added tests for XML-RPC |
+# Dimitris Moraitis - Anon ciphersuites |
+# Marcelo Fernandez - Added test for NPN |
+# Martin von Loewis - python 3 port |
+ |
+# |
+# See the LICENSE file for legal information regarding use of this file. |
+from __future__ import print_function |
import sys |
import os |
import os.path |
import socket |
-import thread |
import time |
-import httplib |
-import BaseHTTPServer |
-import SimpleHTTPServer |
- |
- |
+import getopt |
try: |
- from cryptoIDlib.api import * |
- cryptoIDlibLoaded = True |
-except: |
- cryptoIDlibLoaded = False |
- |
-if __name__ != "__main__": |
- raise "This must be run as a command, not used as a module!" |
- |
-#import tlslite |
-#from tlslite.constants import AlertDescription, Fault |
- |
-#from tlslite.utils.jython_compat import formatExceptionTrace |
-#from tlslite.X509 import X509, X509CertChain |
- |
-from tlslite.api import * |
- |
-def parsePrivateKey(s): |
- try: |
- return parsePEMKey(s, private=True) |
- except Exception, e: |
- print e |
- return parseXMLKey(s, private=True) |
+ from BaseHTTPServer import HTTPServer |
+ from SimpleHTTPServer import SimpleHTTPRequestHandler |
+except ImportError: |
+ from http.server import HTTPServer, SimpleHTTPRequestHandler |
+ |
+from tlslite import TLSConnection, Fault, HandshakeSettings, \ |
+ X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \ |
+ parsePEMKey, constants, \ |
+ AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \ |
+ POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \ |
+ Checker, __version__ |
+ |
+from tlslite.errors import * |
+from tlslite.utils.cryptomath import prngName |
+try: |
+ import xmlrpclib |
+except ImportError: |
+ # Python 3 |
+ from xmlrpc import client as xmlrpclib |
+from tlslite import * |
+try: |
+ from tack.structures.Tack import Tack |
+ |
+except ImportError: |
+ pass |
-def clientTest(address, dir): |
+def printUsage(s=None): |
+ if m2cryptoLoaded: |
+ crypto = "M2Crypto/OpenSSL" |
+ else: |
+ crypto = "Python crypto" |
+ if s: |
+ print("ERROR: %s" % s) |
+ print("""\ntls.py version %s (using %s) |
+ |
+Commands: |
+ server HOST:PORT DIRECTORY |
+ |
+ client HOST:PORT DIRECTORY |
+""" % (__version__, crypto)) |
+ sys.exit(-1) |
+ |
+ |
+def testConnClient(conn): |
+ b1 = os.urandom(1) |
+ b10 = os.urandom(10) |
+ b100 = os.urandom(100) |
+ b1000 = os.urandom(1000) |
+ conn.write(b1) |
+ conn.write(b10) |
+ conn.write(b100) |
+ conn.write(b1000) |
+ assert(conn.read(min=1, max=1) == b1) |
+ assert(conn.read(min=10, max=10) == b10) |
+ assert(conn.read(min=100, max=100) == b100) |
+ assert(conn.read(min=1000, max=1000) == b1000) |
+ |
+def clientTestCmd(argv): |
+ |
+ address = argv[0] |
+ dir = argv[1] |
#Split address into hostname/port tuple |
address = address.split(":") |
- if len(address)==1: |
- address.append("4443") |
address = ( address[0], int(address[1]) ) |
def connect(): |
@@ -56,142 +96,113 @@ def clientTest(address, dir): |
badFault = False |
- print "Test 1 - good shared key" |
+ print("Test 0 - anonymous handshake") |
+ connection = connect() |
+ connection.handshakeClientAnonymous() |
+ testConnClient(connection) |
+ connection.close() |
+ |
+ print("Test 1 - good X509 (plus SNI)") |
+ connection = connect() |
+ connection.handshakeClientCert(serverName=address[0]) |
+ testConnClient(connection) |
+ assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
+ assert(connection.session.serverName == address[0]) |
+ connection.close() |
+ |
+ print("Test 1.a - good X509, SSLv3") |
+ connection = connect() |
+ settings = HandshakeSettings() |
+ settings.minVersion = (3,0) |
+ settings.maxVersion = (3,0) |
+ connection.handshakeClientCert(settings=settings) |
+ testConnClient(connection) |
+ assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
+ connection.close() |
+ |
+ print("Test 1.b - good X509, RC4-MD5") |
connection = connect() |
- connection.handshakeClientSharedKey("shared", "key") |
+ settings = HandshakeSettings() |
+ settings.macNames = ["md5"] |
+ connection.handshakeClientCert(settings=settings) |
+ testConnClient(connection) |
+ assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
+ assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5) |
connection.close() |
- connection.sock.close() |
- print "Test 2 - shared key faults" |
- for fault in Fault.clientSharedKeyFaults + Fault.genericFaults: |
+ if tackpyLoaded: |
+ |
+ settings = HandshakeSettings() |
+ settings.useExperimentalTackExtension = True |
+ |
+ print("Test 2.a - good X.509, TACK") |
+ connection = connect() |
+ connection.handshakeClientCert(settings=settings) |
+ assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3") |
+ assert(connection.session.tackExt.activation_flags == 1) |
+ testConnClient(connection) |
+ connection.close() |
+ |
+ print("Test 2.b - good X.509, TACK unrelated to cert chain") |
connection = connect() |
- connection.fault = fault |
try: |
- connection.handshakeClientSharedKey("shared", "key") |
- print " Good Fault %s" % (Fault.faultNames[fault]) |
- except TLSFaultError, e: |
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
- badFault = True |
- connection.sock.close() |
+ connection.handshakeClientCert(settings=settings) |
+ assert(False) |
+ except TLSLocalAlert as alert: |
+ if alert.description != AlertDescription.illegal_parameter: |
+ raise |
+ connection.close() |
- print "Test 3 - good SRP" |
+ print("Test 3 - good SRP") |
connection = connect() |
connection.handshakeClientSRP("test", "password") |
+ testConnClient(connection) |
connection.close() |
- print "Test 4 - SRP faults" |
+ print("Test 4 - SRP faults") |
for fault in Fault.clientSrpFaults + Fault.genericFaults: |
connection = connect() |
connection.fault = fault |
try: |
connection.handshakeClientSRP("test", "password") |
- print " Good Fault %s" % (Fault.faultNames[fault]) |
- except TLSFaultError, e: |
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
+ print(" Good Fault %s" % (Fault.faultNames[fault])) |
+ except TLSFaultError as e: |
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) |
badFault = True |
- connection.sock.close() |
- |
- print "Test 5 - good SRP: unknown_srp_username idiom" |
- def srpCallback(): |
- return ("test", "password") |
- connection = connect() |
- connection.handshakeClientUnknown(srpCallback=srpCallback) |
- connection.close() |
- connection.sock.close() |
- print "Test 6 - good SRP: with X.509 certificate" |
+ print("Test 6 - good SRP: with X.509 certificate, TLSv1.0") |
+ settings = HandshakeSettings() |
+ settings.minVersion = (3,1) |
+ settings.maxVersion = (3,1) |
connection = connect() |
- connection.handshakeClientSRP("test", "password") |
+ connection.handshakeClientSRP("test", "password", settings=settings) |
assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
+ testConnClient(connection) |
connection.close() |
- connection.sock.close() |
- print "Test 7 - X.509 with SRP faults" |
+ print("Test 7 - X.509 with SRP faults") |
for fault in Fault.clientSrpFaults + Fault.genericFaults: |
connection = connect() |
connection.fault = fault |
try: |
connection.handshakeClientSRP("test", "password") |
- print " Good Fault %s" % (Fault.faultNames[fault]) |
- except TLSFaultError, e: |
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
+ print(" Good Fault %s" % (Fault.faultNames[fault])) |
+ except TLSFaultError as e: |
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) |
badFault = True |
- connection.sock.close() |
- |
- if cryptoIDlibLoaded: |
- print "Test 8 - good SRP: with cryptoID certificate chain" |
- connection = connect() |
- connection.handshakeClientSRP("test", "password") |
- assert(isinstance(connection.session.serverCertChain, CertChain)) |
- if not (connection.session.serverCertChain.validate()): |
- print connection.session.serverCertChain.validate(listProblems=True) |
- |
- connection.close() |
- connection.sock.close() |
- |
- print "Test 9 - CryptoID with SRP faults" |
- for fault in Fault.clientSrpFaults + Fault.genericFaults: |
- connection = connect() |
- connection.fault = fault |
- try: |
- connection.handshakeClientSRP("test", "password") |
- print " Good Fault %s" % (Fault.faultNames[fault]) |
- except TLSFaultError, e: |
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
- badFault = True |
- connection.sock.close() |
- |
- print "Test 10 - good X509" |
- connection = connect() |
- connection.handshakeClientCert() |
- assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
- connection.close() |
- connection.sock.close() |
- |
- print "Test 10.a - good X509, SSLv3" |
- connection = connect() |
- settings = HandshakeSettings() |
- settings.minVersion = (3,0) |
- settings.maxVersion = (3,0) |
- connection.handshakeClientCert(settings=settings) |
- assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
- connection.close() |
- connection.sock.close() |
- print "Test 11 - X.509 faults" |
+ print("Test 11 - X.509 faults") |
for fault in Fault.clientNoAuthFaults + Fault.genericFaults: |
connection = connect() |
connection.fault = fault |
try: |
connection.handshakeClientCert() |
- print " Good Fault %s" % (Fault.faultNames[fault]) |
- except TLSFaultError, e: |
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
+ print(" Good Fault %s" % (Fault.faultNames[fault])) |
+ except TLSFaultError as e: |
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) |
badFault = True |
- connection.sock.close() |
- if cryptoIDlibLoaded: |
- print "Test 12 - good cryptoID" |
- connection = connect() |
- connection.handshakeClientCert() |
- assert(isinstance(connection.session.serverCertChain, CertChain)) |
- assert(connection.session.serverCertChain.validate()) |
- connection.close() |
- connection.sock.close() |
- |
- print "Test 13 - cryptoID faults" |
- for fault in Fault.clientNoAuthFaults + Fault.genericFaults: |
- connection = connect() |
- connection.fault = fault |
- try: |
- connection.handshakeClientCert() |
- print " Good Fault %s" % (Fault.faultNames[fault]) |
- except TLSFaultError, e: |
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
- badFault = True |
- connection.sock.close() |
- |
- print "Test 14 - good mutual X509" |
+ print("Test 14 - good mutual X509") |
x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read()) |
x509Chain = X509CertChain([x509Cert]) |
s = open(os.path.join(dir, "clientX509Key.pem")).read() |
@@ -199,80 +210,58 @@ def clientTest(address, dir): |
connection = connect() |
connection.handshakeClientCert(x509Chain, x509Key) |
+ testConnClient(connection) |
assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
connection.close() |
- connection.sock.close() |
- print "Test 14.a - good mutual X509, SSLv3" |
+ print("Test 14.a - good mutual X509, SSLv3") |
connection = connect() |
settings = HandshakeSettings() |
settings.minVersion = (3,0) |
settings.maxVersion = (3,0) |
connection.handshakeClientCert(x509Chain, x509Key, settings=settings) |
+ testConnClient(connection) |
assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
connection.close() |
- connection.sock.close() |
- print "Test 15 - mutual X.509 faults" |
+ print("Test 15 - mutual X.509 faults") |
for fault in Fault.clientCertFaults + Fault.genericFaults: |
connection = connect() |
connection.fault = fault |
try: |
connection.handshakeClientCert(x509Chain, x509Key) |
- print " Good Fault %s" % (Fault.faultNames[fault]) |
- except TLSFaultError, e: |
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
+ print(" Good Fault %s" % (Fault.faultNames[fault])) |
+ except TLSFaultError as e: |
+ print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) |
badFault = True |
- connection.sock.close() |
- |
- if cryptoIDlibLoaded: |
- print "Test 16 - good mutual cryptoID" |
- cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read()) |
- cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True) |
- |
- connection = connect() |
- connection.handshakeClientCert(cryptoIDChain, cryptoIDKey) |
- assert(isinstance(connection.session.serverCertChain, CertChain)) |
- assert(connection.session.serverCertChain.validate()) |
- connection.close() |
- connection.sock.close() |
- print "Test 17 - mutual cryptoID faults" |
- for fault in Fault.clientCertFaults + Fault.genericFaults: |
- connection = connect() |
- connection.fault = fault |
- try: |
- connection.handshakeClientCert(cryptoIDChain, cryptoIDKey) |
- print " Good Fault %s" % (Fault.faultNames[fault]) |
- except TLSFaultError, e: |
- print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)) |
- badFault = True |
- connection.sock.close() |
- |
- print "Test 18 - good SRP, prepare to resume..." |
+ print("Test 18 - good SRP, prepare to resume... (plus SNI)") |
connection = connect() |
- connection.handshakeClientSRP("test", "password") |
+ connection.handshakeClientSRP("test", "password", serverName=address[0]) |
+ testConnClient(connection) |
connection.close() |
- connection.sock.close() |
session = connection.session |
- print "Test 19 - resumption" |
+ print("Test 19 - resumption (plus SNI)") |
connection = connect() |
- connection.handshakeClientSRP("test", "garbage", session=session) |
+ connection.handshakeClientSRP("test", "garbage", serverName=address[0], |
+ session=session) |
+ testConnClient(connection) |
#Don't close! -- see below |
- print "Test 20 - invalidated resumption" |
+ print("Test 20 - invalidated resumption (plus SNI)") |
connection.sock.close() #Close the socket without a close_notify! |
connection = connect() |
try: |
- connection.handshakeClientSRP("test", "garbage", session=session) |
- assert() |
- except TLSRemoteAlert, alert: |
+ connection.handshakeClientSRP("test", "garbage", |
+ serverName=address[0], session=session) |
+ assert(False) |
+ except TLSRemoteAlert as alert: |
if alert.description != AlertDescription.bad_record_mac: |
raise |
- connection.sock.close() |
- |
- print "Test 21 - HTTPS test X.509" |
+ connection.close() |
+ |
+ print("Test 21 - HTTPS test X.509") |
address = address[0], address[1]+1 |
if hasattr(socket, "timeout"): |
timeoutEx = socket.timeout |
@@ -281,136 +270,183 @@ def clientTest(address, dir): |
while 1: |
try: |
time.sleep(2) |
- htmlBody = open(os.path.join(dir, "index.html")).read() |
+ htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "utf-8") |
fingerprint = None |
for y in range(2): |
+ checker =Checker(x509Fingerprint=fingerprint) |
h = HTTPTLSConnection(\ |
- address[0], address[1], x509Fingerprint=fingerprint) |
+ address[0], address[1], checker=checker) |
for x in range(3): |
h.request("GET", "/index.html") |
r = h.getresponse() |
assert(r.status == 200) |
- s = r.read() |
- assert(s == htmlBody) |
+ b = bytearray(r.read()) |
+ assert(b == htmlBody) |
fingerprint = h.tlsSession.serverCertChain.getFingerprint() |
assert(fingerprint) |
time.sleep(2) |
break |
except timeoutEx: |
- print "timeout, retrying..." |
+ print("timeout, retrying...") |
pass |
- if cryptoIDlibLoaded: |
- print "Test 21a - HTTPS test SRP+cryptoID" |
- address = address[0], address[1]+1 |
- if hasattr(socket, "timeout"): |
- timeoutEx = socket.timeout |
- else: |
- timeoutEx = socket.error |
- while 1: |
- try: |
- time.sleep(2) #Time to generate key and cryptoID |
- htmlBody = open(os.path.join(dir, "index.html")).read() |
- fingerprint = None |
- protocol = None |
- for y in range(2): |
- h = HTTPTLSConnection(\ |
- address[0], address[1], |
- username="test", password="password", |
- cryptoID=fingerprint, protocol=protocol) |
- for x in range(3): |
- h.request("GET", "/index.html") |
- r = h.getresponse() |
- assert(r.status == 200) |
- s = r.read() |
- assert(s == htmlBody) |
- fingerprint = h.tlsSession.serverCertChain.cryptoID |
- assert(fingerprint) |
- protocol = "urn:whatever" |
- time.sleep(2) |
- break |
- except timeoutEx: |
- print "timeout, retrying..." |
- pass |
- |
address = address[0], address[1]+1 |
implementations = [] |
- if cryptlibpyLoaded: |
- implementations.append("cryptlib") |
if m2cryptoLoaded: |
implementations.append("openssl") |
if pycryptoLoaded: |
implementations.append("pycrypto") |
implementations.append("python") |
- print "Test 22 - different ciphers" |
+ print("Test 22 - different ciphers, TLSv1.0") |
for implementation in implementations: |
for cipher in ["aes128", "aes256", "rc4"]: |
- print "Test 22:", |
+ print("Test 22:", end=' ') |
connection = connect() |
settings = HandshakeSettings() |
settings.cipherNames = [cipher] |
settings.cipherImplementations = [implementation, "python"] |
- connection.handshakeClientSharedKey("shared", "key", settings=settings) |
- print ("%s %s" % (connection.getCipherName(), connection.getCipherImplementation())) |
- |
- connection.write("hello") |
- h = connection.read(min=5, max=5) |
- assert(h == "hello") |
+ settings.minVersion = (3,1) |
+ settings.maxVersion = (3,1) |
+ connection.handshakeClientCert(settings=settings) |
+ testConnClient(connection) |
+ print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation())) |
connection.close() |
- connection.sock.close() |
- print "Test 23 - throughput test" |
+ print("Test 23 - throughput test") |
for implementation in implementations: |
for cipher in ["aes128", "aes256", "3des", "rc4"]: |
- if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"): |
+ if cipher == "3des" and implementation not in ("openssl", "pycrypto"): |
continue |
- print "Test 23:", |
+ print("Test 23:", end=' ') |
connection = connect() |
settings = HandshakeSettings() |
settings.cipherNames = [cipher] |
settings.cipherImplementations = [implementation, "python"] |
- connection.handshakeClientSharedKey("shared", "key", settings=settings) |
- print ("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation())), |
+ connection.handshakeClientCert(settings=settings) |
+ print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ') |
startTime = time.clock() |
- connection.write("hello"*10000) |
+ connection.write(b"hello"*10000) |
h = connection.read(min=50000, max=50000) |
stopTime = time.clock() |
- print "100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime)) |
+ if stopTime-startTime: |
+ print("100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime))) |
+ else: |
+ print("100K exchanged very fast") |
- assert(h == "hello"*10000) |
+ assert(h == b"hello"*10000) |
connection.close() |
- connection.sock.close() |
+ |
+ print("Test 24.a - Next-Protocol Client Negotiation") |
+ connection = connect() |
+ connection.handshakeClientCert(nextProtos=[b"http/1.1"]) |
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
+ assert(connection.next_proto == b'http/1.1') |
+ connection.close() |
+ |
+ print("Test 24.b - Next-Protocol Client Negotiation") |
+ connection = connect() |
+ connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"]) |
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
+ assert(connection.next_proto == b'spdy/2') |
+ connection.close() |
+ |
+ print("Test 24.c - Next-Protocol Client Negotiation") |
+ connection = connect() |
+ connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"]) |
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
+ assert(connection.next_proto == b'spdy/2') |
+ connection.close() |
+ |
+ print("Test 24.d - Next-Protocol Client Negotiation") |
+ connection = connect() |
+ connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"]) |
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
+ assert(connection.next_proto == b'spdy/2') |
+ connection.close() |
+ |
+ print("Test 24.e - Next-Protocol Client Negotiation") |
+ connection = connect() |
+ connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"]) |
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
+ assert(connection.next_proto == b'spdy/3') |
+ connection.close() |
- print "Test 24 - Internet servers test" |
+ print("Test 24.f - Next-Protocol Client Negotiation") |
+ connection = connect() |
+ connection.handshakeClientCert(nextProtos=[b"http/1.1"]) |
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
+ assert(connection.next_proto == b'http/1.1') |
+ connection.close() |
+ |
+ print("Test 24.g - Next-Protocol Client Negotiation") |
+ connection = connect() |
+ connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"]) |
+ #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
+ assert(connection.next_proto == b'spdy/2') |
+ connection.close() |
+ |
+ print('Test 25 - good standard XMLRPC https client') |
+ time.sleep(2) # Hack for lack of ability to set timeout here |
+ address = address[0], address[1]+1 |
+ server = xmlrpclib.Server('https://%s:%s' % address) |
+ assert server.add(1,2) == 3 |
+ assert server.pow(2,4) == 16 |
+ |
+ print('Test 26 - good tlslite XMLRPC client') |
+ transport = XMLRPCTransport(ignoreAbruptClose=True) |
+ server = xmlrpclib.Server('https://%s:%s' % address, transport) |
+ assert server.add(1,2) == 3 |
+ assert server.pow(2,4) == 16 |
+ |
+ print('Test 27 - good XMLRPC ignored protocol') |
+ server = xmlrpclib.Server('http://%s:%s' % address, transport) |
+ assert server.add(1,2) == 3 |
+ assert server.pow(2,4) == 16 |
+ |
+ print("Test 28 - Internet servers test") |
try: |
i = IMAP4_TLS("cyrus.andrew.cmu.edu") |
i.login("anonymous", "anonymous@anonymous.net") |
i.logout() |
- print "Test 24: IMAP4 good" |
+ print("Test 28: IMAP4 good") |
p = POP3_TLS("pop.gmail.com") |
p.quit() |
- print "Test 24: POP3 good" |
- except socket.error, e: |
- print "Non-critical error: socket error trying to reach internet server: ", e |
+ print("Test 29: POP3 good") |
+ except socket.error as e: |
+ print("Non-critical error: socket error trying to reach internet server: ", e) |
if not badFault: |
- print "Test succeeded" |
+ print("Test succeeded") |
else: |
- print "Test failed" |
+ print("Test failed") |
+ |
+ |
+def testConnServer(connection): |
+ count = 0 |
+ while 1: |
+ s = connection.read() |
+ count += len(s) |
+ if len(s) == 0: |
+ break |
+ connection.write(s) |
+ if count == 1111: |
+ break |
+ |
+def serverTestCmd(argv): |
-def serverTest(address, dir): |
+ address = argv[0] |
+ dir = argv[1] |
+ |
#Split address into hostname/port tuple |
address = address.split(":") |
- if len(address)==1: |
- address.append("4443") |
address = ( address[0], int(address[1]) ) |
#Connect to server |
@@ -421,29 +457,67 @@ def serverTest(address, dir): |
def connect(): |
return TLSConnection(lsock.accept()[0]) |
- print "Test 1 - good shared key" |
- sharedKeyDB = SharedKeyDB() |
- sharedKeyDB["shared"] = "key" |
- sharedKeyDB["shared2"] = "key2" |
+ x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read()) |
+ x509Chain = X509CertChain([x509Cert]) |
+ s = open(os.path.join(dir, "serverX509Key.pem")).read() |
+ x509Key = parsePEMKey(s, private=True) |
+ |
+ print("Test 0 - Anonymous server handshake") |
+ connection = connect() |
+ connection.handshakeServer(anon=True) |
+ testConnServer(connection) |
+ connection.close() |
+ |
+ print("Test 1 - good X.509") |
connection = connect() |
- connection.handshakeServer(sharedKeyDB=sharedKeyDB) |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) |
+ assert(connection.session.serverName == address[0]) |
+ testConnServer(connection) |
connection.close() |
- connection.sock.close() |
- print "Test 2 - shared key faults" |
- for fault in Fault.clientSharedKeyFaults + Fault.genericFaults: |
+ print("Test 1.a - good X.509, SSL v3") |
+ connection = connect() |
+ settings = HandshakeSettings() |
+ settings.minVersion = (3,0) |
+ settings.maxVersion = (3,0) |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) |
+ testConnServer(connection) |
+ connection.close() |
+ |
+ print("Test 1.b - good X.509, RC4-MD5") |
+ connection = connect() |
+ settings = HandshakeSettings() |
+ settings.macNames = ["sha", "md5"] |
+ settings.cipherNames = ["rc4"] |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) |
+ testConnServer(connection) |
+ connection.close() |
+ |
+ if tackpyLoaded: |
+ tack = Tack.createFromPem(open("./TACK1.pem", "rU").read()) |
+ tackUnrelated = Tack.createFromPem(open("./TACKunrelated.pem", "rU").read()) |
+ |
+ settings = HandshakeSettings() |
+ settings.useExperimentalTackExtension = True |
+ |
+ print("Test 2.a - good X.509, TACK") |
connection = connect() |
- connection.fault = fault |
- try: |
- connection.handshakeServer(sharedKeyDB=sharedKeyDB) |
- assert() |
- except: |
- pass |
- connection.sock.close() |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ tacks=[tack], activationFlags=1, settings=settings) |
+ testConnServer(connection) |
+ connection.close() |
- print "Test 3 - good SRP" |
- #verifierDB = tlslite.VerifierDB(os.path.join(dir, "verifierDB")) |
- #verifierDB.open() |
+ print("Test 2.b - good X.509, TACK unrelated to cert chain") |
+ connection = connect() |
+ try: |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ tacks=[tackUnrelated], settings=settings) |
+ assert(False) |
+ except TLSRemoteAlert as alert: |
+ if alert.description != AlertDescription.illegal_parameter: |
+ raise |
+ |
+ print("Test 3 - good SRP") |
verifierDB = VerifierDB() |
verifierDB.create() |
entry = VerifierDB.makeVerifier("test", "password", 1536) |
@@ -451,10 +525,10 @@ def serverTest(address, dir): |
connection = connect() |
connection.handshakeServer(verifierDB=verifierDB) |
+ testConnServer(connection) |
connection.close() |
- connection.sock.close() |
- print "Test 4 - SRP faults" |
+ print("Test 4 - SRP faults") |
for fault in Fault.clientSrpFaults + Fault.genericFaults: |
connection = connect() |
connection.fault = fault |
@@ -463,27 +537,16 @@ def serverTest(address, dir): |
assert() |
except: |
pass |
- connection.sock.close() |
- |
- print "Test 5 - good SRP: unknown_srp_username idiom" |
- connection = connect() |
- connection.handshakeServer(verifierDB=verifierDB) |
- connection.close() |
- connection.sock.close() |
- |
- print "Test 6 - good SRP: with X.509 cert" |
- x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read()) |
- x509Chain = X509CertChain([x509Cert]) |
- s = open(os.path.join(dir, "serverX509Key.pem")).read() |
- x509Key = parsePEMKey(s, private=True) |
+ connection.close() |
+ print("Test 6 - good SRP: with X.509 cert") |
connection = connect() |
connection.handshakeServer(verifierDB=verifierDB, \ |
certChain=x509Chain, privateKey=x509Key) |
+ testConnServer(connection) |
connection.close() |
- connection.sock.close() |
- print "Test 7 - X.509 with SRP faults" |
+ print("Test 7 - X.509 with SRP faults") |
for fault in Fault.clientSrpFaults + Fault.genericFaults: |
connection = connect() |
connection.fault = fault |
@@ -493,46 +556,9 @@ def serverTest(address, dir): |
assert() |
except: |
pass |
- connection.sock.close() |
- |
- if cryptoIDlibLoaded: |
- print "Test 8 - good SRP: with cryptoID certs" |
- cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read()) |
- cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True) |
- connection = connect() |
- connection.handshakeServer(verifierDB=verifierDB, \ |
- certChain=cryptoIDChain, privateKey=cryptoIDKey) |
connection.close() |
- connection.sock.close() |
- print "Test 9 - cryptoID with SRP faults" |
- for fault in Fault.clientSrpFaults + Fault.genericFaults: |
- connection = connect() |
- connection.fault = fault |
- try: |
- connection.handshakeServer(verifierDB=verifierDB, \ |
- certChain=cryptoIDChain, privateKey=cryptoIDKey) |
- assert() |
- except: |
- pass |
- connection.sock.close() |
- |
- print "Test 10 - good X.509" |
- connection = connect() |
- connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) |
- connection.close() |
- connection.sock.close() |
- |
- print "Test 10.a - good X.509, SSL v3" |
- connection = connect() |
- settings = HandshakeSettings() |
- settings.minVersion = (3,0) |
- settings.maxVersion = (3,0) |
- connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) |
- connection.close() |
- connection.sock.close() |
- |
- print "Test 11 - X.509 faults" |
+ print("Test 11 - X.509 faults") |
for fault in Fault.clientNoAuthFaults + Fault.genericFaults: |
connection = connect() |
connection.fault = fault |
@@ -541,44 +567,26 @@ def serverTest(address, dir): |
assert() |
except: |
pass |
- connection.sock.close() |
- |
- if cryptoIDlibLoaded: |
- print "Test 12 - good cryptoID" |
- connection = connect() |
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey) |
connection.close() |
- connection.sock.close() |
- print "Test 13 - cryptoID faults" |
- for fault in Fault.clientNoAuthFaults + Fault.genericFaults: |
- connection = connect() |
- connection.fault = fault |
- try: |
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey) |
- assert() |
- except: |
- pass |
- connection.sock.close() |
- |
- print "Test 14 - good mutual X.509" |
+ print("Test 14 - good mutual X.509") |
connection = connect() |
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True) |
+ testConnServer(connection) |
assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
connection.close() |
- connection.sock.close() |
- print "Test 14a - good mutual X.509, SSLv3" |
+ print("Test 14a - good mutual X.509, SSLv3") |
connection = connect() |
settings = HandshakeSettings() |
settings.minVersion = (3,0) |
settings.maxVersion = (3,0) |
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings) |
+ testConnServer(connection) |
assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
connection.close() |
- connection.sock.close() |
- print "Test 15 - mutual X.509 faults" |
+ print("Test 15 - mutual X.509 faults") |
for fault in Fault.clientCertFaults + Fault.genericFaults: |
connection = connect() |
connection.fault = fault |
@@ -587,488 +595,199 @@ def serverTest(address, dir): |
assert() |
except: |
pass |
- connection.sock.close() |
- |
- if cryptoIDlibLoaded: |
- print "Test 16 - good mutual cryptoID" |
- connection = connect() |
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True) |
- assert(isinstance(connection.session.serverCertChain, CertChain)) |
- assert(connection.session.serverCertChain.validate()) |
connection.close() |
- connection.sock.close() |
- print "Test 17 - mutual cryptoID faults" |
- for fault in Fault.clientCertFaults + Fault.genericFaults: |
- connection = connect() |
- connection.fault = fault |
- try: |
- connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True) |
- assert() |
- except: |
- pass |
- connection.sock.close() |
- |
- print "Test 18 - good SRP, prepare to resume" |
+ print("Test 18 - good SRP, prepare to resume") |
sessionCache = SessionCache() |
connection = connect() |
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) |
+ assert(connection.session.serverName == address[0]) |
+ testConnServer(connection) |
connection.close() |
- connection.sock.close() |
- print "Test 19 - resumption" |
+ print("Test 19 - resumption") |
connection = connect() |
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) |
+ assert(connection.session.serverName == address[0]) |
+ testConnServer(connection) |
#Don't close! -- see next test |
- print "Test 20 - invalidated resumption" |
+ print("Test 20 - invalidated resumption") |
try: |
connection.read(min=1, max=1) |
assert() #Client is going to close the socket without a close_notify |
- except TLSAbruptCloseError, e: |
+ except TLSAbruptCloseError as e: |
pass |
connection = connect() |
try: |
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) |
- except TLSLocalAlert, alert: |
+ except TLSLocalAlert as alert: |
if alert.description != AlertDescription.bad_record_mac: |
raise |
- connection.sock.close() |
+ connection.close() |
- print "Test 21 - HTTPS test X.509" |
+ print("Test 21 - HTTPS test X.509") |
#Close the current listening socket |
lsock.close() |
#Create and run an HTTP Server using TLSSocketServerMixIn |
class MyHTTPServer(TLSSocketServerMixIn, |
- BaseHTTPServer.HTTPServer): |
+ HTTPServer): |
def handshake(self, tlsConnection): |
tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key) |
return True |
cd = os.getcwd() |
os.chdir(dir) |
address = address[0], address[1]+1 |
- httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler) |
+ httpd = MyHTTPServer(address, SimpleHTTPRequestHandler) |
for x in range(6): |
httpd.handle_request() |
httpd.server_close() |
cd = os.chdir(cd) |
- if cryptoIDlibLoaded: |
- print "Test 21a - HTTPS test SRP+cryptoID" |
- |
- #Create and run an HTTP Server using TLSSocketServerMixIn |
- class MyHTTPServer(TLSSocketServerMixIn, |
- BaseHTTPServer.HTTPServer): |
- def handshake(self, tlsConnection): |
- tlsConnection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, |
- verifierDB=verifierDB) |
- return True |
- cd = os.getcwd() |
- os.chdir(dir) |
- address = address[0], address[1]+1 |
- httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler) |
- for x in range(6): |
- httpd.handle_request() |
- httpd.server_close() |
- cd = os.chdir(cd) |
- |
#Re-connect the listening socket |
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
address = address[0], address[1]+1 |
lsock.bind(address) |
lsock.listen(5) |
- def connect(): |
- return TLSConnection(lsock.accept()[0]) |
- |
implementations = [] |
- if cryptlibpyLoaded: |
- implementations.append("cryptlib") |
if m2cryptoLoaded: |
implementations.append("openssl") |
if pycryptoLoaded: |
implementations.append("pycrypto") |
implementations.append("python") |
- print "Test 22 - different ciphers" |
+ print("Test 22 - different ciphers") |
for implementation in ["python"] * len(implementations): |
for cipher in ["aes128", "aes256", "rc4"]: |
- print "Test 22:", |
+ print("Test 22:", end=' ') |
connection = connect() |
settings = HandshakeSettings() |
settings.cipherNames = [cipher] |
settings.cipherImplementations = [implementation, "python"] |
- connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings) |
- print connection.getCipherName(), connection.getCipherImplementation() |
- h = connection.read(min=5, max=5) |
- assert(h == "hello") |
- connection.write(h) |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ settings=settings) |
+ print(connection.getCipherName(), connection.getCipherImplementation()) |
+ testConnServer(connection) |
connection.close() |
- connection.sock.close() |
- print "Test 23 - throughput test" |
+ print("Test 23 - throughput test") |
for implementation in implementations: |
for cipher in ["aes128", "aes256", "3des", "rc4"]: |
- if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"): |
+ if cipher == "3des" and implementation not in ("openssl", "pycrypto"): |
continue |
- print "Test 23:", |
+ print("Test 23:", end=' ') |
connection = connect() |
settings = HandshakeSettings() |
settings.cipherNames = [cipher] |
settings.cipherImplementations = [implementation, "python"] |
- connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings) |
- print connection.getCipherName(), connection.getCipherImplementation() |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ settings=settings) |
+ print(connection.getCipherName(), connection.getCipherImplementation()) |
h = connection.read(min=50000, max=50000) |
- assert(h == "hello"*10000) |
+ assert(h == b"hello"*10000) |
connection.write(h) |
connection.close() |
- connection.sock.close() |
- |
- print "Test succeeded" |
- |
+ print("Test 24.a - Next-Protocol Server Negotiation") |
+ connection = connect() |
+ settings = HandshakeSettings() |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ settings=settings, nextProtos=[b"http/1.1"]) |
+ testConnServer(connection) |
+ connection.close() |
+ print("Test 24.b - Next-Protocol Server Negotiation") |
+ connection = connect() |
+ settings = HandshakeSettings() |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ settings=settings, nextProtos=[b"spdy/2", b"http/1.1"]) |
+ testConnServer(connection) |
+ connection.close() |
+ |
+ print("Test 24.c - Next-Protocol Server Negotiation") |
+ connection = connect() |
+ settings = HandshakeSettings() |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ settings=settings, nextProtos=[b"http/1.1", b"spdy/2"]) |
+ testConnServer(connection) |
+ connection.close() |
+ print("Test 24.d - Next-Protocol Server Negotiation") |
+ connection = connect() |
+ settings = HandshakeSettings() |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ settings=settings, nextProtos=[b"spdy/2", b"http/1.1"]) |
+ testConnServer(connection) |
+ connection.close() |
+ |
+ print("Test 24.e - Next-Protocol Server Negotiation") |
+ connection = connect() |
+ settings = HandshakeSettings() |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"]) |
+ testConnServer(connection) |
+ connection.close() |
+ |
+ print("Test 24.f - Next-Protocol Server Negotiation") |
+ connection = connect() |
+ settings = HandshakeSettings() |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ settings=settings, nextProtos=[b"spdy/3", b"spdy/2"]) |
+ testConnServer(connection) |
+ connection.close() |
+ |
+ print("Test 24.g - Next-Protocol Server Negotiation") |
+ connection = connect() |
+ settings = HandshakeSettings() |
+ connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
+ settings=settings, nextProtos=[]) |
+ testConnServer(connection) |
+ connection.close() |
+ print("Tests 25-27 - XMLRPXC server") |
+ address = address[0], address[1]+1 |
+ class Server(TLSXMLRPCServer): |
- |
- |
- |
- |
- |
- |
-if len(sys.argv) == 1 or (len(sys.argv)==2 and sys.argv[1].lower().endswith("help")): |
- print "" |
- print "Version: 0.3.8" |
- print "" |
- print "RNG: %s" % prngName |
- print "" |
- print "Modules:" |
- if cryptlibpyLoaded: |
- print " cryptlib_py : Loaded" |
- else: |
- print " cryptlib_py : Not Loaded" |
- if m2cryptoLoaded: |
- print " M2Crypto : Loaded" |
- else: |
- print " M2Crypto : Not Loaded" |
- if pycryptoLoaded: |
- print " pycrypto : Loaded" |
- else: |
- print " pycrypto : Not Loaded" |
- if gmpyLoaded: |
- print " GMPY : Loaded" |
- else: |
- print " GMPY : Not Loaded" |
- if cryptoIDlibLoaded: |
- print " cryptoIDlib : Loaded" |
- else: |
- print " cryptoIDlib : Not Loaded" |
- print "" |
- print "Commands:" |
- print "" |
- print " clientcert <server> [<chain> <key>]" |
- print " clientsharedkey <server> <user> <pass>" |
- print " clientsrp <server> <user> <pass>" |
- print " clienttest <server> <dir>" |
- print "" |
- print " serversrp <server> <verifierDB>" |
- print " servercert <server> <chain> <key> [req]" |
- print " serversrpcert <server> <verifierDB> <chain> <key>" |
- print " serversharedkey <server> <sharedkeyDB>" |
- print " servertest <server> <dir>" |
- sys.exit() |
- |
-cmd = sys.argv[1].lower() |
- |
-class Args: |
- def __init__(self, argv): |
- self.argv = argv |
- def get(self, index): |
- if len(self.argv)<=index: |
- raise SyntaxError("Not enough arguments") |
- return self.argv[index] |
- def getLast(self, index): |
- if len(self.argv)>index+1: |
- raise SyntaxError("Too many arguments") |
- return self.get(index) |
- |
-args = Args(sys.argv) |
- |
-def reformatDocString(s): |
- lines = s.splitlines() |
- newLines = [] |
- for line in lines: |
- newLines.append(" " + line.strip()) |
- return "\n".join(newLines) |
- |
-try: |
- if cmd == "clienttest": |
- address = args.get(2) |
- dir = args.getLast(3) |
- clientTest(address, dir) |
- sys.exit() |
- |
- elif cmd.startswith("client"): |
- address = args.get(2) |
- |
- #Split address into hostname/port tuple |
- address = address.split(":") |
- if len(address)==1: |
- address.append("4443") |
- address = ( address[0], int(address[1]) ) |
- |
- def connect(): |
- #Connect to server |
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
- if hasattr(sock, "settimeout"): |
- sock.settimeout(5) |
- sock.connect(address) |
- |
- #Instantiate TLSConnections |
- return TLSConnection(sock) |
- |
- try: |
- if cmd == "clientsrp": |
- username = args.get(3) |
- password = args.getLast(4) |
- connection = connect() |
- start = time.clock() |
- connection.handshakeClientSRP(username, password) |
- elif cmd == "clientsharedkey": |
- username = args.get(3) |
- password = args.getLast(4) |
- connection = connect() |
- start = time.clock() |
- connection.handshakeClientSharedKey(username, password) |
- elif cmd == "clientcert": |
- certChain = None |
- privateKey = None |
- if len(sys.argv) > 3: |
- certFilename = args.get(3) |
- keyFilename = args.getLast(4) |
- |
- s1 = open(certFilename, "rb").read() |
- s2 = open(keyFilename, "rb").read() |
- |
- #Try to create cryptoID cert chain |
- if cryptoIDlibLoaded: |
- try: |
- certChain = CertChain().parse(s1) |
- privateKey = parsePrivateKey(s2) |
- except: |
- certChain = None |
- privateKey = None |
- |
- #Try to create X.509 cert chain |
- if not certChain: |
- x509 = X509() |
- x509.parse(s1) |
- certChain = X509CertChain([x509]) |
- privateKey = parsePrivateKey(s2) |
- |
- connection = connect() |
- start = time.clock() |
- connection.handshakeClientCert(certChain, privateKey) |
- else: |
- raise SyntaxError("Unknown command") |
- |
- except TLSLocalAlert, a: |
- if a.description == AlertDescription.bad_record_mac: |
- if cmd == "clientsharedkey": |
- print "Bad sharedkey password" |
- else: |
- raise |
- elif a.description == AlertDescription.user_canceled: |
- print str(a) |
- else: |
- raise |
- sys.exit() |
- except TLSRemoteAlert, a: |
- if a.description == AlertDescription.unknown_srp_username: |
- if cmd == "clientsrp": |
- print "Unknown username" |
- else: |
- raise |
- elif a.description == AlertDescription.bad_record_mac: |
- if cmd == "clientsrp": |
- print "Bad username or password" |
- else: |
- raise |
- elif a.description == AlertDescription.handshake_failure: |
- print "Unable to negotiate mutually acceptable parameters" |
- else: |
- raise |
- sys.exit() |
- |
- stop = time.clock() |
- print "Handshake success" |
- print " Handshake time: %.4f seconds" % (stop - start) |
- print " Version: %s.%s" % connection.version |
- print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation()) |
- if connection.session.srpUsername: |
- print " Client SRP username: %s" % connection.session.srpUsername |
- if connection.session.sharedKeyUsername: |
- print " Client shared key username: %s" % connection.session.sharedKeyUsername |
- if connection.session.clientCertChain: |
- print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint() |
- if connection.session.serverCertChain: |
- print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint() |
- connection.close() |
- connection.sock.close() |
- |
- elif cmd.startswith("server"): |
- address = args.get(2) |
- |
- #Split address into hostname/port tuple |
- address = address.split(":") |
- if len(address)==1: |
- address.append("4443") |
- address = ( address[0], int(address[1]) ) |
- |
- verifierDBFilename = None |
- sharedKeyDBFilename = None |
- certFilename = None |
- keyFilename = None |
- sharedKeyDB = None |
- reqCert = False |
- |
- if cmd == "serversrp": |
- verifierDBFilename = args.getLast(3) |
- elif cmd == "servercert": |
- certFilename = args.get(3) |
- keyFilename = args.get(4) |
- if len(sys.argv)>=6: |
- req = args.getLast(5) |
- if req.lower() != "req": |
- raise SyntaxError() |
- reqCert = True |
- elif cmd == "serversrpcert": |
- verifierDBFilename = args.get(3) |
- certFilename = args.get(4) |
- keyFilename = args.getLast(5) |
- elif cmd == "serversharedkey": |
- sharedKeyDBFilename = args.getLast(3) |
- elif cmd == "servertest": |
- address = args.get(2) |
- dir = args.getLast(3) |
- serverTest(address, dir) |
- sys.exit() |
- |
- verifierDB = None |
- if verifierDBFilename: |
- verifierDB = VerifierDB(verifierDBFilename) |
- verifierDB.open() |
- |
- sharedKeyDB = None |
- if sharedKeyDBFilename: |
- sharedKeyDB = SharedKeyDB(sharedKeyDBFilename) |
- sharedKeyDB.open() |
- |
- certChain = None |
- privateKey = None |
- if certFilename: |
- s1 = open(certFilename, "rb").read() |
- s2 = open(keyFilename, "rb").read() |
- |
- #Try to create cryptoID cert chain |
- if cryptoIDlibLoaded: |
- try: |
- certChain = CertChain().parse(s1) |
- privateKey = parsePrivateKey(s2) |
- except: |
- certChain = None |
- privateKey = None |
- |
- #Try to create X.509 cert chain |
- if not certChain: |
- x509 = X509() |
- x509.parse(s1) |
- certChain = X509CertChain([x509]) |
- privateKey = parsePrivateKey(s2) |
- |
- |
- |
- #Create handler function - performs handshake, then echos all bytes received |
- def handler(sock): |
- try: |
- connection = TLSConnection(sock) |
- settings = HandshakeSettings() |
- connection.handshakeServer(sharedKeyDB=sharedKeyDB, verifierDB=verifierDB, \ |
- certChain=certChain, privateKey=privateKey, \ |
- reqCert=reqCert, settings=settings) |
- print "Handshake success" |
- print " Version: %s.%s" % connection.version |
- print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation()) |
- if connection.session.srpUsername: |
- print " Client SRP username: %s" % connection.session.srpUsername |
- if connection.session.sharedKeyUsername: |
- print " Client shared key username: %s" % connection.session.sharedKeyUsername |
- if connection.session.clientCertChain: |
- print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint() |
- if connection.session.serverCertChain: |
- print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint() |
- |
- s = "" |
- while 1: |
- newS = connection.read() |
- if not newS: |
- break |
- s += newS |
- if s[-1]=='\n': |
- connection.write(s) |
- s = "" |
- except TLSLocalAlert, a: |
- if a.description == AlertDescription.unknown_srp_username: |
- print "Unknown SRP username" |
- elif a.description == AlertDescription.bad_record_mac: |
- if cmd == "serversrp" or cmd == "serversrpcert": |
- print "Bad SRP password for:", connection.allegedSrpUsername |
- else: |
- raise |
- elif a.description == AlertDescription.handshake_failure: |
- print "Unable to negotiate mutually acceptable parameters" |
- else: |
- raise |
- except TLSRemoteAlert, a: |
- if a.description == AlertDescription.bad_record_mac: |
- if cmd == "serversharedkey": |
- print "Bad sharedkey password for:", connection.allegedSharedKeyUsername |
- else: |
- raise |
- elif a.description == AlertDescription.user_canceled: |
- print "Handshake cancelled" |
- elif a.description == AlertDescription.handshake_failure: |
- print "Unable to negotiate mutually acceptable parameters" |
- elif a.description == AlertDescription.close_notify: |
- pass |
- else: |
- raise |
- |
- #Run multi-threaded server |
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
- sock.bind(address) |
- sock.listen(5) |
- while 1: |
- (newsock, cliAddress) = sock.accept() |
- thread.start_new_thread(handler, (newsock,)) |
- |
- |
+ def handshake(self, tlsConnection): |
+ try: |
+ tlsConnection.handshakeServer(certChain=x509Chain, |
+ privateKey=x509Key, |
+ sessionCache=sessionCache) |
+ tlsConnection.ignoreAbruptClose = True |
+ return True |
+ except TLSError as error: |
+ print("Handshake failure:", str(error)) |
+ return False |
+ |
+ class MyFuncs: |
+ def pow(self, x, y): return pow(x, y) |
+ def add(self, x, y): return x + y |
+ |
+ server = Server(address) |
+ server.register_instance(MyFuncs()) |
+ #sa = server.socket.getsockname() |
+ #print "Serving HTTPS on", sa[0], "port", sa[1] |
+ for i in range(6): |
+ server.handle_request() |
+ |
+ print("Test succeeded") |
+ |
+ |
+if __name__ == '__main__': |
+ if len(sys.argv) < 2: |
+ printUsage("Missing command") |
+ elif sys.argv[1] == "client"[:len(sys.argv[1])]: |
+ clientTestCmd(sys.argv[2:]) |
+ elif sys.argv[1] == "server"[:len(sys.argv[1])]: |
+ serverTestCmd(sys.argv[2:]) |
else: |
- print "Bad command: '%s'" % cmd |
-except TLSRemoteAlert, a: |
- print str(a) |
- raise |
- |
- |
- |
- |
- |
+ printUsage("Unknown command: %s" % sys.argv[1]) |