OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 /** | 7 /** |
8 * A high-level class for communicating securely over a TCP socket, using | 8 * A high-level class for communicating securely over a TCP socket, using |
9 * TLS and SSL. The [SecureSocket] exposes both a [Stream] and an | 9 * TLS and SSL. The [SecureSocket] exposes both a [Stream] and an |
10 * [IOSink] interface, making it ideal for using together with | 10 * [IOSink] interface, making it ideal for using together with |
(...skipping 340 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
351 this.issuer, | 351 this.issuer, |
352 this.startValidity, | 352 this.startValidity, |
353 this.endValidity); | 353 this.endValidity); |
354 final String subject; | 354 final String subject; |
355 final String issuer; | 355 final String issuer; |
356 final DateTime startValidity; | 356 final DateTime startValidity; |
357 final DateTime endValidity; | 357 final DateTime endValidity; |
358 } | 358 } |
359 | 359 |
360 | 360 |
| 361 class _FilterStatus { |
| 362 bool progress = false; // The filter read or wrote data to the buffers. |
| 363 bool readEmpty = true; // The read buffers and decryption filter are empty. |
| 364 bool writeEmpty = true; // The write buffers and encryption filter are empty. |
| 365 // These are set if a buffer changes state from empty or full. |
| 366 bool readPlaintextNoLongerEmpty = false; |
| 367 bool writePlaintextNoLongerFull = false; |
| 368 bool readEncryptedNoLongerFull = false; |
| 369 bool writeEncryptedNoLongerEmpty = false; |
| 370 |
| 371 _FilterStatus(); |
| 372 } |
| 373 |
| 374 |
361 class _RawSecureSocket extends Stream<RawSocketEvent> | 375 class _RawSecureSocket extends Stream<RawSocketEvent> |
362 implements RawSecureSocket { | 376 implements RawSecureSocket { |
363 // Status states | 377 // Status states |
364 static final int NOT_CONNECTED = 200; | |
365 static final int HANDSHAKE = 201; | 378 static final int HANDSHAKE = 201; |
366 static final int CONNECTED = 202; | 379 static final int CONNECTED = 202; |
367 static final int CLOSED = 203; | 380 static final int CLOSED = 203; |
368 | 381 |
369 // Buffer identifiers. | 382 // Buffer identifiers. |
370 // These must agree with those in the native C++ implementation. | 383 // These must agree with those in the native C++ implementation. |
371 static final int READ_PLAINTEXT = 0; | 384 static final int READ_PLAINTEXT = 0; |
372 static final int WRITE_PLAINTEXT = 1; | 385 static final int WRITE_PLAINTEXT = 1; |
373 static final int READ_ENCRYPTED = 2; | 386 static final int READ_ENCRYPTED = 2; |
374 static final int WRITE_ENCRYPTED = 3; | 387 static final int WRITE_ENCRYPTED = 3; |
375 static final int NUM_BUFFERS = 4; | 388 static final int NUM_BUFFERS = 4; |
376 | 389 |
| 390 // Is a buffer identifier for an encrypted buffer? |
| 391 static bool _isBufferEncrypted(int identifier) => identifier >= READ_ENCRYPTED
; |
| 392 |
377 RawSocket _socket; | 393 RawSocket _socket; |
378 final Completer<_RawSecureSocket> _handshakeComplete = | 394 final Completer<_RawSecureSocket> _handshakeComplete = |
379 new Completer<_RawSecureSocket>(); | 395 new Completer<_RawSecureSocket>(); |
380 StreamController<RawSocketEvent> _controller; | 396 StreamController<RawSocketEvent> _controller; |
381 Stream<RawSocketEvent> _stream; | 397 Stream<RawSocketEvent> _stream; |
382 StreamSubscription<RawSocketEvent> _socketSubscription; | 398 StreamSubscription<RawSocketEvent> _socketSubscription; |
383 List<int> _bufferedData; | 399 List<int> _bufferedData; |
384 int _bufferedDataIndex = 0; | 400 int _bufferedDataIndex = 0; |
385 final InternetAddress address; | 401 final InternetAddress address; |
386 final bool is_server; | 402 final bool is_server; |
387 final String certificateName; | 403 final String certificateName; |
388 final bool requestClientCertificate; | 404 final bool requestClientCertificate; |
389 final bool requireClientCertificate; | 405 final bool requireClientCertificate; |
390 final bool sendClientCertificate; | 406 final bool sendClientCertificate; |
391 final Function onBadCertificate; | 407 final Function onBadCertificate; |
392 | 408 |
393 var _status = NOT_CONNECTED; | 409 var _status = HANDSHAKE; |
394 bool _writeEventsEnabled = true; | 410 bool _writeEventsEnabled = true; |
395 bool _readEventsEnabled = true; | 411 bool _readEventsEnabled = true; |
| 412 int _pauseCount = 0; |
| 413 bool _pendingReadEvent = false; |
396 bool _socketClosedRead = false; // The network socket is closed for reading. | 414 bool _socketClosedRead = false; // The network socket is closed for reading. |
397 bool _socketClosedWrite = false; // The network socket is closed for writing. | 415 bool _socketClosedWrite = false; // The network socket is closed for writing. |
398 bool _closedRead = false; // The secure socket has fired an onClosed event. | 416 bool _closedRead = false; // The secure socket has fired an onClosed event. |
399 bool _closedWrite = false; // The secure socket has been closed for writing. | 417 bool _closedWrite = false; // The secure socket has been closed for writing. |
400 bool _filterReadEmpty = true; // There is no buffered data to read. | 418 _FilterStatus _filterStatus = new _FilterStatus(); |
401 bool _filterWriteEmpty = true; // There is no buffered data to be written. | |
402 bool _connectPending = false; | 419 bool _connectPending = false; |
| 420 bool _filterPending = false; |
| 421 bool _filterActive = false; |
| 422 |
403 _SecureFilter _secureFilter = new _SecureFilter(); | 423 _SecureFilter _secureFilter = new _SecureFilter(); |
| 424 int _filterPointer; |
| 425 SendPort _filterService; |
404 | 426 |
405 static Future<_RawSecureSocket> connect( | 427 static Future<_RawSecureSocket> connect( |
406 host, | 428 host, |
407 int requestedPort, | 429 int requestedPort, |
408 String certificateName, | 430 String certificateName, |
409 {bool is_server, | 431 {bool is_server, |
410 RawSocket socket, | 432 RawSocket socket, |
411 StreamSubscription subscription, | 433 StreamSubscription subscription, |
412 List<int> bufferedData, | 434 List<int> bufferedData, |
413 bool requestClientCertificate: false, | 435 bool requestClientCertificate: false, |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
457 sync: true, | 479 sync: true, |
458 onListen: _onSubscriptionStateChange, | 480 onListen: _onSubscriptionStateChange, |
459 onPause: _onPauseStateChange, | 481 onPause: _onPauseStateChange, |
460 onResume: _onPauseStateChange, | 482 onResume: _onPauseStateChange, |
461 onCancel: _onSubscriptionStateChange); | 483 onCancel: _onSubscriptionStateChange); |
462 _stream = _controller.stream; | 484 _stream = _controller.stream; |
463 // Throw an ArgumentError if any field is invalid. After this, all | 485 // Throw an ArgumentError if any field is invalid. After this, all |
464 // errors will be reported through the future or the stream. | 486 // errors will be reported through the future or the stream. |
465 _verifyFields(); | 487 _verifyFields(); |
466 _secureFilter.init(); | 488 _secureFilter.init(); |
467 if (_bufferedData != null) _readFromBuffered(); | 489 _filterPointer = _secureFilter._pointer(); |
468 _secureFilter.registerHandshakeCompleteCallback( | 490 _secureFilter.registerHandshakeCompleteCallback( |
469 _secureHandshakeCompleteHandler); | 491 _secureHandshakeCompleteHandler); |
470 if (onBadCertificate != null) { | 492 if (onBadCertificate != null) { |
471 _secureFilter.registerBadCertificateCallback(onBadCertificate); | 493 _secureFilter.registerBadCertificateCallback(onBadCertificate); |
472 } | 494 } |
473 var futureSocket; | 495 var futureSocket; |
474 if (socket == null) { | 496 if (socket == null) { |
475 futureSocket = RawSocket.connect(address, requestedPort); | 497 futureSocket = RawSocket.connect(address, requestedPort); |
476 } else { | 498 } else { |
477 futureSocket = new Future.value(socket); | 499 futureSocket = new Future.value(socket); |
(...skipping 16 matching lines...) Expand all Loading... |
494 _connectPending = true; | 516 _connectPending = true; |
495 _secureFilter.connect(address.host, | 517 _secureFilter.connect(address.host, |
496 (address as dynamic)._sockaddr_storage, | 518 (address as dynamic)._sockaddr_storage, |
497 port, | 519 port, |
498 is_server, | 520 is_server, |
499 certificateName, | 521 certificateName, |
500 requestClientCertificate || | 522 requestClientCertificate || |
501 requireClientCertificate, | 523 requireClientCertificate, |
502 requireClientCertificate, | 524 requireClientCertificate, |
503 sendClientCertificate); | 525 sendClientCertificate); |
504 _status = HANDSHAKE; | |
505 _secureHandshake(); | 526 _secureHandshake(); |
506 }) | 527 }) |
507 .catchError((error) { | 528 .catchError((error) { |
508 _handshakeComplete.completeError(error); | 529 _handshakeComplete.completeError(error); |
509 _close(); | 530 _close(); |
510 }); | 531 }); |
511 } | 532 } |
512 | 533 |
513 StreamSubscription listen(void onData(RawSocketEvent data), | 534 StreamSubscription listen(void onData(RawSocketEvent data), |
514 {void onError(error), | 535 {void onError(error), |
515 void onDone(), | 536 void onDone(), |
516 bool cancelOnError}) { | 537 bool cancelOnError}) { |
517 if (_writeEventsEnabled) { | 538 _sendWriteEvent(); |
518 _writeEventsEnabled = false; | |
519 _controller.add(RawSocketEvent.WRITE); | |
520 } | |
521 return _stream.listen(onData, | 539 return _stream.listen(onData, |
522 onError: onError, | 540 onError: onError, |
523 onDone: onDone, | 541 onDone: onDone, |
524 cancelOnError: cancelOnError); | 542 cancelOnError: cancelOnError); |
525 } | 543 } |
526 | 544 |
527 void _verifyFields() { | 545 void _verifyFields() { |
528 assert(is_server is bool); | 546 assert(is_server is bool); |
529 assert(_socket == null || _socket is RawSocket); | 547 assert(_socket == null || _socket is RawSocket); |
530 if (address is! InternetAddress) { | 548 if (address is! InternetAddress) { |
(...skipping 21 matching lines...) Expand all Loading... |
552 } | 570 } |
553 | 571 |
554 int get port => _socket.port; | 572 int get port => _socket.port; |
555 | 573 |
556 String get remoteHost => _socket.remoteHost; | 574 String get remoteHost => _socket.remoteHost; |
557 | 575 |
558 int get remotePort => _socket.remotePort; | 576 int get remotePort => _socket.remotePort; |
559 | 577 |
560 int available() { | 578 int available() { |
561 if (_status != CONNECTED) return 0; | 579 if (_status != CONNECTED) return 0; |
562 _readEncryptedData(); | |
563 return _secureFilter.buffers[READ_PLAINTEXT].length; | 580 return _secureFilter.buffers[READ_PLAINTEXT].length; |
564 } | 581 } |
565 | 582 |
566 void close() { | 583 void close() { |
567 shutdown(SocketDirection.BOTH); | 584 shutdown(SocketDirection.BOTH); |
568 } | 585 } |
569 | 586 |
570 void _close() { | 587 void _close() { |
571 _closedWrite = true; | 588 _closedWrite = true; |
572 _closedRead = true; | 589 _closedRead = true; |
573 if (_socket != null) { | 590 if (_socket != null) { |
574 _socket.close(); | 591 _socket.close(); |
575 } | 592 } |
576 _socketClosedWrite = true; | 593 _socketClosedWrite = true; |
577 _socketClosedRead = true; | 594 _socketClosedRead = true; |
578 if (_secureFilter != null) { | 595 if (!_filterActive && _secureFilter != null) { |
579 _secureFilter.destroy(); | 596 _secureFilter.destroy(); |
580 _secureFilter = null; | 597 _secureFilter = null; |
581 } | 598 } |
582 if (_socketSubscription != null) { | 599 if (_socketSubscription != null) { |
583 _socketSubscription.cancel(); | 600 _socketSubscription.cancel(); |
584 } | 601 } |
585 _controller.close(); | 602 _controller.close(); |
586 _status = CLOSED; | 603 _status = CLOSED; |
587 } | 604 } |
588 | 605 |
589 void shutdown(SocketDirection direction) { | 606 void shutdown(SocketDirection direction) { |
590 if (direction == SocketDirection.SEND || | 607 if (direction == SocketDirection.SEND || |
591 direction == SocketDirection.BOTH) { | 608 direction == SocketDirection.BOTH) { |
592 _closedWrite = true; | 609 _closedWrite = true; |
593 _writeEncryptedData(); | 610 if (_filterStatus.writeEmpty) { |
594 if (_filterWriteEmpty) { | |
595 _socket.shutdown(SocketDirection.SEND); | 611 _socket.shutdown(SocketDirection.SEND); |
596 _socketClosedWrite = true; | 612 _socketClosedWrite = true; |
597 if (_closedRead) { | 613 if (_closedRead) { |
598 _close(); | 614 _close(); |
599 } | 615 } |
600 } | 616 } |
601 } | 617 } |
602 if (direction == SocketDirection.RECEIVE || | 618 if (direction == SocketDirection.RECEIVE || |
603 direction == SocketDirection.BOTH) { | 619 direction == SocketDirection.BOTH) { |
604 _closedRead = true; | 620 _closedRead = true; |
605 _socketClosedRead = true; | 621 _socketClosedRead = true; |
606 _socket.shutdown(SocketDirection.RECEIVE); | 622 _socket.shutdown(SocketDirection.RECEIVE); |
607 if (_socketClosedWrite) { | 623 if (_socketClosedWrite) { |
608 _close(); | 624 _close(); |
609 } | 625 } |
610 } | 626 } |
611 } | 627 } |
612 | 628 |
613 bool get writeEventsEnabled => _writeEventsEnabled; | 629 bool get writeEventsEnabled => _writeEventsEnabled; |
614 | 630 |
615 void set writeEventsEnabled(bool value) { | 631 void set writeEventsEnabled(bool value) { |
616 if (value && | 632 _writeEventsEnabled = value; |
617 _controller.hasListener && | 633 if (value) { |
618 _secureFilter != null && | 634 Timer.run(() => _sendWriteEvent()); |
619 _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { | |
620 Timer.run(() => _controller.add(RawSocketEvent.WRITE)); | |
621 } else { | |
622 _writeEventsEnabled = value; | |
623 } | 635 } |
624 } | 636 } |
625 | 637 |
626 bool get readEventsEnabled => _readEventsEnabled; | 638 bool get readEventsEnabled => _readEventsEnabled; |
627 | 639 |
628 void set readEventsEnabled(bool value) { | 640 void set readEventsEnabled(bool value) { |
629 _readEventsEnabled = value; | 641 _readEventsEnabled = value; |
630 if (value && | 642 _scheduleReadEvent(); |
631 ((_secureFilter != null && | |
632 _secureFilter.buffers[READ_PLAINTEXT].length > 0) || | |
633 _socketClosedRead)) { | |
634 // We might not have no underlying socket to set off read events. | |
635 Timer.run(_readHandler); | |
636 } | |
637 } | 643 } |
638 | 644 |
639 List<int> read([int len]) { | 645 List<int> read([int length]) { |
| 646 if (length != null && (length is! int || length < 0)) { |
| 647 throw new ArgumentError( |
| 648 "Invalid length parameter in SecureSocket.read (length: $length)"); |
| 649 } |
640 if (_closedRead) { | 650 if (_closedRead) { |
641 throw new SocketException("Reading from a closed socket"); | 651 throw new SocketException("Reading from a closed socket"); |
642 } | 652 } |
643 if (_status != CONNECTED) { | 653 if (_status != CONNECTED) { |
644 return null; | 654 return null; |
645 } | 655 } |
646 var buffer = _secureFilter.buffers[READ_PLAINTEXT]; | 656 var result = _secureFilter.buffers[READ_PLAINTEXT].read(length); |
647 _readEncryptedData(); | 657 _scheduleFilter(); |
648 int toRead = buffer.length; | |
649 if (len != null) { | |
650 if (len is! int || len < 0) { | |
651 throw new ArgumentError( | |
652 "Invalid len parameter in SecureSocket.read (len: $len)"); | |
653 } | |
654 if (len < toRead) { | |
655 toRead = len; | |
656 } | |
657 } | |
658 List<int> result = (toRead == 0) ? null : | |
659 buffer.data.sublist(buffer.start, buffer.start + toRead); | |
660 buffer.advanceStart(toRead); | |
661 | |
662 // Set up a read event if the filter still has data. | |
663 if (!_filterReadEmpty) { | |
664 Timer.run(_readHandler); | |
665 } | |
666 | |
667 if (_socketClosedRead) { // An onClose event is pending. | |
668 // _closedRead is false, since we are in a read call. | |
669 if (!_filterReadEmpty) { | |
670 // _filterReadEmpty may be out of date since read empties | |
671 // the plaintext buffer after calling _readEncryptedData. | |
672 // TODO(whesse): Fix this as part of fixing read. | |
673 _readEncryptedData(); | |
674 } | |
675 if (_filterReadEmpty) { | |
676 // This can't be an else clause: the value of _filterReadEmpty changes. | |
677 // This must be asynchronous, because we are in a read call. | |
678 Timer.run(_closeHandler); | |
679 } | |
680 } | |
681 | |
682 return result; | 658 return result; |
683 } | 659 } |
684 | 660 |
685 // Write the data to the socket, and flush it as much as possible | 661 // Write the data to the socket, and schedule the filter to encrypt it. |
686 // until it would block. If the write would block, _writeEncryptedData sets | |
687 // up handlers to flush the pipeline when possible. | |
688 int write(List<int> data, [int offset, int bytes]) { | 662 int write(List<int> data, [int offset, int bytes]) { |
| 663 if (bytes != null && (bytes is! int || bytes < 0)) { |
| 664 throw new ArgumentError( |
| 665 "Invalid bytes parameter in SecureSocket.read (bytes: $bytes)"); |
| 666 } |
| 667 if (offset != null && (offset is! int || offset < 0)) { |
| 668 throw new ArgumentError( |
| 669 "Invalid offset parameter in SecureSocket.read (offset: $offset)"); |
| 670 } |
689 if (_closedWrite) { | 671 if (_closedWrite) { |
690 _controller.addError(new SocketException("Writing to a closed socket")); | 672 _controller.addError(new SocketException("Writing to a closed socket")); |
691 return 0; | 673 return 0; |
692 } | 674 } |
693 if (_status != CONNECTED) return 0; | 675 if (_status != CONNECTED) return 0; |
694 | |
695 if (offset == null) offset = 0; | 676 if (offset == null) offset = 0; |
696 if (bytes == null) bytes = data.length - offset; | 677 if (bytes == null) bytes = data.length - offset; |
697 | 678 |
698 var buffer = _secureFilter.buffers[WRITE_PLAINTEXT]; | 679 int written = |
699 if (bytes > buffer.free) { | 680 _secureFilter.buffers[WRITE_PLAINTEXT].write(data, offset, bytes); |
700 bytes = buffer.free; | 681 if (written > 0) { |
| 682 _filterStatus.writeEmpty = false; |
701 } | 683 } |
702 if (bytes > 0) { | 684 _scheduleFilter(); |
703 int startIndex = buffer.start + buffer.length; | 685 return written; |
704 buffer.data.setRange(startIndex, startIndex + bytes, data, offset); | |
705 buffer.length += bytes; | |
706 } | |
707 _writeEncryptedData(); // Tries to flush all pipeline stages. | |
708 return bytes; | |
709 } | 686 } |
710 | 687 |
711 X509Certificate get peerCertificate => _secureFilter.peerCertificate; | 688 X509Certificate get peerCertificate => _secureFilter.peerCertificate; |
712 | 689 |
713 bool setOption(SocketOption option, bool enabled) { | 690 bool setOption(SocketOption option, bool enabled) { |
714 if (_socket == null) return false; | 691 if (_socket == null) return false; |
715 return _socket.setOption(option, enabled); | 692 return _socket.setOption(option, enabled); |
716 } | 693 } |
717 | 694 |
718 void _writeHandler() { | |
719 if (_status == CLOSED) return; | |
720 _writeEncryptedData(); | |
721 if (_filterWriteEmpty && _closedWrite && !_socketClosedWrite) { | |
722 // Close _socket for write, by calling shutdown(), to avoid cloning the | |
723 // socket closing code in shutdown(). | |
724 shutdown(SocketDirection.SEND); | |
725 } | |
726 if (_status == HANDSHAKE) { | |
727 try { | |
728 _secureHandshake(); | |
729 } catch (e) { _reportError(e, "RawSecureSocket error"); } | |
730 } else if (_status == CONNECTED && | |
731 _controller.hasListener && | |
732 _writeEventsEnabled && | |
733 _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { | |
734 // Reset the one-shot handler. | |
735 _writeEventsEnabled = false; | |
736 _controller.add(RawSocketEvent.WRITE); | |
737 } | |
738 } | |
739 | |
740 void _eventDispatcher(RawSocketEvent event) { | 695 void _eventDispatcher(RawSocketEvent event) { |
741 if (event == RawSocketEvent.READ) { | 696 if (event == RawSocketEvent.READ) { |
742 _readHandler(); | 697 _readHandler(); |
743 } else if (event == RawSocketEvent.WRITE) { | 698 } else if (event == RawSocketEvent.WRITE) { |
744 _writeHandler(); | 699 _writeHandler(); |
745 } else if (event == RawSocketEvent.READ_CLOSED) { | 700 } else if (event == RawSocketEvent.READ_CLOSED) { |
746 _closeHandler(); | 701 _closeHandler(); |
747 } | 702 } |
748 } | 703 } |
749 | 704 |
750 void _readFromBuffered() { | |
751 assert(_bufferedData != null); | |
752 var encrypted = _secureFilter.buffers[READ_ENCRYPTED]; | |
753 var bytes = _bufferedData.length - _bufferedDataIndex; | |
754 int startIndex = encrypted.start + encrypted.length; | |
755 encrypted.data.setRange(startIndex, | |
756 startIndex + bytes, | |
757 _bufferedData, | |
758 _bufferedDataIndex); | |
759 encrypted.length += bytes; | |
760 _bufferedDataIndex += bytes; | |
761 if (_bufferedData.length == _bufferedDataIndex) { | |
762 _bufferedData = null; | |
763 } | |
764 } | |
765 | |
766 void _readHandler() { | 705 void _readHandler() { |
767 if (_status == CLOSED) { | 706 _readSocket(); |
768 return; | 707 _scheduleFilter(); |
769 } else if (_status == HANDSHAKE) { | 708 } |
770 try { | 709 |
771 _secureHandshake(); | 710 void _writeHandler() { |
772 if (_status != HANDSHAKE) _readHandler(); | 711 _writeSocket(); |
773 } catch (e) { _reportError(e, "RawSecureSocket error"); } | 712 _scheduleFilter(); |
774 } else { | |
775 if (_status != CONNECTED) { | |
776 // Cannot happen. | |
777 throw new SocketException("Internal SocketIO Error"); | |
778 } | |
779 try { | |
780 _readEncryptedData(); | |
781 } catch (e) { _reportError(e, "RawSecureSocket error"); } | |
782 if (!_filterReadEmpty) { | |
783 if (_readEventsEnabled) { | |
784 if (_secureFilter.buffers[READ_PLAINTEXT].length > 0) { | |
785 _controller.add(RawSocketEvent.READ); | |
786 } | |
787 if (_socketClosedRead) { | |
788 // Keep firing read events until we are paused or buffer is empty. | |
789 Timer.run(_readHandler); | |
790 } | |
791 } | |
792 } else if (_socketClosedRead) { | |
793 _closeHandler(); | |
794 } | |
795 } | |
796 } | 713 } |
797 | 714 |
798 void _doneHandler() { | 715 void _doneHandler() { |
799 if (_filterReadEmpty) { | 716 if (_filterStatus.readEmpty) { |
800 _close(); | 717 _close(); |
801 } | 718 } |
802 } | 719 } |
803 | 720 |
804 void _errorHandler(e) { | 721 void _errorHandler(e) { |
805 _reportError(e, 'Error on underlying RawSocket'); | 722 _reportError(e, 'Error on underlying RawSocket'); |
806 } | 723 } |
807 | 724 |
808 void _reportError(e, String message) { | 725 void _reportError(e, String message) { |
809 // TODO(whesse): Call _reportError from all internal functions that throw. | 726 // TODO(whesse): Call _reportError from all internal functions that throw. |
810 if (e is SocketException) { | 727 if (e is SocketException) { |
811 e = new SocketException('$message (${e.message})', e.osError); | 728 e = new SocketException('$message (${e.message})', e.osError); |
812 } else if (e is OSError) { | 729 } else if (e is OSError) { |
813 e = new SocketException(message, e); | 730 e = new SocketException(message, e); |
814 } else { | 731 } else { |
815 e = new SocketException('$message (${e.toString()})', null); | 732 e = new SocketException('$message (${e.toString()})', null); |
816 } | 733 } |
817 if (_connectPending) { | 734 if (_connectPending) { |
818 _handshakeComplete.completeError(e); | 735 _handshakeComplete.completeError(e); |
819 } else { | 736 } else { |
820 _controller.addError(e); | 737 _controller.addError(e); |
821 } | 738 } |
822 _close(); | 739 _close(); |
823 } | 740 } |
824 | 741 |
825 void _closeHandler() { | 742 void _closeHandler() { |
826 if (_status == CONNECTED) { | 743 if (_status == CONNECTED) { |
827 if (_closedRead) return; | 744 if (_closedRead) return; |
828 _socketClosedRead = true; | 745 _socketClosedRead = true; |
829 if (_filterReadEmpty) { | 746 if (_filterStatus.readEmpty) { |
830 _closedRead = true; | 747 _closedRead = true; |
831 _controller.add(RawSocketEvent.READ_CLOSED); | 748 _controller.add(RawSocketEvent.READ_CLOSED); |
832 if (_socketClosedWrite) { | 749 if (_socketClosedWrite) { |
833 _close(); | 750 _close(); |
834 } | 751 } |
| 752 } else { |
| 753 _scheduleFilter(); |
835 } | 754 } |
836 } else if (_status == HANDSHAKE) { | 755 } else if (_status == HANDSHAKE) { |
| 756 _socketClosedRead = true; |
| 757 if (_filterStatus.readEmpty) { |
837 _reportError( | 758 _reportError( |
838 new SocketException('Connection terminated during handshake'), | 759 new SocketException('Connection terminated during handshake'), |
839 'handshake error'); | 760 'RawSecureSocket error'); |
| 761 } else { |
| 762 _secureHandshake(); |
| 763 } |
840 } | 764 } |
841 } | 765 } |
842 | 766 |
843 void _secureHandshake() { | 767 void _secureHandshake() { |
844 _readEncryptedData(); | 768 try { |
845 _secureFilter.handshake(); | 769 _secureFilter.handshake(); |
846 _writeEncryptedData(); | 770 _filterStatus.writeEmpty = false; |
| 771 _readSocket(); |
| 772 _writeSocket(); |
| 773 _scheduleFilter(); |
| 774 } catch (e) { |
| 775 _reportError(e, "RawSecureSocket error"); |
| 776 } |
847 } | 777 } |
848 | 778 |
849 void _secureHandshakeCompleteHandler() { | 779 void _secureHandshakeCompleteHandler() { |
850 _status = CONNECTED; | 780 _status = CONNECTED; |
851 if (_connectPending) { | 781 if (_connectPending) { |
852 _connectPending = false; | 782 _connectPending = false; |
853 // If we complete the future synchronously, user code will run here, | 783 // We don't want user code to run synchronously in this callback. |
854 // and modify the state of the RawSecureSocket. For example, it | |
855 // could close the socket, and set _filter to null. | |
856 Timer.run(() => _handshakeComplete.complete(this)); | 784 Timer.run(() => _handshakeComplete.complete(this)); |
857 } | 785 } |
858 } | 786 } |
859 | 787 |
860 void _onPauseStateChange() { | 788 void _onPauseStateChange() { |
| 789 if (_controller.isPaused) { |
| 790 _pauseCount++; |
| 791 } else { |
| 792 _pauseCount--; |
| 793 if (_pauseCount == 0) { |
| 794 _scheduleReadEvent(); |
| 795 _sendWriteEvent(); // Can send event synchronously. |
| 796 } |
| 797 } |
| 798 |
861 if (!_socketClosedRead || !_socketClosedWrite) { | 799 if (!_socketClosedRead || !_socketClosedWrite) { |
862 if (_controller.isPaused) { | 800 if (_controller.isPaused) { |
863 _socketSubscription.pause(); | 801 _socketSubscription.pause(); |
864 } else { | 802 } else { |
865 _socketSubscription.resume(); | 803 _socketSubscription.resume(); |
866 } | 804 } |
867 } | 805 } |
868 } | 806 } |
869 | 807 |
870 void _onSubscriptionStateChange() { | 808 void _onSubscriptionStateChange() { |
871 if (_controller.hasListener) { | 809 if (_controller.hasListener) { |
872 // TODO(ajohnsen): Do something here? | 810 // TODO(ajohnsen): Do something here? |
873 } | 811 } |
874 } | 812 } |
875 | 813 |
876 void _readEncryptedData() { | 814 void _scheduleFilter() { |
877 // Read from the socket, and push it through the filter as far as | 815 _filterPending = true; |
878 // possible. | 816 _tryFilter(); |
879 var encrypted = _secureFilter.buffers[READ_ENCRYPTED]; | 817 } |
880 var plaintext = _secureFilter.buffers[READ_PLAINTEXT]; | 818 |
881 bool progress = true; | 819 void _tryFilter() { |
882 while (progress) { | 820 if (_status == CLOSED) return; |
883 progress = false; | 821 if (_filterPending && !_filterActive) { |
884 // Do not try to read plaintext from the filter while handshaking. | 822 _filterActive = true; |
885 if ((_status == CONNECTED) && plaintext.free > 0) { | 823 _filterPending = false; |
886 int bytes = _secureFilter.processBuffer(READ_PLAINTEXT); | 824 _pushAllFilterStages().then((status) { |
887 if (bytes > 0) { | 825 _filterStatus = status; |
888 plaintext.length += bytes; | 826 _filterActive = false; |
889 progress = true; | 827 if (_status == CLOSED) { |
890 } | 828 _secureFilter.destroy(); |
891 } | 829 _secureFilter = null; |
892 if (encrypted.length > 0) { | 830 return; |
893 int bytes = _secureFilter.processBuffer(READ_ENCRYPTED); | 831 } |
894 if (bytes > 0) { | 832 if (_filterStatus.writeEmpty && _closedWrite && !_socketClosedWrite) { |
895 encrypted.advanceStart(bytes); | 833 // Checks for and handles all cases of partially closed sockets. |
896 progress = true; | 834 shutdown(SocketDirection.SEND); |
897 } | 835 if (_status == CLOSED) return; |
898 } | 836 } |
899 if (!_socketClosedRead && encrypted.free > 0) { | 837 if (_filterStatus.readEmpty && _socketClosedRead && !_closedRead) { |
900 if (_bufferedData != null) { | 838 if (_status == HANDSHAKE) { |
901 _readFromBuffered(); | 839 _secureFilter.handshake(); |
902 progress = true; | 840 if (_status == HANDSHAKE) { |
903 } else { | 841 _reportError( |
904 List<int> data = _socket.read(encrypted.free); | 842 new SocketException('Connection terminated during handshake'), |
905 if (data != null) { | 843 'RawSecureSocket error'); |
906 int bytes = data.length; | 844 } |
907 int startIndex = encrypted.start + encrypted.length; | |
908 encrypted.data.setRange(startIndex, startIndex + bytes, data); | |
909 encrypted.length += bytes; | |
910 progress = true; | |
911 } | 845 } |
912 } | 846 _closeHandler(); |
913 } | 847 } |
914 } | 848 if (_status == CLOSED) return; |
915 // If there is any data in any stages of the filter, there should | 849 if (_filterStatus.progress) { |
916 // be data in the plaintext buffer after this process. | 850 _filterPending = true; |
917 // TODO(whesse): Verify that this is true, and there can be no | 851 if (_filterStatus.writePlaintextNoLongerFull) _sendWriteEvent(); |
918 // partial encrypted block stuck in the secureFilter. | 852 if (_filterStatus.readEncryptedNoLongerFull) _readSocket(); |
919 _filterReadEmpty = (plaintext.length == 0); | 853 if (_filterStatus.writeEncryptedNoLongerEmpty) _writeSocket(); |
920 } | 854 if (_filterStatus.readPlaintextNoLongerEmpty) _scheduleReadEvent(); |
921 | 855 if (_status == HANDSHAKE) _secureHandshake(); |
922 void _writeEncryptedData() { | 856 } |
| 857 _tryFilter(); |
| 858 }); |
| 859 } |
| 860 } |
| 861 |
| 862 List<int> _readSocketOrBufferedData(int bytes) { |
| 863 if (_bufferedData != null) { |
| 864 if (bytes > _bufferedData.length - _bufferedDataIndex) { |
| 865 bytes = _bufferedData.length - _bufferedDataIndex; |
| 866 } |
| 867 var result = _bufferedData.sublist(_bufferedDataIndex, |
| 868 _bufferedDataIndex + bytes); |
| 869 _bufferedDataIndex += bytes; |
| 870 if (_bufferedData.length == _bufferedDataIndex) { |
| 871 _bufferedData = null; |
| 872 } |
| 873 return result; |
| 874 } else if (!_socketClosedRead) { |
| 875 try { |
| 876 return _socket.read(bytes); |
| 877 } catch (e) { |
| 878 _reportError(e, "RawSecureSocket error reading encrypted socket"); |
| 879 return null; |
| 880 } |
| 881 } else { |
| 882 return null; |
| 883 } |
| 884 } |
| 885 |
| 886 void _readSocket() { |
| 887 if (_status == CLOSED) return; |
| 888 var buffer = _secureFilter.buffers[READ_ENCRYPTED]; |
| 889 if (buffer.writeFromSource(_readSocketOrBufferedData) > 0) { |
| 890 _filterStatus.readEmpty = false; |
| 891 } |
| 892 } |
| 893 |
| 894 void _writeSocket() { |
923 if (_socketClosedWrite) return; | 895 if (_socketClosedWrite) return; |
924 var encrypted = _secureFilter.buffers[WRITE_ENCRYPTED]; | 896 var buffer = _secureFilter.buffers[WRITE_ENCRYPTED]; |
925 var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT]; | 897 if (buffer.readToSocket(_socket)) { // Returns true if blocked |
| 898 _socket.writeEventsEnabled = true; |
| 899 } |
| 900 } |
| 901 |
| 902 // If a read event should be sent, add it to the controller. |
| 903 _scheduleReadEvent() { |
| 904 if (!_pendingReadEvent && |
| 905 _readEventsEnabled && |
| 906 _pauseCount == 0 && |
| 907 _secureFilter != null && |
| 908 !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) { |
| 909 _pendingReadEvent = true; |
| 910 Timer.run(_sendReadEvent); |
| 911 } |
| 912 } |
| 913 |
| 914 _sendReadEvent() { |
| 915 _pendingReadEvent = false; |
| 916 if (_readEventsEnabled && |
| 917 _pauseCount == 0 && |
| 918 _secureFilter != null && |
| 919 !_secureFilter.buffers[READ_PLAINTEXT].isEmpty) { |
| 920 _controller.add(RawSocketEvent.READ); |
| 921 _scheduleReadEvent(); |
| 922 } |
| 923 } |
| 924 |
| 925 // If a write event should be sent, add it to the controller. |
| 926 _sendWriteEvent() { |
| 927 if (!_closedWrite && |
| 928 _writeEventsEnabled && |
| 929 _pauseCount == 0 && |
| 930 _secureFilter != null && |
| 931 _secureFilter.buffers[WRITE_PLAINTEXT].free > 0) { |
| 932 _writeEventsEnabled = false; |
| 933 _controller.add(RawSocketEvent.WRITE); |
| 934 } |
| 935 } |
| 936 |
| 937 Future<_FilterStatus> _pushAllFilterStages() { |
| 938 if (_filterService == null) { |
| 939 _filterService = _SecureFilter._newServicePort(); |
| 940 } |
| 941 List args = [_filterPointer, _status != CONNECTED]; |
| 942 var bufs = _secureFilter.buffers; |
| 943 for (var i = 0; i < NUM_BUFFERS; ++i) { |
| 944 args.add(bufs[i].start); |
| 945 args.add(bufs[i].end); |
| 946 } |
| 947 |
| 948 return _filterService.call(args).then((response) { |
| 949 bool wasInHandshake = response[1]; |
| 950 int start(int index) => response[2 * index + 2]; |
| 951 int end(int index) => response[2 * index + 3]; |
| 952 |
| 953 _FilterStatus status = new _FilterStatus(); |
| 954 // Compute writeEmpty as "write plaintext buffer and write encrypted |
| 955 // buffer were empty when we started and are empty now". |
| 956 status.writeEmpty = bufs[WRITE_PLAINTEXT].isEmpty && |
| 957 start(WRITE_ENCRYPTED) == end(WRITE_ENCRYPTED); |
| 958 // If we were in handshake when this started, _writeEmpty may be false |
| 959 // because the handshake wrote data after we checked. |
| 960 if (wasInHandshake) status.writeEmpty = false; |
| 961 |
| 962 // Compute readEmpty as "both read buffers were empty when we started |
| 963 // and are empty now". |
| 964 status.readEmpty = bufs[READ_ENCRYPTED].isEmpty && |
| 965 start(READ_PLAINTEXT) == end(READ_PLAINTEXT); |
| 966 |
| 967 _ExternalBuffer buffer = bufs[WRITE_PLAINTEXT]; |
| 968 int new_start = start(WRITE_PLAINTEXT); |
| 969 if (new_start != buffer.start) { |
| 970 status.progress = true; |
| 971 if (buffer.free == 0) { |
| 972 status.writePlaintextNoLongerFull = true; |
| 973 } |
| 974 buffer.start = new_start; |
| 975 } |
| 976 buffer = bufs[READ_ENCRYPTED]; |
| 977 new_start = start(READ_ENCRYPTED); |
| 978 if (new_start != buffer.start) { |
| 979 status.progress = true; |
| 980 if (buffer.free == 0) { |
| 981 status.readEncryptedNoLongerFull = true; |
| 982 } |
| 983 buffer.start = new_start; |
| 984 } |
| 985 buffer = bufs[WRITE_ENCRYPTED]; |
| 986 int new_end = end(WRITE_ENCRYPTED); |
| 987 if (new_end != buffer.end) { |
| 988 status.progress = true; |
| 989 if (buffer.length == 0) { |
| 990 status.writeEncryptedNoLongerEmpty = true; |
| 991 } |
| 992 buffer.end = new_end; |
| 993 } |
| 994 buffer = bufs[READ_PLAINTEXT]; |
| 995 new_end = end(READ_PLAINTEXT); |
| 996 if (new_end != buffer.end) { |
| 997 status.progress = true; |
| 998 if (buffer.length == 0) { |
| 999 status.readPlaintextNoLongerEmpty = true; |
| 1000 } |
| 1001 buffer.end = new_end; |
| 1002 } |
| 1003 return status; |
| 1004 }); |
| 1005 } |
| 1006 } |
| 1007 |
| 1008 |
| 1009 /** |
| 1010 * A circular buffer backed by an external byte array. Accessed from |
| 1011 * both C++ and Dart code in an unsynchronized way, with one reading |
| 1012 * and one writing. All updates to start and end are done by Dart code. |
| 1013 */ |
| 1014 class _ExternalBuffer { |
| 1015 _ExternalBuffer(this.size) { |
| 1016 start = size~/2; |
| 1017 end = size~/2; |
| 1018 } |
| 1019 |
| 1020 void advanceStart(int bytes) { |
| 1021 assert(start > end || start + bytes <= end); |
| 1022 start += bytes; |
| 1023 if (start >= size) { |
| 1024 start -= size; |
| 1025 assert(start <= end); |
| 1026 assert(start < size); |
| 1027 } |
| 1028 } |
| 1029 |
| 1030 void advanceEnd(int bytes) { |
| 1031 assert(start <= end || start > end + bytes); |
| 1032 end += bytes; |
| 1033 if (end >= size) { |
| 1034 end -= size; |
| 1035 assert(end < start); |
| 1036 assert(end < size); |
| 1037 } |
| 1038 } |
| 1039 |
| 1040 bool get isEmpty => end == start; |
| 1041 |
| 1042 int get length { |
| 1043 if (start > end) return size + end - start; |
| 1044 return end - start; |
| 1045 } |
| 1046 |
| 1047 int get linearLength { |
| 1048 if (start > end) return size - start; |
| 1049 return end - start; |
| 1050 } |
| 1051 |
| 1052 int get free { |
| 1053 if (start > end) return start - end - 1; |
| 1054 return size + start - end - 1; |
| 1055 } |
| 1056 |
| 1057 int get linearFree { |
| 1058 if (start > end) return start - end - 1; |
| 1059 if (start == 0) return size - end - 1; |
| 1060 return size - end; |
| 1061 } |
| 1062 |
| 1063 List<int> read(int bytes) { |
| 1064 if (bytes == null) { |
| 1065 bytes = length; |
| 1066 } else { |
| 1067 bytes = min(bytes, length); |
| 1068 } |
| 1069 if (bytes == 0) return null; |
| 1070 List<int> result = new Uint8List(bytes); |
| 1071 int bytesRead = 0; |
| 1072 // Loop over zero, one, or two linear data ranges. |
| 1073 while (bytesRead < bytes) { |
| 1074 int toRead = linearLength; |
| 1075 result.setRange(bytesRead, |
| 1076 bytesRead + toRead, |
| 1077 data, |
| 1078 start); |
| 1079 advanceStart(toRead); |
| 1080 bytesRead += toRead; |
| 1081 } |
| 1082 return result; |
| 1083 } |
| 1084 |
| 1085 int write(List<int> inputData, int offset, int bytes) { |
| 1086 if (bytes > free) { |
| 1087 bytes = free; |
| 1088 } |
| 1089 int written = 0; |
| 1090 int toWrite = min(bytes, linearFree); |
| 1091 // Loop over zero, one, or two linear data ranges. |
| 1092 while (toWrite > 0) { |
| 1093 data.setRange(end, end + toWrite, inputData, offset); |
| 1094 advanceEnd(toWrite); |
| 1095 offset += toWrite; |
| 1096 written += toWrite; |
| 1097 toWrite = min(bytes - written, linearFree); |
| 1098 } |
| 1099 return written; |
| 1100 } |
| 1101 |
| 1102 int writeFromSource(List<int> getData(int requested)) { |
| 1103 int written = 0; |
| 1104 int toWrite = linearFree; |
| 1105 // Loop over zero, one, or two linear data ranges. |
| 1106 while (toWrite > 0) { |
| 1107 // Source returns at most toWrite bytes, and it returns null when empty. |
| 1108 var inputData = getData(toWrite); |
| 1109 if (inputData == null) break; |
| 1110 var len = inputData.length; |
| 1111 data.setRange(end, end + len, inputData); |
| 1112 advanceEnd(len); |
| 1113 written += len; |
| 1114 toWrite = linearFree; |
| 1115 } |
| 1116 return written; |
| 1117 } |
| 1118 |
| 1119 bool readToSocket(RawSocket socket) { |
| 1120 // Loop over zero, one, or two linear data ranges. |
926 while (true) { | 1121 while (true) { |
927 if (encrypted.length > 0) { | 1122 var toWrite = linearLength; |
928 // Write from the filter to the socket. | 1123 if (toWrite == 0) return false; |
929 int bytes = _socket.write(encrypted.data, | 1124 int bytes = socket.write(data, start, toWrite); |
930 encrypted.start, | 1125 advanceStart(bytes); |
931 encrypted.length); | 1126 if (bytes < toWrite) { |
932 encrypted.advanceStart(bytes); | 1127 // The socket has blocked while we have data to write. |
933 if (encrypted.length > 0) { | 1128 return true; |
934 // The socket has blocked while we have data to write. | 1129 } |
935 // We must be notified when it becomes unblocked. | 1130 } |
936 _socket.writeEventsEnabled = true; | 1131 } |
937 _filterWriteEmpty = false; | |
938 break; | |
939 } | |
940 } else { | |
941 var plaintext = _secureFilter.buffers[WRITE_PLAINTEXT]; | |
942 if (plaintext.length > 0) { | |
943 int plaintext_bytes = _secureFilter.processBuffer(WRITE_PLAINTEXT); | |
944 plaintext.advanceStart(plaintext_bytes); | |
945 } | |
946 int bytes = _secureFilter.processBuffer(WRITE_ENCRYPTED); | |
947 if (bytes <= 0) { | |
948 // We know the WRITE_ENCRYPTED buffer is empty, and the | |
949 // filter wrote zero bytes to it, so the filter must be empty. | |
950 // Also, the WRITE_PLAINTEXT buffer must have been empty, or | |
951 // it would have written to the filter. | |
952 // TODO(whesse): Verify that the filter works this way. | |
953 _filterWriteEmpty = true; | |
954 break; | |
955 } | |
956 encrypted.length += bytes; | |
957 } | |
958 } | |
959 } | |
960 } | |
961 | |
962 | |
963 class _ExternalBuffer { | |
964 // Performance is improved if a full buffer of plaintext fits | |
965 // in the encrypted buffer, when encrypted. | |
966 static final int SIZE = 8 * 1024; | |
967 static final int ENCRYPTED_SIZE = 10 * 1024; | |
968 _ExternalBuffer() : start = 0, length = 0; | |
969 | |
970 // TODO(whesse): Consider making this a circular buffer. Only if it helps. | |
971 void advanceStart(int numBytes) { | |
972 start += numBytes; | |
973 length -= numBytes; | |
974 if (length == 0) { | |
975 start = 0; | |
976 } | |
977 } | |
978 | |
979 int get free => data.length - (start + length); | |
980 | 1132 |
981 List data; // This will be a ExternalByteArray, backed by C allocated data. | 1133 List data; // This will be a ExternalByteArray, backed by C allocated data. |
982 int start; | 1134 int start; |
983 int length; | 1135 int end; |
| 1136 final size; |
984 } | 1137 } |
985 | 1138 |
986 | 1139 |
987 abstract class _SecureFilter { | 1140 abstract class _SecureFilter { |
988 external factory _SecureFilter(); | 1141 external factory _SecureFilter(); |
989 | 1142 |
| 1143 external static SendPort _newServicePort(); |
| 1144 |
990 void connect(String hostName, | 1145 void connect(String hostName, |
991 Uint8List addr, | 1146 Uint8List addr, |
992 int port, | 1147 int port, |
993 bool is_server, | 1148 bool is_server, |
994 String certificateName, | 1149 String certificateName, |
995 bool requestClientCertificate, | 1150 bool requestClientCertificate, |
996 bool requireClientCertificate, | 1151 bool requireClientCertificate, |
997 bool sendClientCertificate); | 1152 bool sendClientCertificate); |
998 void destroy(); | 1153 void destroy(); |
999 void handshake(); | 1154 void handshake(); |
1000 void init(); | 1155 void init(); |
1001 X509Certificate get peerCertificate; | 1156 X509Certificate get peerCertificate; |
1002 int processBuffer(int bufferIndex); | 1157 int processBuffer(int bufferIndex); |
1003 void registerBadCertificateCallback(Function callback); | 1158 void registerBadCertificateCallback(Function callback); |
1004 void registerHandshakeCompleteCallback(Function handshakeCompleteHandler); | 1159 void registerHandshakeCompleteCallback(Function handshakeCompleteHandler); |
| 1160 int _pointer(); |
1005 | 1161 |
1006 List<_ExternalBuffer> get buffers; | 1162 List<_ExternalBuffer> get buffers; |
1007 } | 1163 } |
OLD | NEW |