Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(241)

Side by Side Diff: sdk/lib/io/secure_socket.dart

Issue 16858011: dart:io | Enable multithreaded secure networking encryption. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address all comments Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/_internal/lib/io_patch.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 /** 7 /**
8 * A high-level class for communicating securely over a TCP socket, using 8 * A high-level class for communicating securely over a TCP socket, using
9 * TLS and SSL. The [SecureSocket] exposes both a [Stream] and an 9 * TLS and SSL. The [SecureSocket] exposes both a [Stream] and an
10 * [IOSink] interface, making it ideal for using together with 10 * [IOSink] interface, making it ideal for using together with
(...skipping 340 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 this.issuer, 351 this.issuer,
352 this.startValidity, 352 this.startValidity,
353 this.endValidity); 353 this.endValidity);
354 final String subject; 354 final String subject;
355 final String issuer; 355 final String issuer;
356 final DateTime startValidity; 356 final DateTime startValidity;
357 final DateTime endValidity; 357 final DateTime endValidity;
358 } 358 }
359 359
360 360
361 class _FilterStatus {
362 bool progress = false; // The filter read or wrote data to the buffers.
363 bool readEmpty = true; // The read buffers and decryption filter are empty.
364 bool writeEmpty = true; // The write buffers and encryption filter are empty.
365 // These are set if a buffer changes state from empty or full.
366 bool readPlaintextNoLongerEmpty = false;
367 bool writePlaintextNoLongerFull = false;
368 bool readEncryptedNoLongerFull = false;
369 bool writeEncryptedNoLongerEmpty = false;
370
371 _FilterStatus();
372 }
373
374
361 class _RawSecureSocket extends Stream<RawSocketEvent> 375 class _RawSecureSocket extends Stream<RawSocketEvent>
362 implements RawSecureSocket { 376 implements RawSecureSocket {
363 // Status states 377 // Status states
364 static final int NOT_CONNECTED = 200;
365 static final int HANDSHAKE = 201; 378 static final int HANDSHAKE = 201;
366 static final int CONNECTED = 202; 379 static final int CONNECTED = 202;
367 static final int CLOSED = 203; 380 static final int CLOSED = 203;
368 381
369 // Buffer identifiers. 382 // Buffer identifiers.
370 // These must agree with those in the native C++ implementation. 383 // These must agree with those in the native C++ implementation.
371 static final int READ_PLAINTEXT = 0; 384 static final int READ_PLAINTEXT = 0;
372 static final int WRITE_PLAINTEXT = 1; 385 static final int WRITE_PLAINTEXT = 1;
373 static final int READ_ENCRYPTED = 2; 386 static final int READ_ENCRYPTED = 2;
374 static final int WRITE_ENCRYPTED = 3; 387 static final int WRITE_ENCRYPTED = 3;
375 static final int NUM_BUFFERS = 4; 388 static final int NUM_BUFFERS = 4;
376 389
390 // Is a buffer identifier for an encrypted buffer?
391 static bool _isBufferEncrypted(int identifier) => identifier >= READ_ENCRYPTED ;
392
377 RawSocket _socket; 393 RawSocket _socket;
378 final Completer<_RawSecureSocket> _handshakeComplete = 394 final Completer<_RawSecureSocket> _handshakeComplete =
379 new Completer<_RawSecureSocket>(); 395 new Completer<_RawSecureSocket>();
380 StreamController<RawSocketEvent> _controller; 396 StreamController<RawSocketEvent> _controller;
381 Stream<RawSocketEvent> _stream; 397 Stream<RawSocketEvent> _stream;
382 StreamSubscription<RawSocketEvent> _socketSubscription; 398 StreamSubscription<RawSocketEvent> _socketSubscription;
383 List<int> _bufferedData; 399 List<int> _bufferedData;
384 int _bufferedDataIndex = 0; 400 int _bufferedDataIndex = 0;
385 final InternetAddress address; 401 final InternetAddress address;
386 final bool is_server; 402 final bool is_server;
387 final String certificateName; 403 final String certificateName;
388 final bool requestClientCertificate; 404 final bool requestClientCertificate;
389 final bool requireClientCertificate; 405 final bool requireClientCertificate;
390 final bool sendClientCertificate; 406 final bool sendClientCertificate;
391 final Function onBadCertificate; 407 final Function onBadCertificate;
392 408
393 var _status = NOT_CONNECTED; 409 var _status = HANDSHAKE;
394 bool _writeEventsEnabled = true; 410 bool _writeEventsEnabled = true;
395 bool _readEventsEnabled = true; 411 bool _readEventsEnabled = true;
412 int _pauseCount = 0;
413 bool _pendingReadEvent = false;
396 bool _socketClosedRead = false; // The network socket is closed for reading. 414 bool _socketClosedRead = false; // The network socket is closed for reading.
397 bool _socketClosedWrite = false; // The network socket is closed for writing. 415 bool _socketClosedWrite = false; // The network socket is closed for writing.
398 bool _closedRead = false; // The secure socket has fired an onClosed event. 416 bool _closedRead = false; // The secure socket has fired an onClosed event.
399 bool _closedWrite = false; // The secure socket has been closed for writing. 417 bool _closedWrite = false; // The secure socket has been closed for writing.
400 bool _filterReadEmpty = true; // There is no buffered data to read. 418 _FilterStatus _filterStatus = new _FilterStatus();
401 bool _filterWriteEmpty = true; // There is no buffered data to be written.
402 bool _connectPending = false; 419 bool _connectPending = false;
420 bool _filterPending = false;
421 bool _filterActive = false;
422
403 _SecureFilter _secureFilter = new _SecureFilter(); 423 _SecureFilter _secureFilter = new _SecureFilter();
424 int _filterPointer;
425 SendPort _filterService;
404 426
405 static Future<_RawSecureSocket> connect( 427 static Future<_RawSecureSocket> connect(
406 host, 428 host,
407 int requestedPort, 429 int requestedPort,
408 String certificateName, 430 String certificateName,
409 {bool is_server, 431 {bool is_server,
410 RawSocket socket, 432 RawSocket socket,
411 StreamSubscription subscription, 433 StreamSubscription subscription,
412 List<int> bufferedData, 434 List<int> bufferedData,
413 bool requestClientCertificate: false, 435 bool requestClientCertificate: false,
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
457 sync: true, 479 sync: true,
458 onListen: _onSubscriptionStateChange, 480 onListen: _onSubscriptionStateChange,
459 onPause: _onPauseStateChange, 481 onPause: _onPauseStateChange,
460 onResume: _onPauseStateChange, 482 onResume: _onPauseStateChange,
461 onCancel: _onSubscriptionStateChange); 483 onCancel: _onSubscriptionStateChange);
462 _stream = _controller.stream; 484 _stream = _controller.stream;
463 // Throw an ArgumentError if any field is invalid. After this, all 485 // Throw an ArgumentError if any field is invalid. After this, all
464 // errors will be reported through the future or the stream. 486 // errors will be reported through the future or the stream.
465 _verifyFields(); 487 _verifyFields();
466 _secureFilter.init(); 488 _secureFilter.init();
467 if (_bufferedData != null) _readFromBuffered(); 489 _filterPointer = _secureFilter._pointer();
468 _secureFilter.registerHandshakeCompleteCallback( 490 _secureFilter.registerHandshakeCompleteCallback(
469 _secureHandshakeCompleteHandler); 491 _secureHandshakeCompleteHandler);
470 if (onBadCertificate != null) { 492 if (onBadCertificate != null) {
471 _secureFilter.registerBadCertificateCallback(onBadCertificate); 493 _secureFilter.registerBadCertificateCallback(onBadCertificate);
472 } 494 }
473 var futureSocket; 495 var futureSocket;
474 if (socket == null) { 496 if (socket == null) {
475 futureSocket = RawSocket.connect(address, requestedPort); 497 futureSocket = RawSocket.connect(address, requestedPort);
476 } else { 498 } else {
477 futureSocket = new Future.value(socket); 499 futureSocket = new Future.value(socket);
(...skipping 16 matching lines...) Expand all
494 _connectPending = true; 516 _connectPending = true;
495 _secureFilter.connect(address.host, 517 _secureFilter.connect(address.host,
496 (address as dynamic)._sockaddr_storage, 518 (address as dynamic)._sockaddr_storage,
497 port, 519 port,
498 is_server, 520 is_server,
499 certificateName, 521 certificateName,
500 requestClientCertificate || 522 requestClientCertificate ||
501 requireClientCertificate, 523 requireClientCertificate,
502 requireClientCertificate, 524 requireClientCertificate,
503 sendClientCertificate); 525 sendClientCertificate);
504 _status = HANDSHAKE;
505 _secureHandshake(); 526 _secureHandshake();
506 }) 527 })
507 .catchError((error) { 528 .catchError((error) {
508 _handshakeComplete.completeError(error); 529 _handshakeComplete.completeError(error);
509 _close(); 530 _close();
510 }); 531 });
511 } 532 }
512 533
513 StreamSubscription listen(void onData(RawSocketEvent data), 534 StreamSubscription listen(void onData(RawSocketEvent data),
514 {void onError(error), 535 {void onError(error),
515 void onDone(), 536 void onDone(),
516 bool cancelOnError}) { 537 bool cancelOnError}) {
517 if (_writeEventsEnabled) { 538 _sendWriteEvent();
518 _writeEventsEnabled = false;
519 _controller.add(RawSocketEvent.WRITE);
520 }
521 return _stream.listen(onData, 539 return _stream.listen(onData,
522 onError: onError, 540 onError: onError,
523 onDone: onDone, 541 onDone: onDone,
524 cancelOnError: cancelOnError); 542 cancelOnError: cancelOnError);
525 } 543 }
526 544
527 void _verifyFields() { 545 void _verifyFields() {
528 assert(is_server is bool); 546 assert(is_server is bool);
529 assert(_socket == null || _socket is RawSocket); 547 assert(_socket == null || _socket is RawSocket);
530 if (address is! InternetAddress) { 548 if (address is! InternetAddress) {
(...skipping 21 matching lines...) Expand all
552 } 570 }
553 571
554 int get port => _socket.port; 572 int get port => _socket.port;
555 573
556 String get remoteHost => _socket.remoteHost; 574 String get remoteHost => _socket.remoteHost;
557 575
558 int get remotePort => _socket.remotePort; 576 int get remotePort => _socket.remotePort;
559 577
560 int available() { 578 int available() {
561 if (_status != CONNECTED) return 0; 579 if (_status != CONNECTED) return 0;
562 _readEncryptedData();
563 return _secureFilter.buffers[READ_PLAINTEXT].length; 580 return _secureFilter.buffers[READ_PLAINTEXT].length;
564 } 581 }
565 582
566 void close() { 583 void close() {
567 shutdown(SocketDirection.BOTH); 584 shutdown(SocketDirection.BOTH);
568 } 585 }
569 586
570 void _close() { 587 void _close() {
571 _closedWrite = true; 588 _closedWrite = true;
572 _closedRead = true; 589 _closedRead = true;
573 if (_socket != null) { 590 if (_socket != null) {
574 _socket.close(); 591 _socket.close();
575 } 592 }
576 _socketClosedWrite = true; 593 _socketClosedWrite = true;
577 _socketClosedRead = true; 594 _socketClosedRead = true;
578 if (_secureFilter != null) { 595 if (!_filterActive && _secureFilter != null) {
579 _secureFilter.destroy(); 596 _secureFilter.destroy();
580 _secureFilter = null; 597 _secureFilter = null;
581 } 598 }
582 if (_socketSubscription != null) { 599 if (_socketSubscription != null) {
583 _socketSubscription.cancel(); 600 _socketSubscription.cancel();
584 } 601 }
585 _controller.close(); 602 _controller.close();
586 _status = CLOSED; 603 _status = CLOSED;
587 } 604 }
588 605
589 void shutdown(SocketDirection direction) { 606 void shutdown(SocketDirection direction) {
590 if (direction == SocketDirection.SEND || 607 if (direction == SocketDirection.SEND ||
591 direction == SocketDirection.BOTH) { 608 direction == SocketDirection.BOTH) {
592 _closedWrite = true; 609 _closedWrite = true;
593 _writeEncryptedData(); 610 if (_filterStatus.writeEmpty) {
594 if (_filterWriteEmpty) {
595 _socket.shutdown(SocketDirection.SEND); 611 _socket.shutdown(SocketDirection.SEND);
596 _socketClosedWrite = true; 612 _socketClosedWrite = true;
597 if (_closedRead) { 613 if (_closedRead) {
598 _close(); 614 _close();
599 } 615 }
600 } 616 }
601 } 617 }
602 if (direction == SocketDirection.RECEIVE || 618 if (direction == SocketDirection.RECEIVE ||
603 direction == SocketDirection.BOTH) { 619 direction == SocketDirection.BOTH) {
604 _closedRead = true; 620 _closedRead = true;
605 _socketClosedRead = true; 621 _socketClosedRead = true;
606 _socket.shutdown(SocketDirection.RECEIVE); 622 _socket.shutdown(SocketDirection.RECEIVE);
607 if (_socketClosedWrite) { 623 if (_socketClosedWrite) {
608 _close(); 624 _close();
609 } 625 }
610 } 626 }
611 } 627 }
612 628
613 bool get writeEventsEnabled => _writeEventsEnabled; 629 bool get writeEventsEnabled => _writeEventsEnabled;
614 630
615 void set writeEventsEnabled(bool value) { 631 void set writeEventsEnabled(bool value) {
616 if (value && 632 _writeEventsEnabled = value;
617 _controller.hasListener && 633 if (value) {
618 _secureFilter != null && 634 Timer.run(() => _sendWriteEvent());
619 _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) {
620 Timer.run(() => _controller.add(RawSocketEvent.WRITE));
621 } else {
622 _writeEventsEnabled = value;
623 } 635 }
624 } 636 }
625 637
626 bool get readEventsEnabled => _readEventsEnabled; 638 bool get readEventsEnabled => _readEventsEnabled;
627 639
628 void set readEventsEnabled(bool value) { 640 void set readEventsEnabled(bool value) {
629 _readEventsEnabled = value; 641 _readEventsEnabled = value;
630 if (value && 642 _scheduleReadEvent();
631 ((_secureFilter != null &&
632 _secureFilter.buffers[READ_PLAINTEXT].length > 0) ||
633 _socketClosedRead)) {
634 // We might not have no underlying socket to set off read events.
635 Timer.run(_readHandler);
636 }
637 } 643 }
638 644
639 List<int> read([int len]) { 645 List<int> read([int length]) {
646 if (length != null && (length is! int || length < 0)) {
647 throw new ArgumentError(
648 "Invalid length parameter in SecureSocket.read (length: $length)");
649 }
640 if (_closedRead) { 650 if (_closedRead) {
641 throw new SocketException("Reading from a closed socket"); 651 throw new SocketException("Reading from a closed socket");
642 } 652 }
643 if (_status != CONNECTED) { 653 if (_status != CONNECTED) {
644 return null; 654 return null;
645 } 655 }
646 var buffer = _secureFilter.buffers[READ_PLAINTEXT]; 656 var result = _secureFilter.buffers[READ_PLAINTEXT].read(length);
647 _readEncryptedData(); 657 _scheduleFilter();
648 int toRead = buffer.length;
649 if (len != null) {
650 if (len is! int || len < 0) {
651 throw new ArgumentError(
652 "Invalid len parameter in SecureSocket.read (len: $len)");
653 }
654 if (len < toRead) {
655 toRead = len;
656 }
657 }
658 List<int> result = (toRead == 0) ? null :
659 buffer.data.sublist(buffer.start, buffer.start + toRead);
660 buffer.advanceStart(toRead);
661
662 // Set up a read event if the filter still has data.
663 if (!_filterReadEmpty) {
664 Timer.run(_readHandler);
665 }
666
667 if (_socketClosedRead) { // An onClose event is pending.
668 // _closedRead is false, since we are in a read call.
669 if (!_filterReadEmpty) {
670 // _filterReadEmpty may be out of date since read empties
671 // the plaintext buffer after calling _readEncryptedData.
672 // TODO(whesse): Fix this as part of fixing read.
673 _readEncryptedData();
674 }
675 if (_filterReadEmpty) {
676 // This can't be an else clause: the value of _filterReadEmpty changes.
677 // This must be asynchronous, because we are in a read call.
678 Timer.run(_closeHandler);
679 }
680 }
681
682 return result; 658 return result;
683 } 659 }
684 660
685 // Write the data to the socket, and flush it as much as possible 661 // Write the data to the socket, and schedule the filter to encrypt it.
686 // until it would block. If the write would block, _writeEncryptedData sets
687 // up handlers to flush the pipeline when possible.
688 int write(List<int> data, [int offset, int bytes]) { 662 int write(List<int> data, [int offset, int bytes]) {
663 if (bytes != null && (bytes is! int || bytes < 0)) {
664 throw new ArgumentError(
665 "Invalid bytes parameter in SecureSocket.read (bytes: $bytes)");
666 }
667 if (offset != null && (offset is! int || offset < 0)) {
668 throw new ArgumentError(
669 "Invalid offset parameter in SecureSocket.read (offset: $offset)");
670 }
689 if (_closedWrite) { 671 if (_closedWrite) {
690 _controller.addError(new SocketException("Writing to a closed socket")); 672 _controller.addError(new SocketException("Writing to a closed socket"));
691 return 0; 673 return 0;
692 } 674 }
693 if (_status != CONNECTED) return 0; 675 if (_status != CONNECTED) return 0;
694
695 if (offset == null) offset = 0; 676 if (offset == null) offset = 0;
696 if (bytes == null) bytes = data.length - offset; 677 if (bytes == null) bytes = data.length - offset;
697 678
698 var buffer = _secureFilter.buffers[WRITE_PLAINTEXT]; 679 int written =
699 if (bytes > buffer.free) { 680 _secureFilter.buffers[WRITE_PLAINTEXT].write(data, offset, bytes);
700 bytes = buffer.free; 681 if (written > 0) {
682 _filterStatus.writeEmpty = false;
701 } 683 }
702 if (bytes > 0) { 684 _scheduleFilter();
703 int startIndex = buffer.start + buffer.length; 685 return written;
704 buffer.data.setRange(startIndex, startIndex + bytes, data, offset);
705 buffer.length += bytes;
706 }
707 _writeEncryptedData(); // Tries to flush all pipeline stages.
708 return bytes;
709 } 686 }
710 687
711 X509Certificate get peerCertificate => _secureFilter.peerCertificate; 688 X509Certificate get peerCertificate => _secureFilter.peerCertificate;
712 689
713 bool setOption(SocketOption option, bool enabled) { 690 bool setOption(SocketOption option, bool enabled) {
714 if (_socket == null) return false; 691 if (_socket == null) return false;
715 return _socket.setOption(option, enabled); 692 return _socket.setOption(option, enabled);
716 } 693 }
717 694
718 void _writeHandler() {
719 if (_status == CLOSED) return;
720 _writeEncryptedData();
721 if (_filterWriteEmpty && _closedWrite && !_socketClosedWrite) {
722 // Close _socket for write, by calling shutdown(), to avoid cloning the
723 // socket closing code in shutdown().
724 shutdown(SocketDirection.SEND);
725 }
726 if (_status == HANDSHAKE) {
727 try {
728 _secureHandshake();
729 } catch (e) { _reportError(e, "RawSecureSocket error"); }
730 } else if (_status == CONNECTED &&
731 _controller.hasListener &&
732 _writeEventsEnabled &&
733 _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) {
734 // Reset the one-shot handler.
735 _writeEventsEnabled = false;
736 _controller.add(RawSocketEvent.WRITE);
737 }
738 }
739
740 void _eventDispatcher(RawSocketEvent event) { 695 void _eventDispatcher(RawSocketEvent event) {
741 if (event == RawSocketEvent.READ) { 696 if (event == RawSocketEvent.READ) {
742 _readHandler(); 697 _readHandler();
743 } else if (event == RawSocketEvent.WRITE) { 698 } else if (event == RawSocketEvent.WRITE) {
744 _writeHandler(); 699 _writeHandler();
745 } else if (event == RawSocketEvent.READ_CLOSED) { 700 } else if (event == RawSocketEvent.READ_CLOSED) {
746 _closeHandler(); 701 _closeHandler();
747 } 702 }
748 } 703 }
749 704
750 void _readFromBuffered() {
751 assert(_bufferedData != null);
752 var encrypted = _secureFilter.buffers[READ_ENCRYPTED];
753 var bytes = _bufferedData.length - _bufferedDataIndex;
754 int startIndex = encrypted.start + encrypted.length;
755 encrypted.data.setRange(startIndex,
756 startIndex + bytes,
757 _bufferedData,
758 _bufferedDataIndex);
759 encrypted.length += bytes;
760 _bufferedDataIndex += bytes;
761 if (_bufferedData.length == _bufferedDataIndex) {
762 _bufferedData = null;
763 }
764 }
765
766 void _readHandler() { 705 void _readHandler() {
767 if (_status == CLOSED) { 706 _readSocket();
768 return; 707 _scheduleFilter();
769 } else if (_status == HANDSHAKE) { 708 }
770 try { 709
771 _secureHandshake(); 710 void _writeHandler() {
772 if (_status != HANDSHAKE) _readHandler(); 711 _writeSocket();
773 } catch (e) { _reportError(e, "RawSecureSocket error"); } 712 _scheduleFilter();
774 } else {
775 if (_status != CONNECTED) {
776 // Cannot happen.
777 throw new SocketException("Internal SocketIO Error");
778 }
779 try {
780 _readEncryptedData();
781 } catch (e) { _reportError(e, "RawSecureSocket error"); }
782 if (!_filterReadEmpty) {
783 if (_readEventsEnabled) {
784 if (_secureFilter.buffers[READ_PLAINTEXT].length > 0) {
785 _controller.add(RawSocketEvent.READ);
786 }
787 if (_socketClosedRead) {
788 // Keep firing read events until we are paused or buffer is empty.
789 Timer.run(_readHandler);
790 }
791 }
792 } else if (_socketClosedRead) {
793 _closeHandler();
794 }
795 }
796 } 713 }
797 714
798 void _doneHandler() { 715 void _doneHandler() {
799 if (_filterReadEmpty) { 716 if (_filterStatus.readEmpty) {
800 _close(); 717 _close();
801 } 718 }
802 } 719 }
803 720
804 void _errorHandler(e) { 721 void _errorHandler(e) {
805 _reportError(e, 'Error on underlying RawSocket'); 722 _reportError(e, 'Error on underlying RawSocket');
806 } 723 }
807 724
808 void _reportError(e, String message) { 725 void _reportError(e, String message) {
809 // TODO(whesse): Call _reportError from all internal functions that throw. 726 // TODO(whesse): Call _reportError from all internal functions that throw.
810 if (e is SocketException) { 727 if (e is SocketException) {
811 e = new SocketException('$message (${e.message})', e.osError); 728 e = new SocketException('$message (${e.message})', e.osError);
812 } else if (e is OSError) { 729 } else if (e is OSError) {
813 e = new SocketException(message, e); 730 e = new SocketException(message, e);
814 } else { 731 } else {
815 e = new SocketException('$message (${e.toString()})', null); 732 e = new SocketException('$message (${e.toString()})', null);
816 } 733 }
817 if (_connectPending) { 734 if (_connectPending) {
818 _handshakeComplete.completeError(e); 735 _handshakeComplete.completeError(e);
819 } else { 736 } else {
820 _controller.addError(e); 737 _controller.addError(e);
821 } 738 }
822 _close(); 739 _close();
823 } 740 }
824 741
825 void _closeHandler() { 742 void _closeHandler() {
826 if (_status == CONNECTED) { 743 if (_status == CONNECTED) {
827 if (_closedRead) return; 744 if (_closedRead) return;
828 _socketClosedRead = true; 745 _socketClosedRead = true;
829 if (_filterReadEmpty) { 746 if (_filterStatus.readEmpty) {
830 _closedRead = true; 747 _closedRead = true;
831 _controller.add(RawSocketEvent.READ_CLOSED); 748 _controller.add(RawSocketEvent.READ_CLOSED);
832 if (_socketClosedWrite) { 749 if (_socketClosedWrite) {
833 _close(); 750 _close();
834 } 751 }
752 } else {
753 _scheduleFilter();
835 } 754 }
836 } else if (_status == HANDSHAKE) { 755 } else if (_status == HANDSHAKE) {
756 _socketClosedRead = true;
757 if (_filterStatus.readEmpty) {
837 _reportError( 758 _reportError(
838 new SocketException('Connection terminated during handshake'), 759 new SocketException('Connection terminated during handshake'),
839 'handshake error'); 760 'RawSecureSocket error');
761 } else {
762 _secureHandshake();
763 }
840 } 764 }
841 } 765 }
842 766
843 void _secureHandshake() { 767 void _secureHandshake() {
844 _readEncryptedData(); 768 try {
845 _secureFilter.handshake(); 769 _secureFilter.handshake();
846 _writeEncryptedData(); 770 _filterStatus.writeEmpty = false;
771 _readSocket();
772 _writeSocket();
773 _scheduleFilter();
774 } catch (e) {
775 _reportError(e, "RawSecureSocket error");
776 }
847 } 777 }
848 778
849 void _secureHandshakeCompleteHandler() { 779 void _secureHandshakeCompleteHandler() {
850 _status = CONNECTED; 780 _status = CONNECTED;
851 if (_connectPending) { 781 if (_connectPending) {
852 _connectPending = false; 782 _connectPending = false;
853 // If we complete the future synchronously, user code will run here, 783 // We don't want user code to run synchronously in this callback.
854 // and modify the state of the RawSecureSocket. For example, it
855 // could close the socket, and set _filter to null.
856 Timer.run(() => _handshakeComplete.complete(this)); 784 Timer.run(() => _handshakeComplete.complete(this));
857 } 785 }
858 } 786 }
859 787
860 void _onPauseStateChange() { 788 void _onPauseStateChange() {
789 if (_controller.isPaused) {
790 _pauseCount++;
791 } else {
792 _pauseCount--;
793 if (_pauseCount == 0) {
794 _scheduleReadEvent();
795 _sendWriteEvent(); // Can send event synchronously.
796 }
797 }
798
861 if (!_socketClosedRead || !_socketClosedWrite) { 799 if (!_socketClosedRead || !_socketClosedWrite) {
862 if (_controller.isPaused) { 800 if (_controller.isPaused) {
863 _socketSubscription.pause(); 801 _socketSubscription.pause();
864 } else { 802 } else {
865 _socketSubscription.resume(); 803 _socketSubscription.resume();
866 } 804 }
867 } 805 }
868 } 806 }
869 807
870 void _onSubscriptionStateChange() { 808 void _onSubscriptionStateChange() {
871 if (_controller.hasListener) { 809 if (_controller.hasListener) {
872 // TODO(ajohnsen): Do something here? 810 // TODO(ajohnsen): Do something here?
873 } 811 }
874 } 812 }
875 813
876 void _readEncryptedData() { 814 void _scheduleFilter() {
877 // Read from the socket, and push it through the filter as far as 815 _filterPending = true;
878 // possible. 816 _tryFilter();
879 var encrypted = _secureFilter.buffers[READ_ENCRYPTED]; 817 }
880 var plaintext = _secureFilter.buffers[READ_PLAINTEXT]; 818
881 bool progress = true; 819 void _tryFilter() {
882 while (progress) { 820 if (_status == CLOSED) return;
883 progress = false; 821 if (_filterPending && !_filterActive) {
884 // Do not try to read plaintext from the filter while handshaking. 822 _filterActive = true;
885 if ((_status == CONNECTED) && plaintext.free > 0) { 823 _filterPending = false;
886 int bytes = _secureFilter.processBuffer(READ_PLAINTEXT); 824 _pushAllFilterStages().then((status) {
887 if (bytes > 0) { 825 _filterStatus = status;
888 plaintext.length += bytes; 826 _filterActive = false;
889 progress = true; 827 if (_status == CLOSED) {
890 } 828 _secureFilter.destroy();
891 } 829 _secureFilter = null;
892 if (encrypted.length > 0) { 830 return;
893 int bytes = _secureFilter.processBuffer(READ_ENCRYPTED); 831 }
894 if (bytes > 0) { 832 if (_filterStatus.writeEmpty && _closedWrite && !_socketClosedWrite) {
895 encrypted.advanceStart(bytes); 833 // Checks for and handles all cases of partially closed sockets.
896 progress = true; 834 shutdown(SocketDirection.SEND);
897 } 835 if (_status == CLOSED) return;
898 } 836 }
899 if (!_socketClosedRead && encrypted.free > 0) { 837 if (_filterStatus.readEmpty && _socketClosedRead && !_closedRead) {
900 if (_bufferedData != null) { 838 if (_status == HANDSHAKE) {
901 _readFromBuffered(); 839 _secureFilter.handshake();
902 progress = true; 840 if (_status == HANDSHAKE) {
903 } else { 841 _reportError(
904 List<int> data = _socket.read(encrypted.free); 842 new SocketException('Connection terminated during handshake'),
905 if (data != null) { 843 'RawSecureSocket error');
906 int bytes = data.length; 844 }
907 int startIndex = encrypted.start + encrypted.length;
908 encrypted.data.setRange(startIndex, startIndex + bytes, data);
909 encrypted.length += bytes;
910 progress = true;
911 } 845 }
912 } 846 _closeHandler();
913 } 847 }
914 } 848 if (_status == CLOSED) return;
915 // If there is any data in any stages of the filter, there should 849 if (_filterStatus.progress) {
916 // be data in the plaintext buffer after this process. 850 _filterPending = true;
917 // TODO(whesse): Verify that this is true, and there can be no 851 if (_filterStatus.writePlaintextNoLongerFull) _sendWriteEvent();
918 // partial encrypted block stuck in the secureFilter. 852 if (_filterStatus.readEncryptedNoLongerFull) _readSocket();
919 _filterReadEmpty = (plaintext.length == 0); 853 if (_filterStatus.writeEncryptedNoLongerEmpty) _writeSocket();
920 } 854 if (_filterStatus.readPlaintextNoLongerEmpty) _scheduleReadEvent();
921 855 if (_status == HANDSHAKE) _secureHandshake();
922 void _writeEncryptedData() { 856 }
857 _tryFilter();
858 });
859 }
860 }
861
862 List<int> _readSocketOrBufferedData(int bytes) {
863 if (_bufferedData != null) {
864 if (bytes > _bufferedData.length - _bufferedDataIndex) {
865 bytes = _bufferedData.length - _bufferedDataIndex;
866 }
867 var result = _bufferedData.sublist(_bufferedDataIndex,
868 _bufferedDataIndex + bytes);
869 _bufferedDataIndex += bytes;
870 if (_bufferedData.length == _bufferedDataIndex) {
871 _bufferedData = null;
872 }
873 return result;
874 } else if (!_socketClosedRead) {
875 try {
876 return _socket.read(bytes);
877 } catch (e) {
878 _reportError(e, "RawSecureSocket error reading encrypted socket");
879 return null;
880 }
881 } else {
882 return null;
883 }
884 }
885
886 void _readSocket() {
887 if (_status == CLOSED) return;
888 var buffer = _secureFilter.buffers[READ_ENCRYPTED];
889 if (buffer.writeFromSource(_readSocketOrBufferedData) > 0) {
890 _filterStatus.readEmpty = false;
891 }
892 }
893
894 void _writeSocket() {
923 if (_socketClosedWrite) return; 895 if (_socketClosedWrite) return;
924 var encrypted = _secureFilter.buffers[WRITE_ENCRYPTED]; 896 var buffer = _secureFilter.buffers[WRITE_ENCRYPTED];
925 var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT]; 897 if (buffer.readToSocket(_socket)) { // Returns true if blocked
898 _socket.writeEventsEnabled = true;
899 }
900 }
901
902 // If a read event should be sent, add it to the controller.
903 _scheduleReadEvent() {
904 if (!_pendingReadEvent &&
905 _readEventsEnabled &&
906 _pauseCount == 0 &&
907 _secureFilter != null &&
908 !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) {
909 _pendingReadEvent = true;
910 Timer.run(_sendReadEvent);
911 }
912 }
913
914 _sendReadEvent() {
915 _pendingReadEvent = false;
916 if (_readEventsEnabled &&
917 _pauseCount == 0 &&
918 _secureFilter != null &&
919 !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) {
920 _controller.add(RawSocketEvent.READ);
921 _scheduleReadEvent();
922 }
923 }
924
925 // If a write event should be sent, add it to the controller.
926 _sendWriteEvent() {
927 if (!_closedWrite &&
928 _writeEventsEnabled &&
929 _pauseCount == 0 &&
930 _secureFilter != null &&
931 _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) {
932 _writeEventsEnabled = false;
933 _controller.add(RawSocketEvent.WRITE);
934 }
935 }
936
937 Future<_FilterStatus> _pushAllFilterStages() {
938 if (_filterService == null) {
939 _filterService = _SecureFilter._newServicePort();
940 }
941 List args = [_filterPointer, _status != CONNECTED];
942 var bufs = _secureFilter.buffers;
943 for (var i = 0; i < NUM_BUFFERS; ++i) {
944 args.add(bufs[i].start);
945 args.add(bufs[i].end);
946 }
947
948 return _filterService.call(args).then((response) {
949 bool wasInHandshake = response[1];
950 int start(int index) => response[2 * index + 2];
951 int end(int index) => response[2 * index + 3];
952
953 _FilterStatus status = new _FilterStatus();
954 // Compute writeEmpty as "write plaintext buffer and write encrypted
955 // buffer were empty when we started and are empty now".
956 status.writeEmpty = bufs[WRITE_PLAINTEXT].isEmpty &&
957 start(WRITE_ENCRYPTED) == end(WRITE_ENCRYPTED);
958 // If we were in handshake when this started, _writeEmpty may be false
959 // because the handshake wrote data after we checked.
960 if (wasInHandshake) status.writeEmpty = false;
961
962 // Compute readEmpty as "both read buffers were empty when we started
963 // and are empty now".
964 status.readEmpty = bufs[READ_ENCRYPTED].isEmpty &&
965 start(READ_PLAINTEXT) == end(READ_PLAINTEXT);
966
967 _ExternalBuffer buffer = bufs[WRITE_PLAINTEXT];
968 int new_start = start(WRITE_PLAINTEXT);
969 if (new_start != buffer.start) {
970 status.progress = true;
971 if (buffer.free == 0) {
972 status.writePlaintextNoLongerFull = true;
973 }
974 buffer.start = new_start;
975 }
976 buffer = bufs[READ_ENCRYPTED];
977 new_start = start(READ_ENCRYPTED);
978 if (new_start != buffer.start) {
979 status.progress = true;
980 if (buffer.free == 0) {
981 status.readEncryptedNoLongerFull = true;
982 }
983 buffer.start = new_start;
984 }
985 buffer = bufs[WRITE_ENCRYPTED];
986 int new_end = end(WRITE_ENCRYPTED);
987 if (new_end != buffer.end) {
988 status.progress = true;
989 if (buffer.length == 0) {
990 status.writeEncryptedNoLongerEmpty = true;
991 }
992 buffer.end = new_end;
993 }
994 buffer = bufs[READ_PLAINTEXT];
995 new_end = end(READ_PLAINTEXT);
996 if (new_end != buffer.end) {
997 status.progress = true;
998 if (buffer.length == 0) {
999 status.readPlaintextNoLongerEmpty = true;
1000 }
1001 buffer.end = new_end;
1002 }
1003 return status;
1004 });
1005 }
1006 }
1007
1008
1009 /**
1010 * A circular buffer backed by an external byte array. Accessed from
1011 * both C++ and Dart code in an unsynchronized way, with one reading
1012 * and one writing. All updates to start and end are done by Dart code.
1013 */
1014 class _ExternalBuffer {
1015 _ExternalBuffer(this.size) {
1016 start = size~/2;
1017 end = size~/2;
1018 }
1019
1020 void advanceStart(int bytes) {
1021 assert(start > end || start + bytes <= end);
1022 start += bytes;
1023 if (start >= size) {
1024 start -= size;
1025 assert(start <= end);
1026 assert(start < size);
1027 }
1028 }
1029
1030 void advanceEnd(int bytes) {
1031 assert(start <= end || start > end + bytes);
1032 end += bytes;
1033 if (end >= size) {
1034 end -= size;
1035 assert(end < start);
1036 assert(end < size);
1037 }
1038 }
1039
1040 bool get isEmpty => end == start;
1041
1042 int get length {
1043 if (start > end) return size + end - start;
1044 return end - start;
1045 }
1046
1047 int get linearLength {
1048 if (start > end) return size - start;
1049 return end - start;
1050 }
1051
1052 int get free {
1053 if (start > end) return start - end - 1;
1054 return size + start - end - 1;
1055 }
1056
1057 int get linearFree {
1058 if (start > end) return start - end - 1;
1059 if (start == 0) return size - end - 1;
1060 return size - end;
1061 }
1062
1063 List<int> read(int bytes) {
1064 if (bytes == null) {
1065 bytes = length;
1066 } else {
1067 bytes = min(bytes, length);
1068 }
1069 if (bytes == 0) return null;
1070 List<int> result = new Uint8List(bytes);
1071 int bytesRead = 0;
1072 // Loop over zero, one, or two linear data ranges.
1073 while (bytesRead < bytes) {
1074 int toRead = linearLength;
1075 result.setRange(bytesRead,
1076 bytesRead + toRead,
1077 data,
1078 start);
1079 advanceStart(toRead);
1080 bytesRead += toRead;
1081 }
1082 return result;
1083 }
1084
1085 int write(List<int> inputData, int offset, int bytes) {
1086 if (bytes > free) {
1087 bytes = free;
1088 }
1089 int written = 0;
1090 int toWrite = min(bytes, linearFree);
1091 // Loop over zero, one, or two linear data ranges.
1092 while (toWrite > 0) {
1093 data.setRange(end, end + toWrite, inputData, offset);
1094 advanceEnd(toWrite);
1095 offset += toWrite;
1096 written += toWrite;
1097 toWrite = min(bytes - written, linearFree);
1098 }
1099 return written;
1100 }
1101
1102 int writeFromSource(List<int> getData(int requested)) {
1103 int written = 0;
1104 int toWrite = linearFree;
1105 // Loop over zero, one, or two linear data ranges.
1106 while (toWrite > 0) {
1107 // Source returns at most toWrite bytes, and it returns null when empty.
1108 var inputData = getData(toWrite);
1109 if (inputData == null) break;
1110 var len = inputData.length;
1111 data.setRange(end, end + len, inputData);
1112 advanceEnd(len);
1113 written += len;
1114 toWrite = linearFree;
1115 }
1116 return written;
1117 }
1118
1119 bool readToSocket(RawSocket socket) {
1120 // Loop over zero, one, or two linear data ranges.
926 while (true) { 1121 while (true) {
927 if (encrypted.length > 0) { 1122 var toWrite = linearLength;
928 // Write from the filter to the socket. 1123 if (toWrite == 0) return false;
929 int bytes = _socket.write(encrypted.data, 1124 int bytes = socket.write(data, start, toWrite);
930 encrypted.start, 1125 advanceStart(bytes);
931 encrypted.length); 1126 if (bytes < toWrite) {
932 encrypted.advanceStart(bytes); 1127 // The socket has blocked while we have data to write.
933 if (encrypted.length > 0) { 1128 return true;
934 // The socket has blocked while we have data to write. 1129 }
935 // We must be notified when it becomes unblocked. 1130 }
936 _socket.writeEventsEnabled = true; 1131 }
937 _filterWriteEmpty = false;
938 break;
939 }
940 } else {
941 var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT];
942 if (plaintext.length > 0) {
943 int plaintext_bytes = _secureFilter.processBuffer(WRITE_PLAINTEXT);
944 plaintext.advanceStart(plaintext_bytes);
945 }
946 int bytes = _secureFilter.processBuffer(WRITE_ENCRYPTED);
947 if (bytes <= 0) {
948 // We know the WRITE_ENCRYPTED buffer is empty, and the
949 // filter wrote zero bytes to it, so the filter must be empty.
950 // Also, the WRITE_PLAINTEXT buffer must have been empty, or
951 // it would have written to the filter.
952 // TODO(whesse): Verify that the filter works this way.
953 _filterWriteEmpty = true;
954 break;
955 }
956 encrypted.length += bytes;
957 }
958 }
959 }
960 }
961
962
963 class _ExternalBuffer {
964 // Performance is improved if a full buffer of plaintext fits
965 // in the encrypted buffer, when encrypted.
966 static final int SIZE = 8 * 1024;
967 static final int ENCRYPTED_SIZE = 10 * 1024;
968 _ExternalBuffer() : start = 0, length = 0;
969
970 // TODO(whesse): Consider making this a circular buffer. Only if it helps.
971 void advanceStart(int numBytes) {
972 start += numBytes;
973 length -= numBytes;
974 if (length == 0) {
975 start = 0;
976 }
977 }
978
979 int get free => data.length - (start + length);
980 1132
981 List data; // This will be a ExternalByteArray, backed by C allocated data. 1133 List data; // This will be a ExternalByteArray, backed by C allocated data.
982 int start; 1134 int start;
983 int length; 1135 int end;
1136 final size;
984 } 1137 }
985 1138
986 1139
987 abstract class _SecureFilter { 1140 abstract class _SecureFilter {
988 external factory _SecureFilter(); 1141 external factory _SecureFilter();
989 1142
1143 external static SendPort _newServicePort();
1144
990 void connect(String hostName, 1145 void connect(String hostName,
991 Uint8List addr, 1146 Uint8List addr,
992 int port, 1147 int port,
993 bool is_server, 1148 bool is_server,
994 String certificateName, 1149 String certificateName,
995 bool requestClientCertificate, 1150 bool requestClientCertificate,
996 bool requireClientCertificate, 1151 bool requireClientCertificate,
997 bool sendClientCertificate); 1152 bool sendClientCertificate);
998 void destroy(); 1153 void destroy();
999 void handshake(); 1154 void handshake();
1000 void init(); 1155 void init();
1001 X509Certificate get peerCertificate; 1156 X509Certificate get peerCertificate;
1002 int processBuffer(int bufferIndex); 1157 int processBuffer(int bufferIndex);
1003 void registerBadCertificateCallback(Function callback); 1158 void registerBadCertificateCallback(Function callback);
1004 void registerHandshakeCompleteCallback(Function handshakeCompleteHandler); 1159 void registerHandshakeCompleteCallback(Function handshakeCompleteHandler);
1160 int _pointer();
1005 1161
1006 List<_ExternalBuffer> get buffers; 1162 List<_ExternalBuffer> get buffers;
1007 } 1163 }
OLDNEW
« no previous file with comments | « sdk/lib/_internal/lib/io_patch.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698