| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.io; | 5 part of dart.io; |
| 6 | 6 |
| 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
| 8 | 8 |
| 9 class _WebSocketMessageType { | 9 class _WebSocketMessageType { |
| 10 static const int NONE = 0; | 10 static const int NONE = 0; |
| (...skipping 554 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 565 } | 565 } |
| 566 } | 566 } |
| 567 | 567 |
| 568 | 568 |
| 569 class _WebSocketConsumer implements StreamConsumer { | 569 class _WebSocketConsumer implements StreamConsumer { |
| 570 final _WebSocketImpl webSocket; | 570 final _WebSocketImpl webSocket; |
| 571 final Socket socket; | 571 final Socket socket; |
| 572 StreamController _controller; | 572 StreamController _controller; |
| 573 StreamSubscription _subscription; | 573 StreamSubscription _subscription; |
| 574 bool _issuedPause = false; | 574 bool _issuedPause = false; |
| 575 // Only report error if the last message was a user-provided message and not a | 575 bool _closed = false; |
| 576 // ping or pong message. | |
| 577 bool _reportError = false; | |
| 578 Completer _closeCompleter = new Completer(); | 576 Completer _closeCompleter = new Completer(); |
| 579 Completer _completer; | 577 Completer _completer; |
| 580 | 578 |
| 581 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket); | 579 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket); |
| 582 | 580 |
| 583 void _onListen() { | 581 void _onListen() { |
| 584 if (_subscription != null) { | 582 if (_subscription != null) { |
| 585 _subscription.cancel(); | 583 _subscription.cancel(); |
| 586 } | 584 } |
| 587 } | 585 } |
| 588 | 586 |
| 589 void _onPause() { | 587 void _onPause() { |
| 590 if (_subscription != null) { | 588 if (_subscription != null) { |
| 591 _subscription.pause(); | 589 _subscription.pause(); |
| 592 } else { | 590 } else { |
| 593 _issuedPause = true; | 591 _issuedPause = true; |
| 594 } | 592 } |
| 595 } | 593 } |
| 596 | 594 |
| 597 void _onResume() { | 595 void _onResume() { |
| 598 if (_subscription != null) { | 596 if (_subscription != null) { |
| 599 _subscription.resume(); | 597 _subscription.resume(); |
| 600 } else { | 598 } else { |
| 601 _issuedPause = false; | 599 _issuedPause = false; |
| 602 } | 600 } |
| 603 } | 601 } |
| 604 | 602 |
| 603 void _cancel() { |
| 604 if (_subscription != null) { |
| 605 var subscription = _subscription; |
| 606 _subscription = null; |
| 607 subscription.cancel(); |
| 608 } |
| 609 } |
| 610 |
| 605 _ensureController() { | 611 _ensureController() { |
| 606 if (_controller != null) return; | 612 if (_controller != null) return; |
| 607 _controller = new StreamController(sync: true, | 613 _controller = new StreamController(sync: true, |
| 608 onPause: _onPause, | 614 onPause: _onPause, |
| 609 onResume: _onResume, | 615 onResume: _onResume, |
| 610 onCancel: _onListen); | 616 onCancel: _onListen); |
| 611 var stream = _controller.stream.transform( | 617 var stream = _controller.stream.transform( |
| 612 new _WebSocketOutgoingTransformer(webSocket)); | 618 new _WebSocketOutgoingTransformer(webSocket)); |
| 613 socket.addStream(stream) | 619 socket.addStream(stream) |
| 614 .then((_) { | 620 .then((_) { |
| 615 _done(); | 621 _done(); |
| 616 _closeCompleter.complete(webSocket); | 622 _closeCompleter.complete(webSocket); |
| 617 }, | 623 }, onError: (error) { |
| 618 onError: (error) { | 624 _closed = true; |
| 619 if (_reportError) { | 625 _cancel(); |
| 620 if (!_done(error)) { | 626 if (error is ArgumentError) { |
| 621 _closeCompleter.completeError(error); | 627 if (!_done(error)) { |
| 622 } | 628 _closeCompleter.completeError(error); |
| 623 } else { | 629 } |
| 624 _done(); | 630 } else { |
| 625 _closeCompleter.complete(webSocket); | 631 _done(); |
| 626 } | 632 _closeCompleter.complete(webSocket); |
| 627 }); | 633 } |
| 634 }); |
| 628 } | 635 } |
| 629 | 636 |
| 630 bool _done([error]) { | 637 bool _done([error]) { |
| 631 if (_completer == null) return false; | 638 if (_completer == null) return false; |
| 632 if (error != null) { | 639 if (error != null) { |
| 633 _completer.completeError(error); | 640 _completer.completeError(error); |
| 634 } else { | 641 } else { |
| 635 _completer.complete(webSocket); | 642 _completer.complete(webSocket); |
| 636 } | 643 } |
| 637 _completer = null; | 644 _completer = null; |
| 638 return true; | 645 return true; |
| 639 } | 646 } |
| 640 | 647 |
| 641 Future addStream(var stream) { | 648 Future addStream(var stream) { |
| 649 if (_closed) { |
| 650 stream.listen(null).cancel(); |
| 651 return new Future.value(webSocket); |
| 652 } |
| 642 _ensureController(); | 653 _ensureController(); |
| 643 _completer = new Completer(); | 654 _completer = new Completer(); |
| 644 _subscription = stream.listen( | 655 _subscription = stream.listen( |
| 645 (data) { | 656 (data) { |
| 646 _reportError = true; | |
| 647 _controller.add(data); | 657 _controller.add(data); |
| 648 }, | 658 }, |
| 649 onDone: () { | 659 onDone: () { |
| 650 _done(); | 660 _done(); |
| 651 }, | 661 }, |
| 652 onError: (error) { | 662 onError: (error) { |
| 653 _done(error); | 663 _done(error); |
| 654 }, | 664 }, |
| 655 cancelOnError: true); | 665 cancelOnError: true); |
| 656 if (_issuedPause) { | 666 if (_issuedPause) { |
| 657 _subscription.pause(); | 667 _subscription.pause(); |
| 658 _issuedPause = false; | 668 _issuedPause = false; |
| 659 } | 669 } |
| 660 return _completer.future; | 670 return _completer.future; |
| 661 } | 671 } |
| 662 | 672 |
| 663 Future close() { | 673 Future close() { |
| 664 _ensureController(); | 674 _ensureController(); |
| 665 Future closeSocket() { | 675 Future closeSocket() { |
| 666 return socket.close().then((_) => webSocket); | 676 return socket.close().catchError((_) {}).then((_) => webSocket); |
| 667 } | 677 } |
| 668 _controller.close(); | 678 _controller.close(); |
| 669 return _closeCompleter.future.then((_) => closeSocket()); | 679 return _closeCompleter.future.then((_) => closeSocket()); |
| 670 } | 680 } |
| 671 | 681 |
| 672 void add(data) { | 682 void add(data) { |
| 683 if (_closed) return; |
| 673 _ensureController(); | 684 _ensureController(); |
| 674 _reportError = false; | |
| 675 _controller.add(data); | 685 _controller.add(data); |
| 676 } | 686 } |
| 687 |
| 688 void closeSocket() { |
| 689 _closed = true; |
| 690 _cancel(); |
| 691 close(); |
| 692 } |
| 677 } | 693 } |
| 678 | 694 |
| 679 | 695 |
| 680 class _WebSocketImpl extends Stream implements WebSocket { | 696 class _WebSocketImpl extends Stream implements WebSocket { |
| 681 StreamController _controller; | 697 StreamController _controller; |
| 682 StreamSubscription _subscription; | 698 StreamSubscription _subscription; |
| 683 StreamSink _sink; | 699 StreamSink _sink; |
| 684 | 700 |
| 685 final Socket _socket; | 701 final Socket _socket; |
| 686 final bool _serverSide; | 702 final bool _serverSide; |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 779 if (data is _WebSocketPing) { | 795 if (data is _WebSocketPing) { |
| 780 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | 796 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
| 781 } else if (data is _WebSocketPong) { | 797 } else if (data is _WebSocketPong) { |
| 782 // Simply set pingInterval, as it'll cancel any timers. | 798 // Simply set pingInterval, as it'll cancel any timers. |
| 783 pingInterval = _pingInterval; | 799 pingInterval = _pingInterval; |
| 784 } else { | 800 } else { |
| 785 _controller.add(data); | 801 _controller.add(data); |
| 786 } | 802 } |
| 787 }, | 803 }, |
| 788 onError: (error) { | 804 onError: (error) { |
| 789 if (error is ArgumentError) { | 805 if (error is FormatException) { |
| 790 close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | 806 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
| 791 } else { | 807 } else { |
| 792 close(WebSocketStatus.PROTOCOL_ERROR); | 808 _close(WebSocketStatus.PROTOCOL_ERROR); |
| 793 } | 809 } |
| 794 _controller.addError(error); | |
| 795 _controller.close(); | 810 _controller.close(); |
| 796 }, | 811 }, |
| 797 onDone: () { | 812 onDone: () { |
| 798 if (_readyState == WebSocket.OPEN) { | 813 if (_readyState == WebSocket.OPEN) { |
| 799 _readyState = WebSocket.CLOSING; | 814 _readyState = WebSocket.CLOSING; |
| 800 if (!_isReservedStatusCode(transformer.closeCode)) { | 815 if (!_isReservedStatusCode(transformer.closeCode)) { |
| 801 close(transformer.closeCode); | 816 _close(transformer.closeCode); |
| 802 } else { | 817 } else { |
| 803 close(); | 818 _close(); |
| 804 } | 819 } |
| 805 _readyState = WebSocket.CLOSED; | 820 _readyState = WebSocket.CLOSED; |
| 806 } | 821 } |
| 807 _closeCode = transformer.closeCode; | 822 _closeCode = transformer.closeCode; |
| 808 _closeReason = transformer.closeReason; | 823 _closeReason = transformer.closeReason; |
| 809 _controller.close(); | 824 _controller.close(); |
| 810 }, | 825 }, |
| 811 cancelOnError: true); | 826 cancelOnError: true); |
| 812 _subscription.pause(); | 827 _subscription.pause(); |
| 813 _controller = new StreamController(sync: true, | 828 _controller = new StreamController(sync: true, |
| (...skipping 19 matching lines...) Expand all Loading... |
| 833 if (_pingTimer != null) _pingTimer.cancel(); | 848 if (_pingTimer != null) _pingTimer.cancel(); |
| 834 _pingInterval = interval; | 849 _pingInterval = interval; |
| 835 | 850 |
| 836 if (_pingInterval == null) return; | 851 if (_pingInterval == null) return; |
| 837 | 852 |
| 838 _pingTimer = new Timer(_pingInterval, () { | 853 _pingTimer = new Timer(_pingInterval, () { |
| 839 if (_writeClosed) return; | 854 if (_writeClosed) return; |
| 840 _consumer.add(new _WebSocketPing()); | 855 _consumer.add(new _WebSocketPing()); |
| 841 _pingTimer = new Timer(_pingInterval, () { | 856 _pingTimer = new Timer(_pingInterval, () { |
| 842 // No pong received. | 857 // No pong received. |
| 843 close(WebSocketStatus.GOING_AWAY); | 858 _close(WebSocketStatus.GOING_AWAY); |
| 844 }); | 859 }); |
| 845 }); | 860 }); |
| 846 } | 861 } |
| 847 | 862 |
| 848 int get readyState => _readyState; | 863 int get readyState => _readyState; |
| 849 | 864 |
| 850 String get extensions => null; | 865 String get extensions => null; |
| 851 String get protocol => null; | 866 String get protocol => null; |
| 852 int get closeCode => _closeCode; | 867 int get closeCode => _closeCode; |
| 853 String get closeReason => _closeReason; | 868 String get closeReason => _closeReason; |
| 854 | 869 |
| 855 void add(data) => _sink.add(data); | 870 void add(data) => _sink.add(data); |
| 856 void addError(error) => _sink.addError(error); | 871 void addError(error) => _sink.addError(error); |
| 857 Future addStream(Stream stream) => _sink.addStream(stream); | 872 Future addStream(Stream stream) => _sink.addStream(stream); |
| 858 Future get done => _sink.done; | 873 Future get done => _sink.done; |
| 859 | 874 |
| 860 Future close([int code, String reason]) { | 875 Future close([int code, String reason]) { |
| 861 if (!_writeClosed) { | 876 if (_isReservedStatusCode(code)) { |
| 862 if (_isReservedStatusCode(code)) { | 877 throw new WebSocketException("Reserved status code $code"); |
| 863 throw new WebSocketException("Reserved status code $code"); | 878 } |
| 864 } | 879 if (_outCloseCode == null) { |
| 865 _outCloseCode = code; | 880 _outCloseCode = code; |
| 866 _outCloseReason = reason; | 881 _outCloseReason = reason; |
| 867 _writeClosed = true; | |
| 868 } | 882 } |
| 869 if (!(_sink as _StreamSinkImpl)._isBound) _sink.close(); | 883 return _sink.close(); |
| 870 return _sink.done; | 884 } |
| 885 |
| 886 void _close([int code, String reason]) { |
| 887 if (_writeClosed) return; |
| 888 if (_outCloseCode == null) { |
| 889 _outCloseCode = code; |
| 890 _outCloseReason = reason; |
| 891 } |
| 892 _writeClosed = true; |
| 893 _consumer.closeSocket(); |
| 871 } | 894 } |
| 872 | 895 |
| 873 static bool _isReservedStatusCode(int code) { | 896 static bool _isReservedStatusCode(int code) { |
| 874 return code != null && | 897 return code != null && |
| 875 (code < WebSocketStatus.NORMAL_CLOSURE || | 898 (code < WebSocketStatus.NORMAL_CLOSURE || |
| 876 code == WebSocketStatus.RESERVED_1004 || | 899 code == WebSocketStatus.RESERVED_1004 || |
| 877 code == WebSocketStatus.NO_STATUS_RECEIVED || | 900 code == WebSocketStatus.NO_STATUS_RECEIVED || |
| 878 code == WebSocketStatus.ABNORMAL_CLOSURE || | 901 code == WebSocketStatus.ABNORMAL_CLOSURE || |
| 879 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | 902 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
| 880 code < WebSocketStatus.RESERVED_1015) || | 903 code < WebSocketStatus.RESERVED_1015) || |
| 881 (code >= WebSocketStatus.RESERVED_1015 && | 904 (code >= WebSocketStatus.RESERVED_1015 && |
| 882 code < 3000)); | 905 code < 3000)); |
| 883 } | 906 } |
| 884 } | 907 } |
| OLD | NEW |