OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python |
| 2 |
| 3 # Authors: |
| 4 # Trevor Perrin |
| 5 # Kees Bos - Added tests for XML-RPC |
| 6 # Dimitris Moraitis - Anon ciphersuites |
| 7 # Marcelo Fernandez - Added test for NPN |
| 8 # Martin von Loewis - python 3 port |
| 9 |
| 10 # |
| 11 # See the LICENSE file for legal information regarding use of this file. |
| 12 from __future__ import print_function |
| 13 import sys |
| 14 import os |
| 15 import os.path |
| 16 import socket |
| 17 import time |
| 18 import getopt |
| 19 try: |
| 20 from BaseHTTPServer import HTTPServer |
| 21 from SimpleHTTPServer import SimpleHTTPRequestHandler |
| 22 except ImportError: |
| 23 from http.server import HTTPServer, SimpleHTTPRequestHandler |
| 24 |
| 25 from tlslite import TLSConnection, Fault, HandshakeSettings, \ |
| 26 X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \ |
| 27 parsePEMKey, constants, \ |
| 28 AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \ |
| 29 POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \ |
| 30 Checker, __version__ |
| 31 |
| 32 from tlslite.errors import * |
| 33 from tlslite.utils.cryptomath import prngName |
| 34 try: |
| 35 import xmlrpclib |
| 36 except ImportError: |
| 37 # Python 3 |
| 38 from xmlrpc import client as xmlrpclib |
| 39 from tlslite import * |
| 40 |
| 41 try: |
| 42 from tack.structures.Tack import Tack |
| 43 |
| 44 except ImportError: |
| 45 pass |
| 46 |
| 47 def printUsage(s=None): |
| 48 if m2cryptoLoaded: |
| 49 crypto = "M2Crypto/OpenSSL" |
| 50 else: |
| 51 crypto = "Python crypto" |
| 52 if s: |
| 53 print("ERROR: %s" % s) |
| 54 print("""\ntls.py version %s (using %s) |
| 55 |
| 56 Commands: |
| 57 server HOST:PORT DIRECTORY |
| 58 |
| 59 client HOST:PORT DIRECTORY |
| 60 """ % (__version__, crypto)) |
| 61 sys.exit(-1) |
| 62 |
| 63 |
| 64 def testConnClient(conn): |
| 65 b1 = os.urandom(1) |
| 66 b10 = os.urandom(10) |
| 67 b100 = os.urandom(100) |
| 68 b1000 = os.urandom(1000) |
| 69 conn.write(b1) |
| 70 conn.write(b10) |
| 71 conn.write(b100) |
| 72 conn.write(b1000) |
| 73 assert(conn.read(min=1, max=1) == b1) |
| 74 assert(conn.read(min=10, max=10) == b10) |
| 75 assert(conn.read(min=100, max=100) == b100) |
| 76 assert(conn.read(min=1000, max=1000) == b1000) |
| 77 |
| 78 def clientTestCmd(argv): |
| 79 |
| 80 address = argv[0] |
| 81 dir = argv[1] |
| 82 |
| 83 #Split address into hostname/port tuple |
| 84 address = address.split(":") |
| 85 address = ( address[0], int(address[1]) ) |
| 86 |
| 87 def connect(): |
| 88 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 89 if hasattr(sock, 'settimeout'): #It's a python 2.3 feature |
| 90 sock.settimeout(5) |
| 91 sock.connect(address) |
| 92 c = TLSConnection(sock) |
| 93 return c |
| 94 |
| 95 test = 0 |
| 96 |
| 97 badFault = False |
| 98 |
| 99 print("Test 0 - anonymous handshake") |
| 100 connection = connect() |
| 101 connection.handshakeClientAnonymous() |
| 102 testConnClient(connection) |
| 103 connection.close() |
| 104 |
| 105 print("Test 1 - good X509 (plus SNI)") |
| 106 connection = connect() |
| 107 connection.handshakeClientCert(serverName=address[0]) |
| 108 testConnClient(connection) |
| 109 assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| 110 assert(connection.session.serverName == address[0]) |
| 111 connection.close() |
| 112 |
| 113 print("Test 1.a - good X509, SSLv3") |
| 114 connection = connect() |
| 115 settings = HandshakeSettings() |
| 116 settings.minVersion = (3,0) |
| 117 settings.maxVersion = (3,0) |
| 118 connection.handshakeClientCert(settings=settings) |
| 119 testConnClient(connection) |
| 120 assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| 121 connection.close() |
| 122 |
| 123 print("Test 1.b - good X509, RC4-MD5") |
| 124 connection = connect() |
| 125 settings = HandshakeSettings() |
| 126 settings.macNames = ["md5"] |
| 127 connection.handshakeClientCert(settings=settings) |
| 128 testConnClient(connection) |
| 129 assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| 130 assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_
RC4_128_MD5) |
| 131 connection.close() |
| 132 |
| 133 if tackpyLoaded: |
| 134 |
| 135 settings = HandshakeSettings() |
| 136 settings.useExperimentalTackExtension = True |
| 137 |
| 138 print("Test 2.a - good X.509, TACK") |
| 139 connection = connect() |
| 140 connection.handshakeClientCert(settings=settings) |
| 141 assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d
2uiq.ox2xe.w4ss3") |
| 142 assert(connection.session.tackExt.activation_flags == 1) |
| 143 testConnClient(connection) |
| 144 connection.close() |
| 145 |
| 146 print("Test 2.b - good X.509, TACK unrelated to cert chain") |
| 147 connection = connect() |
| 148 try: |
| 149 connection.handshakeClientCert(settings=settings) |
| 150 assert(False) |
| 151 except TLSLocalAlert as alert: |
| 152 if alert.description != AlertDescription.illegal_parameter: |
| 153 raise |
| 154 connection.close() |
| 155 |
| 156 print("Test 3 - good SRP") |
| 157 connection = connect() |
| 158 connection.handshakeClientSRP("test", "password") |
| 159 testConnClient(connection) |
| 160 connection.close() |
| 161 |
| 162 print("Test 4 - SRP faults") |
| 163 for fault in Fault.clientSrpFaults + Fault.genericFaults: |
| 164 connection = connect() |
| 165 connection.fault = fault |
| 166 try: |
| 167 connection.handshakeClientSRP("test", "password") |
| 168 print(" Good Fault %s" % (Fault.faultNames[fault])) |
| 169 except TLSFaultError as e: |
| 170 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) |
| 171 badFault = True |
| 172 |
| 173 print("Test 6 - good SRP: with X.509 certificate, TLSv1.0") |
| 174 settings = HandshakeSettings() |
| 175 settings.minVersion = (3,1) |
| 176 settings.maxVersion = (3,1) |
| 177 connection = connect() |
| 178 connection.handshakeClientSRP("test", "password", settings=settings) |
| 179 assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| 180 testConnClient(connection) |
| 181 connection.close() |
| 182 |
| 183 print("Test 7 - X.509 with SRP faults") |
| 184 for fault in Fault.clientSrpFaults + Fault.genericFaults: |
| 185 connection = connect() |
| 186 connection.fault = fault |
| 187 try: |
| 188 connection.handshakeClientSRP("test", "password") |
| 189 print(" Good Fault %s" % (Fault.faultNames[fault])) |
| 190 except TLSFaultError as e: |
| 191 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) |
| 192 badFault = True |
| 193 |
| 194 print("Test 11 - X.509 faults") |
| 195 for fault in Fault.clientNoAuthFaults + Fault.genericFaults: |
| 196 connection = connect() |
| 197 connection.fault = fault |
| 198 try: |
| 199 connection.handshakeClientCert() |
| 200 print(" Good Fault %s" % (Fault.faultNames[fault])) |
| 201 except TLSFaultError as e: |
| 202 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) |
| 203 badFault = True |
| 204 |
| 205 print("Test 14 - good mutual X509") |
| 206 x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read()
) |
| 207 x509Chain = X509CertChain([x509Cert]) |
| 208 s = open(os.path.join(dir, "clientX509Key.pem")).read() |
| 209 x509Key = parsePEMKey(s, private=True) |
| 210 |
| 211 connection = connect() |
| 212 connection.handshakeClientCert(x509Chain, x509Key) |
| 213 testConnClient(connection) |
| 214 assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| 215 connection.close() |
| 216 |
| 217 print("Test 14.a - good mutual X509, SSLv3") |
| 218 connection = connect() |
| 219 settings = HandshakeSettings() |
| 220 settings.minVersion = (3,0) |
| 221 settings.maxVersion = (3,0) |
| 222 connection.handshakeClientCert(x509Chain, x509Key, settings=settings) |
| 223 testConnClient(connection) |
| 224 assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| 225 connection.close() |
| 226 |
| 227 print("Test 15 - mutual X.509 faults") |
| 228 for fault in Fault.clientCertFaults + Fault.genericFaults: |
| 229 connection = connect() |
| 230 connection.fault = fault |
| 231 try: |
| 232 connection.handshakeClientCert(x509Chain, x509Key) |
| 233 print(" Good Fault %s" % (Fault.faultNames[fault])) |
| 234 except TLSFaultError as e: |
| 235 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) |
| 236 badFault = True |
| 237 |
| 238 print("Test 18 - good SRP, prepare to resume... (plus SNI)") |
| 239 connection = connect() |
| 240 connection.handshakeClientSRP("test", "password", serverName=address[0]) |
| 241 testConnClient(connection) |
| 242 connection.close() |
| 243 session = connection.session |
| 244 |
| 245 print("Test 19 - resumption (plus SNI)") |
| 246 connection = connect() |
| 247 connection.handshakeClientSRP("test", "garbage", serverName=address[0], |
| 248 session=session) |
| 249 testConnClient(connection) |
| 250 #Don't close! -- see below |
| 251 |
| 252 print("Test 20 - invalidated resumption (plus SNI)") |
| 253 connection.sock.close() #Close the socket without a close_notify! |
| 254 connection = connect() |
| 255 try: |
| 256 connection.handshakeClientSRP("test", "garbage", |
| 257 serverName=address[0], session=session) |
| 258 assert(False) |
| 259 except TLSRemoteAlert as alert: |
| 260 if alert.description != AlertDescription.bad_record_mac: |
| 261 raise |
| 262 connection.close() |
| 263 |
| 264 print("Test 21 - HTTPS test X.509") |
| 265 address = address[0], address[1]+1 |
| 266 if hasattr(socket, "timeout"): |
| 267 timeoutEx = socket.timeout |
| 268 else: |
| 269 timeoutEx = socket.error |
| 270 while 1: |
| 271 try: |
| 272 time.sleep(2) |
| 273 htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "
utf-8") |
| 274 fingerprint = None |
| 275 for y in range(2): |
| 276 checker =Checker(x509Fingerprint=fingerprint) |
| 277 h = HTTPTLSConnection(\ |
| 278 address[0], address[1], checker=checker) |
| 279 for x in range(3): |
| 280 h.request("GET", "/index.html") |
| 281 r = h.getresponse() |
| 282 assert(r.status == 200) |
| 283 b = bytearray(r.read()) |
| 284 assert(b == htmlBody) |
| 285 fingerprint = h.tlsSession.serverCertChain.getFingerprint() |
| 286 assert(fingerprint) |
| 287 time.sleep(2) |
| 288 break |
| 289 except timeoutEx: |
| 290 print("timeout, retrying...") |
| 291 pass |
| 292 |
| 293 address = address[0], address[1]+1 |
| 294 |
| 295 implementations = [] |
| 296 if m2cryptoLoaded: |
| 297 implementations.append("openssl") |
| 298 if pycryptoLoaded: |
| 299 implementations.append("pycrypto") |
| 300 implementations.append("python") |
| 301 |
| 302 print("Test 22 - different ciphers, TLSv1.0") |
| 303 for implementation in implementations: |
| 304 for cipher in ["aes128", "aes256", "rc4"]: |
| 305 |
| 306 print("Test 22:", end=' ') |
| 307 connection = connect() |
| 308 |
| 309 settings = HandshakeSettings() |
| 310 settings.cipherNames = [cipher] |
| 311 settings.cipherImplementations = [implementation, "python"] |
| 312 settings.minVersion = (3,1) |
| 313 settings.maxVersion = (3,1) |
| 314 connection.handshakeClientCert(settings=settings) |
| 315 testConnClient(connection) |
| 316 print("%s %s" % (connection.getCipherName(), connection.getCipherImp
lementation())) |
| 317 connection.close() |
| 318 |
| 319 print("Test 23 - throughput test") |
| 320 for implementation in implementations: |
| 321 for cipher in ["aes128", "aes256", "3des", "rc4"]: |
| 322 if cipher == "3des" and implementation not in ("openssl", "pycrypto"
): |
| 323 continue |
| 324 |
| 325 print("Test 23:", end=' ') |
| 326 connection = connect() |
| 327 |
| 328 settings = HandshakeSettings() |
| 329 settings.cipherNames = [cipher] |
| 330 settings.cipherImplementations = [implementation, "python"] |
| 331 connection.handshakeClientCert(settings=settings) |
| 332 print("%s %s:" % (connection.getCipherName(), connection.getCipherIm
plementation()), end=' ') |
| 333 |
| 334 startTime = time.clock() |
| 335 connection.write(b"hello"*10000) |
| 336 h = connection.read(min=50000, max=50000) |
| 337 stopTime = time.clock() |
| 338 if stopTime-startTime: |
| 339 print("100K exchanged at rate of %d bytes/sec" % int(100000/(sto
pTime-startTime))) |
| 340 else: |
| 341 print("100K exchanged very fast") |
| 342 |
| 343 assert(h == b"hello"*10000) |
| 344 connection.close() |
| 345 |
| 346 print("Test 24.a - Next-Protocol Client Negotiation") |
| 347 connection = connect() |
| 348 connection.handshakeClientCert(nextProtos=[b"http/1.1"]) |
| 349 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
| 350 assert(connection.next_proto == b'http/1.1') |
| 351 connection.close() |
| 352 |
| 353 print("Test 24.b - Next-Protocol Client Negotiation") |
| 354 connection = connect() |
| 355 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"]) |
| 356 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
| 357 assert(connection.next_proto == b'spdy/2') |
| 358 connection.close() |
| 359 |
| 360 print("Test 24.c - Next-Protocol Client Negotiation") |
| 361 connection = connect() |
| 362 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"]) |
| 363 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
| 364 assert(connection.next_proto == b'spdy/2') |
| 365 connection.close() |
| 366 |
| 367 print("Test 24.d - Next-Protocol Client Negotiation") |
| 368 connection = connect() |
| 369 connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"
]) |
| 370 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
| 371 assert(connection.next_proto == b'spdy/2') |
| 372 connection.close() |
| 373 |
| 374 print("Test 24.e - Next-Protocol Client Negotiation") |
| 375 connection = connect() |
| 376 connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"
]) |
| 377 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
| 378 assert(connection.next_proto == b'spdy/3') |
| 379 connection.close() |
| 380 |
| 381 print("Test 24.f - Next-Protocol Client Negotiation") |
| 382 connection = connect() |
| 383 connection.handshakeClientCert(nextProtos=[b"http/1.1"]) |
| 384 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
| 385 assert(connection.next_proto == b'http/1.1') |
| 386 connection.close() |
| 387 |
| 388 print("Test 24.g - Next-Protocol Client Negotiation") |
| 389 connection = connect() |
| 390 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"]) |
| 391 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) |
| 392 assert(connection.next_proto == b'spdy/2') |
| 393 connection.close() |
| 394 |
| 395 print('Test 25 - good standard XMLRPC https client') |
| 396 time.sleep(2) # Hack for lack of ability to set timeout here |
| 397 address = address[0], address[1]+1 |
| 398 server = xmlrpclib.Server('https://%s:%s' % address) |
| 399 assert server.add(1,2) == 3 |
| 400 assert server.pow(2,4) == 16 |
| 401 |
| 402 print('Test 26 - good tlslite XMLRPC client') |
| 403 transport = XMLRPCTransport(ignoreAbruptClose=True) |
| 404 server = xmlrpclib.Server('https://%s:%s' % address, transport) |
| 405 assert server.add(1,2) == 3 |
| 406 assert server.pow(2,4) == 16 |
| 407 |
| 408 print('Test 27 - good XMLRPC ignored protocol') |
| 409 server = xmlrpclib.Server('http://%s:%s' % address, transport) |
| 410 assert server.add(1,2) == 3 |
| 411 assert server.pow(2,4) == 16 |
| 412 |
| 413 print("Test 28 - Internet servers test") |
| 414 try: |
| 415 i = IMAP4_TLS("cyrus.andrew.cmu.edu") |
| 416 i.login("anonymous", "anonymous@anonymous.net") |
| 417 i.logout() |
| 418 print("Test 28: IMAP4 good") |
| 419 p = POP3_TLS("pop.gmail.com") |
| 420 p.quit() |
| 421 print("Test 29: POP3 good") |
| 422 except socket.error as e: |
| 423 print("Non-critical error: socket error trying to reach internet server:
", e) |
| 424 |
| 425 if not badFault: |
| 426 print("Test succeeded") |
| 427 else: |
| 428 print("Test failed") |
| 429 |
| 430 |
| 431 |
| 432 def testConnServer(connection): |
| 433 count = 0 |
| 434 while 1: |
| 435 s = connection.read() |
| 436 count += len(s) |
| 437 if len(s) == 0: |
| 438 break |
| 439 connection.write(s) |
| 440 if count == 1111: |
| 441 break |
| 442 |
| 443 def serverTestCmd(argv): |
| 444 |
| 445 address = argv[0] |
| 446 dir = argv[1] |
| 447 |
| 448 #Split address into hostname/port tuple |
| 449 address = address.split(":") |
| 450 address = ( address[0], int(address[1]) ) |
| 451 |
| 452 #Connect to server |
| 453 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 454 lsock.bind(address) |
| 455 lsock.listen(5) |
| 456 |
| 457 def connect(): |
| 458 return TLSConnection(lsock.accept()[0]) |
| 459 |
| 460 x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read()
) |
| 461 x509Chain = X509CertChain([x509Cert]) |
| 462 s = open(os.path.join(dir, "serverX509Key.pem")).read() |
| 463 x509Key = parsePEMKey(s, private=True) |
| 464 |
| 465 print("Test 0 - Anonymous server handshake") |
| 466 connection = connect() |
| 467 connection.handshakeServer(anon=True) |
| 468 testConnServer(connection) |
| 469 connection.close() |
| 470 |
| 471 print("Test 1 - good X.509") |
| 472 connection = connect() |
| 473 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) |
| 474 assert(connection.session.serverName == address[0]) |
| 475 testConnServer(connection) |
| 476 connection.close() |
| 477 |
| 478 print("Test 1.a - good X.509, SSL v3") |
| 479 connection = connect() |
| 480 settings = HandshakeSettings() |
| 481 settings.minVersion = (3,0) |
| 482 settings.maxVersion = (3,0) |
| 483 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings
=settings) |
| 484 testConnServer(connection) |
| 485 connection.close() |
| 486 |
| 487 print("Test 1.b - good X.509, RC4-MD5") |
| 488 connection = connect() |
| 489 settings = HandshakeSettings() |
| 490 settings.macNames = ["sha", "md5"] |
| 491 settings.cipherNames = ["rc4"] |
| 492 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings
=settings) |
| 493 testConnServer(connection) |
| 494 connection.close() |
| 495 |
| 496 if tackpyLoaded: |
| 497 tack = Tack.createFromPem(open("./TACK1.pem", "rU").read()) |
| 498 tackUnrelated = Tack.createFromPem(open("./TACKunrelated.pem", "rU").rea
d()) |
| 499 |
| 500 settings = HandshakeSettings() |
| 501 settings.useExperimentalTackExtension = True |
| 502 |
| 503 print("Test 2.a - good X.509, TACK") |
| 504 connection = connect() |
| 505 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 506 tacks=[tack], activationFlags=1, settings=settings) |
| 507 testConnServer(connection) |
| 508 connection.close() |
| 509 |
| 510 print("Test 2.b - good X.509, TACK unrelated to cert chain") |
| 511 connection = connect() |
| 512 try: |
| 513 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 514 tacks=[tackUnrelated], settings=settings) |
| 515 assert(False) |
| 516 except TLSRemoteAlert as alert: |
| 517 if alert.description != AlertDescription.illegal_parameter: |
| 518 raise |
| 519 |
| 520 print("Test 3 - good SRP") |
| 521 verifierDB = VerifierDB() |
| 522 verifierDB.create() |
| 523 entry = VerifierDB.makeVerifier("test", "password", 1536) |
| 524 verifierDB["test"] = entry |
| 525 |
| 526 connection = connect() |
| 527 connection.handshakeServer(verifierDB=verifierDB) |
| 528 testConnServer(connection) |
| 529 connection.close() |
| 530 |
| 531 print("Test 4 - SRP faults") |
| 532 for fault in Fault.clientSrpFaults + Fault.genericFaults: |
| 533 connection = connect() |
| 534 connection.fault = fault |
| 535 try: |
| 536 connection.handshakeServer(verifierDB=verifierDB) |
| 537 assert() |
| 538 except: |
| 539 pass |
| 540 connection.close() |
| 541 |
| 542 print("Test 6 - good SRP: with X.509 cert") |
| 543 connection = connect() |
| 544 connection.handshakeServer(verifierDB=verifierDB, \ |
| 545 certChain=x509Chain, privateKey=x509Key) |
| 546 testConnServer(connection) |
| 547 connection.close() |
| 548 |
| 549 print("Test 7 - X.509 with SRP faults") |
| 550 for fault in Fault.clientSrpFaults + Fault.genericFaults: |
| 551 connection = connect() |
| 552 connection.fault = fault |
| 553 try: |
| 554 connection.handshakeServer(verifierDB=verifierDB, \ |
| 555 certChain=x509Chain, privateKey=x509Key) |
| 556 assert() |
| 557 except: |
| 558 pass |
| 559 connection.close() |
| 560 |
| 561 print("Test 11 - X.509 faults") |
| 562 for fault in Fault.clientNoAuthFaults + Fault.genericFaults: |
| 563 connection = connect() |
| 564 connection.fault = fault |
| 565 try: |
| 566 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) |
| 567 assert() |
| 568 except: |
| 569 pass |
| 570 connection.close() |
| 571 |
| 572 print("Test 14 - good mutual X.509") |
| 573 connection = connect() |
| 574 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=
True) |
| 575 testConnServer(connection) |
| 576 assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| 577 connection.close() |
| 578 |
| 579 print("Test 14a - good mutual X.509, SSLv3") |
| 580 connection = connect() |
| 581 settings = HandshakeSettings() |
| 582 settings.minVersion = (3,0) |
| 583 settings.maxVersion = (3,0) |
| 584 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=
True, settings=settings) |
| 585 testConnServer(connection) |
| 586 assert(isinstance(connection.session.serverCertChain, X509CertChain)) |
| 587 connection.close() |
| 588 |
| 589 print("Test 15 - mutual X.509 faults") |
| 590 for fault in Fault.clientCertFaults + Fault.genericFaults: |
| 591 connection = connect() |
| 592 connection.fault = fault |
| 593 try: |
| 594 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
reqCert=True) |
| 595 assert() |
| 596 except: |
| 597 pass |
| 598 connection.close() |
| 599 |
| 600 print("Test 18 - good SRP, prepare to resume") |
| 601 sessionCache = SessionCache() |
| 602 connection = connect() |
| 603 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) |
| 604 assert(connection.session.serverName == address[0]) |
| 605 testConnServer(connection) |
| 606 connection.close() |
| 607 |
| 608 print("Test 19 - resumption") |
| 609 connection = connect() |
| 610 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) |
| 611 assert(connection.session.serverName == address[0]) |
| 612 testConnServer(connection) |
| 613 #Don't close! -- see next test |
| 614 |
| 615 print("Test 20 - invalidated resumption") |
| 616 try: |
| 617 connection.read(min=1, max=1) |
| 618 assert() #Client is going to close the socket without a close_notify |
| 619 except TLSAbruptCloseError as e: |
| 620 pass |
| 621 connection = connect() |
| 622 try: |
| 623 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCa
che) |
| 624 except TLSLocalAlert as alert: |
| 625 if alert.description != AlertDescription.bad_record_mac: |
| 626 raise |
| 627 connection.close() |
| 628 |
| 629 print("Test 21 - HTTPS test X.509") |
| 630 |
| 631 #Close the current listening socket |
| 632 lsock.close() |
| 633 |
| 634 #Create and run an HTTP Server using TLSSocketServerMixIn |
| 635 class MyHTTPServer(TLSSocketServerMixIn, |
| 636 HTTPServer): |
| 637 def handshake(self, tlsConnection): |
| 638 tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x5
09Key) |
| 639 return True |
| 640 cd = os.getcwd() |
| 641 os.chdir(dir) |
| 642 address = address[0], address[1]+1 |
| 643 httpd = MyHTTPServer(address, SimpleHTTPRequestHandler) |
| 644 for x in range(6): |
| 645 httpd.handle_request() |
| 646 httpd.server_close() |
| 647 cd = os.chdir(cd) |
| 648 |
| 649 #Re-connect the listening socket |
| 650 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 651 address = address[0], address[1]+1 |
| 652 lsock.bind(address) |
| 653 lsock.listen(5) |
| 654 |
| 655 implementations = [] |
| 656 if m2cryptoLoaded: |
| 657 implementations.append("openssl") |
| 658 if pycryptoLoaded: |
| 659 implementations.append("pycrypto") |
| 660 implementations.append("python") |
| 661 |
| 662 print("Test 22 - different ciphers") |
| 663 for implementation in ["python"] * len(implementations): |
| 664 for cipher in ["aes128", "aes256", "rc4"]: |
| 665 |
| 666 print("Test 22:", end=' ') |
| 667 connection = connect() |
| 668 |
| 669 settings = HandshakeSettings() |
| 670 settings.cipherNames = [cipher] |
| 671 settings.cipherImplementations = [implementation, "python"] |
| 672 |
| 673 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 674 settings=settings) |
| 675 print(connection.getCipherName(), connection.getCipherImplementation
()) |
| 676 testConnServer(connection) |
| 677 connection.close() |
| 678 |
| 679 print("Test 23 - throughput test") |
| 680 for implementation in implementations: |
| 681 for cipher in ["aes128", "aes256", "3des", "rc4"]: |
| 682 if cipher == "3des" and implementation not in ("openssl", "pycrypto"
): |
| 683 continue |
| 684 |
| 685 print("Test 23:", end=' ') |
| 686 connection = connect() |
| 687 |
| 688 settings = HandshakeSettings() |
| 689 settings.cipherNames = [cipher] |
| 690 settings.cipherImplementations = [implementation, "python"] |
| 691 |
| 692 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 693 settings=settings) |
| 694 print(connection.getCipherName(), connection.getCipherImplementation
()) |
| 695 h = connection.read(min=50000, max=50000) |
| 696 assert(h == b"hello"*10000) |
| 697 connection.write(h) |
| 698 connection.close() |
| 699 |
| 700 print("Test 24.a - Next-Protocol Server Negotiation") |
| 701 connection = connect() |
| 702 settings = HandshakeSettings() |
| 703 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 704 settings=settings, nextProtos=[b"http/1.1"]) |
| 705 testConnServer(connection) |
| 706 connection.close() |
| 707 |
| 708 print("Test 24.b - Next-Protocol Server Negotiation") |
| 709 connection = connect() |
| 710 settings = HandshakeSettings() |
| 711 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 712 settings=settings, nextProtos=[b"spdy/2", b"http/
1.1"]) |
| 713 testConnServer(connection) |
| 714 connection.close() |
| 715 |
| 716 print("Test 24.c - Next-Protocol Server Negotiation") |
| 717 connection = connect() |
| 718 settings = HandshakeSettings() |
| 719 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 720 settings=settings, nextProtos=[b"http/1.1", b"spd
y/2"]) |
| 721 testConnServer(connection) |
| 722 connection.close() |
| 723 |
| 724 print("Test 24.d - Next-Protocol Server Negotiation") |
| 725 connection = connect() |
| 726 settings = HandshakeSettings() |
| 727 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 728 settings=settings, nextProtos=[b"spdy/2", b"http/
1.1"]) |
| 729 testConnServer(connection) |
| 730 connection.close() |
| 731 |
| 732 print("Test 24.e - Next-Protocol Server Negotiation") |
| 733 connection = connect() |
| 734 settings = HandshakeSettings() |
| 735 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 736 settings=settings, nextProtos=[b"http/1.1", b"spd
y/2", b"spdy/3"]) |
| 737 testConnServer(connection) |
| 738 connection.close() |
| 739 |
| 740 print("Test 24.f - Next-Protocol Server Negotiation") |
| 741 connection = connect() |
| 742 settings = HandshakeSettings() |
| 743 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 744 settings=settings, nextProtos=[b"spdy/3", b"spdy/
2"]) |
| 745 testConnServer(connection) |
| 746 connection.close() |
| 747 |
| 748 print("Test 24.g - Next-Protocol Server Negotiation") |
| 749 connection = connect() |
| 750 settings = HandshakeSettings() |
| 751 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, |
| 752 settings=settings, nextProtos=[]) |
| 753 testConnServer(connection) |
| 754 connection.close() |
| 755 |
| 756 print("Tests 25-27 - XMLRPXC server") |
| 757 address = address[0], address[1]+1 |
| 758 class Server(TLSXMLRPCServer): |
| 759 |
| 760 def handshake(self, tlsConnection): |
| 761 try: |
| 762 tlsConnection.handshakeServer(certChain=x509Chain, |
| 763 privateKey=x509Key, |
| 764 sessionCache=sessionCache) |
| 765 tlsConnection.ignoreAbruptClose = True |
| 766 return True |
| 767 except TLSError as error: |
| 768 print("Handshake failure:", str(error)) |
| 769 return False |
| 770 |
| 771 class MyFuncs: |
| 772 def pow(self, x, y): return pow(x, y) |
| 773 def add(self, x, y): return x + y |
| 774 |
| 775 server = Server(address) |
| 776 server.register_instance(MyFuncs()) |
| 777 #sa = server.socket.getsockname() |
| 778 #print "Serving HTTPS on", sa[0], "port", sa[1] |
| 779 for i in range(6): |
| 780 server.handle_request() |
| 781 |
| 782 print("Test succeeded") |
| 783 |
| 784 |
| 785 if __name__ == '__main__': |
| 786 if len(sys.argv) < 2: |
| 787 printUsage("Missing command") |
| 788 elif sys.argv[1] == "client"[:len(sys.argv[1])]: |
| 789 clientTestCmd(sys.argv[2:]) |
| 790 elif sys.argv[1] == "server"[:len(sys.argv[1])]: |
| 791 serverTestCmd(sys.argv[2:]) |
| 792 else: |
| 793 printUsage("Unknown command: %s" % sys.argv[1]) |
OLD | NEW |