Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(362)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 25373004: Don't throw SocketExceptions in websocket. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Added test. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/standalone/io/web_socket_error_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 8
9 class _WebSocketMessageType { 9 class _WebSocketMessageType {
10 static const int NONE = 0; 10 static const int NONE = 0;
(...skipping 554 matching lines...) Expand 10 before | Expand all | Expand 10 after
565 } 565 }
566 } 566 }
567 567
568 568
569 class _WebSocketConsumer implements StreamConsumer { 569 class _WebSocketConsumer implements StreamConsumer {
570 final _WebSocketImpl webSocket; 570 final _WebSocketImpl webSocket;
571 final Socket socket; 571 final Socket socket;
572 StreamController _controller; 572 StreamController _controller;
573 StreamSubscription _subscription; 573 StreamSubscription _subscription;
574 bool _issuedPause = false; 574 bool _issuedPause = false;
575 // Only report error if the last message was a user-provided message and not a 575 bool _closed = false;
576 // ping or pong message.
577 bool _reportError = false;
578 Completer _closeCompleter = new Completer(); 576 Completer _closeCompleter = new Completer();
579 Completer _completer; 577 Completer _completer;
580 578
581 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket); 579 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket);
582 580
583 void _onListen() { 581 void _onListen() {
584 if (_subscription != null) { 582 if (_subscription != null) {
585 _subscription.cancel(); 583 _subscription.cancel();
586 } 584 }
587 } 585 }
588 586
589 void _onPause() { 587 void _onPause() {
590 if (_subscription != null) { 588 if (_subscription != null) {
591 _subscription.pause(); 589 _subscription.pause();
592 } else { 590 } else {
593 _issuedPause = true; 591 _issuedPause = true;
594 } 592 }
595 } 593 }
596 594
597 void _onResume() { 595 void _onResume() {
598 if (_subscription != null) { 596 if (_subscription != null) {
599 _subscription.resume(); 597 _subscription.resume();
600 } else { 598 } else {
601 _issuedPause = false; 599 _issuedPause = false;
602 } 600 }
603 } 601 }
604 602
603 void _cancel() {
604 if (_subscription != null) {
605 var subscription = _subscription;
606 _subscription = null;
607 subscription.cancel();
608 }
609 }
610
605 _ensureController() { 611 _ensureController() {
606 if (_controller != null) return; 612 if (_controller != null) return;
607 _controller = new StreamController(sync: true, 613 _controller = new StreamController(sync: true,
608 onPause: _onPause, 614 onPause: _onPause,
609 onResume: _onResume, 615 onResume: _onResume,
610 onCancel: _onListen); 616 onCancel: _onListen);
611 var stream = _controller.stream.transform( 617 var stream = _controller.stream.transform(
612 new _WebSocketOutgoingTransformer(webSocket)); 618 new _WebSocketOutgoingTransformer(webSocket));
613 socket.addStream(stream) 619 socket.addStream(stream)
614 .then((_) { 620 .then((_) {
615 _done(); 621 _done();
616 _closeCompleter.complete(webSocket); 622 _closeCompleter.complete(webSocket);
617 }, 623 }, onError: (error) {
618 onError: (error) { 624 _closed = true;
619 if (_reportError) { 625 _cancel();
620 if (!_done(error)) { 626 if (error is ArgumentError) {
621 _closeCompleter.completeError(error); 627 if (!_done(error)) {
622 } 628 _closeCompleter.completeError(error);
623 } else { 629 }
624 _done(); 630 } else {
625 _closeCompleter.complete(webSocket); 631 _done();
626 } 632 _closeCompleter.complete(webSocket);
627 }); 633 }
634 });
628 } 635 }
629 636
630 bool _done([error]) { 637 bool _done([error]) {
631 if (_completer == null) return false; 638 if (_completer == null) return false;
632 if (error != null) { 639 if (error != null) {
633 _completer.completeError(error); 640 _completer.completeError(error);
634 } else { 641 } else {
635 _completer.complete(webSocket); 642 _completer.complete(webSocket);
636 } 643 }
637 _completer = null; 644 _completer = null;
638 return true; 645 return true;
639 } 646 }
640 647
641 Future addStream(var stream) { 648 Future addStream(var stream) {
649 if (_closed) {
650 stream.listen(null).cancel();
651 return new Future.value(webSocket);
652 }
642 _ensureController(); 653 _ensureController();
643 _completer = new Completer(); 654 _completer = new Completer();
644 _subscription = stream.listen( 655 _subscription = stream.listen(
645 (data) { 656 (data) {
646 _reportError = true;
647 _controller.add(data); 657 _controller.add(data);
648 }, 658 },
649 onDone: () { 659 onDone: () {
650 _done(); 660 _done();
651 }, 661 },
652 onError: (error) { 662 onError: (error) {
653 _done(error); 663 _done(error);
654 }, 664 },
655 cancelOnError: true); 665 cancelOnError: true);
656 if (_issuedPause) { 666 if (_issuedPause) {
657 _subscription.pause(); 667 _subscription.pause();
658 _issuedPause = false; 668 _issuedPause = false;
659 } 669 }
660 return _completer.future; 670 return _completer.future;
661 } 671 }
662 672
663 Future close() { 673 Future close() {
664 _ensureController(); 674 _ensureController();
665 Future closeSocket() { 675 Future closeSocket() {
666 return socket.close().then((_) => webSocket); 676 return socket.close().catchError((_) {}).then((_) => webSocket);
667 } 677 }
668 _controller.close(); 678 _controller.close();
669 return _closeCompleter.future.then((_) => closeSocket()); 679 return _closeCompleter.future.then((_) => closeSocket());
670 } 680 }
671 681
672 void add(data) { 682 void add(data) {
683 if (_closed) return;
673 _ensureController(); 684 _ensureController();
674 _reportError = false;
675 _controller.add(data); 685 _controller.add(data);
676 } 686 }
687
688 void closeSocket() {
689 _closed = true;
690 _cancel();
691 close();
692 }
677 } 693 }
678 694
679 695
680 class _WebSocketImpl extends Stream implements WebSocket { 696 class _WebSocketImpl extends Stream implements WebSocket {
681 StreamController _controller; 697 StreamController _controller;
682 StreamSubscription _subscription; 698 StreamSubscription _subscription;
683 StreamSink _sink; 699 StreamSink _sink;
684 700
685 final Socket _socket; 701 final Socket _socket;
686 final bool _serverSide; 702 final bool _serverSide;
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
779 if (data is _WebSocketPing) { 795 if (data is _WebSocketPing) {
780 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 796 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
781 } else if (data is _WebSocketPong) { 797 } else if (data is _WebSocketPong) {
782 // Simply set pingInterval, as it'll cancel any timers. 798 // Simply set pingInterval, as it'll cancel any timers.
783 pingInterval = _pingInterval; 799 pingInterval = _pingInterval;
784 } else { 800 } else {
785 _controller.add(data); 801 _controller.add(data);
786 } 802 }
787 }, 803 },
788 onError: (error) { 804 onError: (error) {
789 if (error is ArgumentError) { 805 if (error is FormatException) {
790 close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); 806 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
791 } else { 807 } else {
792 close(WebSocketStatus.PROTOCOL_ERROR); 808 _close(WebSocketStatus.PROTOCOL_ERROR);
793 } 809 }
794 _controller.addError(error);
795 _controller.close(); 810 _controller.close();
796 }, 811 },
797 onDone: () { 812 onDone: () {
798 if (_readyState == WebSocket.OPEN) { 813 if (_readyState == WebSocket.OPEN) {
799 _readyState = WebSocket.CLOSING; 814 _readyState = WebSocket.CLOSING;
800 if (!_isReservedStatusCode(transformer.closeCode)) { 815 if (!_isReservedStatusCode(transformer.closeCode)) {
801 close(transformer.closeCode); 816 _close(transformer.closeCode);
802 } else { 817 } else {
803 close(); 818 _close();
804 } 819 }
805 _readyState = WebSocket.CLOSED; 820 _readyState = WebSocket.CLOSED;
806 } 821 }
807 _closeCode = transformer.closeCode; 822 _closeCode = transformer.closeCode;
808 _closeReason = transformer.closeReason; 823 _closeReason = transformer.closeReason;
809 _controller.close(); 824 _controller.close();
810 }, 825 },
811 cancelOnError: true); 826 cancelOnError: true);
812 _subscription.pause(); 827 _subscription.pause();
813 _controller = new StreamController(sync: true, 828 _controller = new StreamController(sync: true,
(...skipping 19 matching lines...) Expand all
833 if (_pingTimer != null) _pingTimer.cancel(); 848 if (_pingTimer != null) _pingTimer.cancel();
834 _pingInterval = interval; 849 _pingInterval = interval;
835 850
836 if (_pingInterval == null) return; 851 if (_pingInterval == null) return;
837 852
838 _pingTimer = new Timer(_pingInterval, () { 853 _pingTimer = new Timer(_pingInterval, () {
839 if (_writeClosed) return; 854 if (_writeClosed) return;
840 _consumer.add(new _WebSocketPing()); 855 _consumer.add(new _WebSocketPing());
841 _pingTimer = new Timer(_pingInterval, () { 856 _pingTimer = new Timer(_pingInterval, () {
842 // No pong received. 857 // No pong received.
843 close(WebSocketStatus.GOING_AWAY); 858 _close(WebSocketStatus.GOING_AWAY);
844 }); 859 });
845 }); 860 });
846 } 861 }
847 862
848 int get readyState => _readyState; 863 int get readyState => _readyState;
849 864
850 String get extensions => null; 865 String get extensions => null;
851 String get protocol => null; 866 String get protocol => null;
852 int get closeCode => _closeCode; 867 int get closeCode => _closeCode;
853 String get closeReason => _closeReason; 868 String get closeReason => _closeReason;
854 869
855 void add(data) => _sink.add(data); 870 void add(data) => _sink.add(data);
856 void addError(error) => _sink.addError(error); 871 void addError(error) => _sink.addError(error);
857 Future addStream(Stream stream) => _sink.addStream(stream); 872 Future addStream(Stream stream) => _sink.addStream(stream);
858 Future get done => _sink.done; 873 Future get done => _sink.done;
859 874
860 Future close([int code, String reason]) { 875 Future close([int code, String reason]) {
861 if (!_writeClosed) { 876 if (_isReservedStatusCode(code)) {
862 if (_isReservedStatusCode(code)) { 877 throw new WebSocketException("Reserved status code $code");
863 throw new WebSocketException("Reserved status code $code"); 878 }
864 } 879 if (_outCloseCode == null) {
865 _outCloseCode = code; 880 _outCloseCode = code;
866 _outCloseReason = reason; 881 _outCloseReason = reason;
867 _writeClosed = true;
868 } 882 }
869 if (!(_sink as _StreamSinkImpl)._isBound) _sink.close(); 883 return _sink.close();
870 return _sink.done; 884 }
885
886 void _close([int code, String reason]) {
887 if (_writeClosed) return;
888 if (_outCloseCode == null) {
889 _outCloseCode = code;
890 _outCloseReason = reason;
891 }
892 _writeClosed = true;
893 _consumer.closeSocket();
871 } 894 }
872 895
873 static bool _isReservedStatusCode(int code) { 896 static bool _isReservedStatusCode(int code) {
874 return code != null && 897 return code != null &&
875 (code < WebSocketStatus.NORMAL_CLOSURE || 898 (code < WebSocketStatus.NORMAL_CLOSURE ||
876 code == WebSocketStatus.RESERVED_1004 || 899 code == WebSocketStatus.RESERVED_1004 ||
877 code == WebSocketStatus.NO_STATUS_RECEIVED || 900 code == WebSocketStatus.NO_STATUS_RECEIVED ||
878 code == WebSocketStatus.ABNORMAL_CLOSURE || 901 code == WebSocketStatus.ABNORMAL_CLOSURE ||
879 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 902 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
880 code < WebSocketStatus.RESERVED_1015) || 903 code < WebSocketStatus.RESERVED_1015) ||
881 (code >= WebSocketStatus.RESERVED_1015 && 904 (code >= WebSocketStatus.RESERVED_1015 &&
882 code < 3000)); 905 code < 3000));
883 } 906 }
884 } 907 }
OLDNEW
« no previous file with comments | « no previous file | tests/standalone/io/web_socket_error_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698