OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
8 | 8 |
9 class _WebSocketMessageType { | 9 class _WebSocketMessageType { |
10 static const int NONE = 0; | 10 static const int NONE = 0; |
(...skipping 554 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
565 } | 565 } |
566 } | 566 } |
567 | 567 |
568 | 568 |
569 class _WebSocketConsumer implements StreamConsumer { | 569 class _WebSocketConsumer implements StreamConsumer { |
570 final _WebSocketImpl webSocket; | 570 final _WebSocketImpl webSocket; |
571 final Socket socket; | 571 final Socket socket; |
572 StreamController _controller; | 572 StreamController _controller; |
573 StreamSubscription _subscription; | 573 StreamSubscription _subscription; |
574 bool _issuedPause = false; | 574 bool _issuedPause = false; |
575 // Only report error if the last message was a user-provided message and not a | 575 bool _closed = false; |
576 // ping or pong message. | |
577 bool _reportError = false; | |
578 Completer _closeCompleter = new Completer(); | 576 Completer _closeCompleter = new Completer(); |
579 Completer _completer; | 577 Completer _completer; |
580 | 578 |
581 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket); | 579 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket); |
582 | 580 |
583 void _onListen() { | 581 void _onListen() { |
584 if (_subscription != null) { | 582 if (_subscription != null) { |
585 _subscription.cancel(); | 583 _subscription.cancel(); |
586 } | 584 } |
587 } | 585 } |
588 | 586 |
589 void _onPause() { | 587 void _onPause() { |
590 if (_subscription != null) { | 588 if (_subscription != null) { |
591 _subscription.pause(); | 589 _subscription.pause(); |
592 } else { | 590 } else { |
593 _issuedPause = true; | 591 _issuedPause = true; |
594 } | 592 } |
595 } | 593 } |
596 | 594 |
597 void _onResume() { | 595 void _onResume() { |
598 if (_subscription != null) { | 596 if (_subscription != null) { |
599 _subscription.resume(); | 597 _subscription.resume(); |
600 } else { | 598 } else { |
601 _issuedPause = false; | 599 _issuedPause = false; |
602 } | 600 } |
603 } | 601 } |
604 | 602 |
| 603 void _cancel() { |
| 604 if (_subscription != null) { |
| 605 var subscription = _subscription; |
| 606 _subscription = null; |
| 607 subscription.cancel(); |
| 608 } |
| 609 } |
| 610 |
605 _ensureController() { | 611 _ensureController() { |
606 if (_controller != null) return; | 612 if (_controller != null) return; |
607 _controller = new StreamController(sync: true, | 613 _controller = new StreamController(sync: true, |
608 onPause: _onPause, | 614 onPause: _onPause, |
609 onResume: _onResume, | 615 onResume: _onResume, |
610 onCancel: _onListen); | 616 onCancel: _onListen); |
611 var stream = _controller.stream.transform( | 617 var stream = _controller.stream.transform( |
612 new _WebSocketOutgoingTransformer(webSocket)); | 618 new _WebSocketOutgoingTransformer(webSocket)); |
613 socket.addStream(stream) | 619 socket.addStream(stream) |
614 .then((_) { | 620 .then((_) { |
615 _done(); | 621 _done(); |
616 _closeCompleter.complete(webSocket); | 622 _closeCompleter.complete(webSocket); |
617 }, | 623 }, onError: (error) { |
618 onError: (error) { | 624 _closed = true; |
619 if (_reportError) { | 625 _cancel(); |
620 if (!_done(error)) { | 626 if (error is ArgumentError) { |
621 _closeCompleter.completeError(error); | 627 if (!_done(error)) { |
622 } | 628 _closeCompleter.completeError(error); |
623 } else { | 629 } |
624 _done(); | 630 } else { |
625 _closeCompleter.complete(webSocket); | 631 _done(); |
626 } | 632 _closeCompleter.complete(webSocket); |
627 }); | 633 } |
| 634 }); |
628 } | 635 } |
629 | 636 |
630 bool _done([error]) { | 637 bool _done([error]) { |
631 if (_completer == null) return false; | 638 if (_completer == null) return false; |
632 if (error != null) { | 639 if (error != null) { |
633 _completer.completeError(error); | 640 _completer.completeError(error); |
634 } else { | 641 } else { |
635 _completer.complete(webSocket); | 642 _completer.complete(webSocket); |
636 } | 643 } |
637 _completer = null; | 644 _completer = null; |
638 return true; | 645 return true; |
639 } | 646 } |
640 | 647 |
641 Future addStream(var stream) { | 648 Future addStream(var stream) { |
| 649 if (_closed) { |
| 650 stream.listen(null).cancel(); |
| 651 return new Future.value(webSocket); |
| 652 } |
642 _ensureController(); | 653 _ensureController(); |
643 _completer = new Completer(); | 654 _completer = new Completer(); |
644 _subscription = stream.listen( | 655 _subscription = stream.listen( |
645 (data) { | 656 (data) { |
646 _reportError = true; | |
647 _controller.add(data); | 657 _controller.add(data); |
648 }, | 658 }, |
649 onDone: () { | 659 onDone: () { |
650 _done(); | 660 _done(); |
651 }, | 661 }, |
652 onError: (error) { | 662 onError: (error) { |
653 _done(error); | 663 _done(error); |
654 }, | 664 }, |
655 cancelOnError: true); | 665 cancelOnError: true); |
656 if (_issuedPause) { | 666 if (_issuedPause) { |
657 _subscription.pause(); | 667 _subscription.pause(); |
658 _issuedPause = false; | 668 _issuedPause = false; |
659 } | 669 } |
660 return _completer.future; | 670 return _completer.future; |
661 } | 671 } |
662 | 672 |
663 Future close() { | 673 Future close() { |
664 _ensureController(); | 674 _ensureController(); |
665 Future closeSocket() { | 675 Future closeSocket() { |
666 return socket.close().then((_) => webSocket); | 676 return socket.close().catchError((_) {}).then((_) => webSocket); |
667 } | 677 } |
668 _controller.close(); | 678 _controller.close(); |
669 return _closeCompleter.future.then((_) => closeSocket()); | 679 return _closeCompleter.future.then((_) => closeSocket()); |
670 } | 680 } |
671 | 681 |
672 void add(data) { | 682 void add(data) { |
| 683 if (_closed) return; |
673 _ensureController(); | 684 _ensureController(); |
674 _reportError = false; | |
675 _controller.add(data); | 685 _controller.add(data); |
676 } | 686 } |
| 687 |
| 688 void closeSocket() { |
| 689 _closed = true; |
| 690 _cancel(); |
| 691 close(); |
| 692 } |
677 } | 693 } |
678 | 694 |
679 | 695 |
680 class _WebSocketImpl extends Stream implements WebSocket { | 696 class _WebSocketImpl extends Stream implements WebSocket { |
681 StreamController _controller; | 697 StreamController _controller; |
682 StreamSubscription _subscription; | 698 StreamSubscription _subscription; |
683 StreamSink _sink; | 699 StreamSink _sink; |
684 | 700 |
685 final Socket _socket; | 701 final Socket _socket; |
686 final bool _serverSide; | 702 final bool _serverSide; |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
779 if (data is _WebSocketPing) { | 795 if (data is _WebSocketPing) { |
780 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | 796 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
781 } else if (data is _WebSocketPong) { | 797 } else if (data is _WebSocketPong) { |
782 // Simply set pingInterval, as it'll cancel any timers. | 798 // Simply set pingInterval, as it'll cancel any timers. |
783 pingInterval = _pingInterval; | 799 pingInterval = _pingInterval; |
784 } else { | 800 } else { |
785 _controller.add(data); | 801 _controller.add(data); |
786 } | 802 } |
787 }, | 803 }, |
788 onError: (error) { | 804 onError: (error) { |
789 if (error is ArgumentError) { | 805 if (error is FormatException) { |
790 close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | 806 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
791 } else { | 807 } else { |
792 close(WebSocketStatus.PROTOCOL_ERROR); | 808 _close(WebSocketStatus.PROTOCOL_ERROR); |
793 } | 809 } |
794 _controller.addError(error); | |
795 _controller.close(); | 810 _controller.close(); |
796 }, | 811 }, |
797 onDone: () { | 812 onDone: () { |
798 if (_readyState == WebSocket.OPEN) { | 813 if (_readyState == WebSocket.OPEN) { |
799 _readyState = WebSocket.CLOSING; | 814 _readyState = WebSocket.CLOSING; |
800 if (!_isReservedStatusCode(transformer.closeCode)) { | 815 if (!_isReservedStatusCode(transformer.closeCode)) { |
801 close(transformer.closeCode); | 816 _close(transformer.closeCode); |
802 } else { | 817 } else { |
803 close(); | 818 _close(); |
804 } | 819 } |
805 _readyState = WebSocket.CLOSED; | 820 _readyState = WebSocket.CLOSED; |
806 } | 821 } |
807 _closeCode = transformer.closeCode; | 822 _closeCode = transformer.closeCode; |
808 _closeReason = transformer.closeReason; | 823 _closeReason = transformer.closeReason; |
809 _controller.close(); | 824 _controller.close(); |
810 }, | 825 }, |
811 cancelOnError: true); | 826 cancelOnError: true); |
812 _subscription.pause(); | 827 _subscription.pause(); |
813 _controller = new StreamController(sync: true, | 828 _controller = new StreamController(sync: true, |
(...skipping 19 matching lines...) Expand all Loading... |
833 if (_pingTimer != null) _pingTimer.cancel(); | 848 if (_pingTimer != null) _pingTimer.cancel(); |
834 _pingInterval = interval; | 849 _pingInterval = interval; |
835 | 850 |
836 if (_pingInterval == null) return; | 851 if (_pingInterval == null) return; |
837 | 852 |
838 _pingTimer = new Timer(_pingInterval, () { | 853 _pingTimer = new Timer(_pingInterval, () { |
839 if (_writeClosed) return; | 854 if (_writeClosed) return; |
840 _consumer.add(new _WebSocketPing()); | 855 _consumer.add(new _WebSocketPing()); |
841 _pingTimer = new Timer(_pingInterval, () { | 856 _pingTimer = new Timer(_pingInterval, () { |
842 // No pong received. | 857 // No pong received. |
843 close(WebSocketStatus.GOING_AWAY); | 858 _close(WebSocketStatus.GOING_AWAY); |
844 }); | 859 }); |
845 }); | 860 }); |
846 } | 861 } |
847 | 862 |
848 int get readyState => _readyState; | 863 int get readyState => _readyState; |
849 | 864 |
850 String get extensions => null; | 865 String get extensions => null; |
851 String get protocol => null; | 866 String get protocol => null; |
852 int get closeCode => _closeCode; | 867 int get closeCode => _closeCode; |
853 String get closeReason => _closeReason; | 868 String get closeReason => _closeReason; |
854 | 869 |
855 void add(data) => _sink.add(data); | 870 void add(data) => _sink.add(data); |
856 void addError(error) => _sink.addError(error); | 871 void addError(error) => _sink.addError(error); |
857 Future addStream(Stream stream) => _sink.addStream(stream); | 872 Future addStream(Stream stream) => _sink.addStream(stream); |
858 Future get done => _sink.done; | 873 Future get done => _sink.done; |
859 | 874 |
860 Future close([int code, String reason]) { | 875 Future close([int code, String reason]) { |
861 if (!_writeClosed) { | 876 if (_isReservedStatusCode(code)) { |
862 if (_isReservedStatusCode(code)) { | 877 throw new WebSocketException("Reserved status code $code"); |
863 throw new WebSocketException("Reserved status code $code"); | 878 } |
864 } | 879 if (_outCloseCode == null) { |
865 _outCloseCode = code; | 880 _outCloseCode = code; |
866 _outCloseReason = reason; | 881 _outCloseReason = reason; |
867 _writeClosed = true; | |
868 } | 882 } |
869 if (!(_sink as _StreamSinkImpl)._isBound) _sink.close(); | 883 return _sink.close(); |
870 return _sink.done; | 884 } |
| 885 |
| 886 void _close([int code, String reason]) { |
| 887 if (_writeClosed) return; |
| 888 if (_outCloseCode == null) { |
| 889 _outCloseCode = code; |
| 890 _outCloseReason = reason; |
| 891 } |
| 892 _writeClosed = true; |
| 893 _consumer.closeSocket(); |
871 } | 894 } |
872 | 895 |
873 static bool _isReservedStatusCode(int code) { | 896 static bool _isReservedStatusCode(int code) { |
874 return code != null && | 897 return code != null && |
875 (code < WebSocketStatus.NORMAL_CLOSURE || | 898 (code < WebSocketStatus.NORMAL_CLOSURE || |
876 code == WebSocketStatus.RESERVED_1004 || | 899 code == WebSocketStatus.RESERVED_1004 || |
877 code == WebSocketStatus.NO_STATUS_RECEIVED || | 900 code == WebSocketStatus.NO_STATUS_RECEIVED || |
878 code == WebSocketStatus.ABNORMAL_CLOSURE || | 901 code == WebSocketStatus.ABNORMAL_CLOSURE || |
879 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | 902 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
880 code < WebSocketStatus.RESERVED_1015) || | 903 code < WebSocketStatus.RESERVED_1015) || |
881 (code >= WebSocketStatus.RESERVED_1015 && | 904 (code >= WebSocketStatus.RESERVED_1015 && |
882 code < 3000)); | 905 code < 3000)); |
883 } | 906 } |
884 } | 907 } |
OLD | NEW |