OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library http_parser.web_socket; | 5 // The following code is copied from sdk/lib/io/websocket_impl.dart. The |
| 6 // "dart:io" implementation isn't used directly to support non-"dart:io" |
| 7 // applications. |
| 8 // |
| 9 // Because it's copied directly, only modifications necessary to support the |
| 10 // desired public API and to remove "dart:io" dependencies have been made. |
| 11 // |
| 12 // This is up-to-date as of sdk revision |
| 13 // 86227840d75d974feb238f8b3c59c038b99c05cf. |
| 14 library http_parser.copy.web_socket_impl; |
6 | 15 |
7 import 'dart:async'; | 16 import 'dart:async'; |
8 import 'dart:convert'; | 17 import 'dart:convert'; |
9 import 'dart:math'; | 18 import 'dart:math'; |
10 import 'dart:typed_data'; | 19 import 'dart:typed_data'; |
11 | 20 |
12 import 'package:crypto/crypto.dart'; | 21 import '../web_socket.dart'; |
| 22 import 'bytes_builder.dart'; |
| 23 import 'io_sink.dart'; |
| 24 import 'web_socket.dart'; |
13 | 25 |
14 import 'bytes_builder.dart'; | 26 const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
15 | |
16 /// An implementation of the WebSocket protocol that's not specific to "dart:io" | |
17 /// or to any particular HTTP API. | |
18 /// | |
19 /// Because this is HTTP-API-agnostic, it doesn't handle the initial [WebSocket | |
20 /// handshake][]. This needs to be handled manually by the user of the code. | |
21 /// Once that's been done, [new CompatibleWebSocket] can be called with the | |
22 /// underlying socket and it will handle the remainder of the protocol. | |
23 /// | |
24 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 | |
25 abstract class CompatibleWebSocket implements Stream, StreamSink { | |
26 /// The interval for sending ping signals. | |
27 /// | |
28 /// If a ping message is not answered by a pong message from the peer, the | |
29 /// `WebSocket` is assumed disconnected and the connection is closed with a | |
30 /// [WebSocketStatus.GOING_AWAY] close code. When a ping signal is sent, the | |
31 /// pong message must be received within [pingInterval]. | |
32 /// | |
33 /// There are never two outstanding pings at any given time, and the next ping | |
34 /// timer starts when the pong is received. | |
35 /// | |
36 /// By default, the [pingInterval] is `null`, indicating that ping messages | |
37 /// are disabled. | |
38 Duration pingInterval; | |
39 | |
40 /// The [close code][] set when the WebSocket connection is closed. | |
41 /// | |
42 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 | |
43 /// | |
44 /// Before the connection has been closed, this will be `null`. | |
45 int get closeCode; | |
46 | |
47 /// The [close reason][] set when the WebSocket connection is closed. | |
48 /// | |
49 /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 | |
50 /// | |
51 /// Before the connection has been closed, this will be `null`. | |
52 String get closeReason; | |
53 | |
54 /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of | |
55 /// the [initial handshake]. | |
56 /// | |
57 /// The return value should be sent back to the client in a | |
58 /// `Sec-WebSocket-Accept` header. | |
59 /// | |
60 /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2 | |
61 static String signKey(String key) { | |
62 var hash = new SHA1(); | |
63 // We use [codeUnits] here rather than UTF-8-decoding the string because | |
64 // [key] is expected to be base64 encoded, and so will be pure ASCII. | |
65 hash.add((key + _webSocketGUID).codeUnits); | |
66 return CryptoUtils.bytesToBase64(hash.close()); | |
67 } | |
68 | |
69 /// Creates a new WebSocket handling messaging across an existing socket. | |
70 /// | |
71 /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][] | |
72 /// must have already been completed on the socket before this is called. | |
73 /// | |
74 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io" | |
75 /// `Socket`), it will be used for both sending and receiving data. Otherwise, | |
76 /// it will be used for receiving data and [sink] will be used for sending it. | |
77 /// | |
78 /// If this is a WebSocket server, [serverSide] should be `true` (the | |
79 /// default); if it's a client, [serverSide] should be `false`. | |
80 /// | |
81 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 | |
82 factory CompatibleWebSocket(Stream<List<int>> stream, | |
83 {StreamSink<List<int>> sink, bool serverSide: true}) { | |
84 if (sink == null) { | |
85 if (stream is! StreamSink) { | |
86 throw new ArgumentError("If stream isn't also a StreamSink, sink must " | |
87 "be passed explicitly."); | |
88 } | |
89 sink = stream as StreamSink; | |
90 } | |
91 | |
92 return new _WebSocketImpl._fromSocket(stream, sink, serverSide); | |
93 } | |
94 | |
95 /// Closes the web socket connection. | |
96 /// | |
97 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent | |
98 /// to the remote peer, respectively. If they are omitted, the peer will see | |
99 /// a "no status received" code with no reason. | |
100 /// | |
101 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 | |
102 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 | |
103 Future close([int closeCode, String closeReason]); | |
104 } | |
105 | |
106 /// An exception thrown by [CompatibleWebSocket]. | |
107 class CompatibleWebSocketException implements Exception { | |
108 final String message; | |
109 | |
110 CompatibleWebSocketException([this.message]); | |
111 | |
112 String toString() => message == null | |
113 ? "CompatibleWebSocketException" : | |
114 "CompatibleWebSocketException: $message"; | |
115 } | |
116 | |
117 // The following code is copied from sdk/lib/io/websocket_impl.dart. The | |
118 // "dart:io" implementation isn't used directly both to support non-"dart:io" | |
119 // applications, and because it's incompatible with non-"dart:io" HTTP requests | |
120 // (issue 18172). | |
121 // | |
122 // Because it's copied directly, only modifications necessary to support the | |
123 // desired public API and to remove "dart:io" dependencies have been made. | |
124 | |
125 /** | |
126 * Web socket status codes used when closing a web socket connection. | |
127 */ | |
128 abstract class _WebSocketStatus { | |
129 static const int NORMAL_CLOSURE = 1000; | |
130 static const int GOING_AWAY = 1001; | |
131 static const int PROTOCOL_ERROR = 1002; | |
132 static const int UNSUPPORTED_DATA = 1003; | |
133 static const int RESERVED_1004 = 1004; | |
134 static const int NO_STATUS_RECEIVED = 1005; | |
135 static const int ABNORMAL_CLOSURE = 1006; | |
136 static const int INVALID_FRAME_PAYLOAD_DATA = 1007; | |
137 static const int POLICY_VIOLATION = 1008; | |
138 static const int MESSAGE_TOO_BIG = 1009; | |
139 static const int MISSING_MANDATORY_EXTENSION = 1010; | |
140 static const int INTERNAL_SERVER_ERROR = 1011; | |
141 static const int RESERVED_1015 = 1015; | |
142 } | |
143 | |
144 abstract class _WebSocketState { | |
145 static const int CONNECTING = 0; | |
146 static const int OPEN = 1; | |
147 static const int CLOSING = 2; | |
148 static const int CLOSED = 3; | |
149 } | |
150 | |
151 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | |
152 | 27 |
153 final _random = new Random(); | 28 final _random = new Random(); |
154 | 29 |
155 // Matches _WebSocketOpcode. | 30 // Matches _WebSocketOpcode. |
156 class _WebSocketMessageType { | 31 class _WebSocketMessageType { |
157 static const int NONE = 0; | 32 static const int NONE = 0; |
158 static const int TEXT = 1; | 33 static const int TEXT = 1; |
159 static const int BINARY = 2; | 34 static const int BINARY = 2; |
160 } | 35 } |
161 | 36 |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
201 int _state = START; | 76 int _state = START; |
202 bool _fin = false; | 77 bool _fin = false; |
203 int _opcode = -1; | 78 int _opcode = -1; |
204 int _len = -1; | 79 int _len = -1; |
205 bool _masked = false; | 80 bool _masked = false; |
206 int _remainingLenBytes = -1; | 81 int _remainingLenBytes = -1; |
207 int _remainingMaskingKeyBytes = 4; | 82 int _remainingMaskingKeyBytes = 4; |
208 int _remainingPayloadBytes = -1; | 83 int _remainingPayloadBytes = -1; |
209 int _unmaskingIndex = 0; | 84 int _unmaskingIndex = 0; |
210 int _currentMessageType = _WebSocketMessageType.NONE; | 85 int _currentMessageType = _WebSocketMessageType.NONE; |
211 int closeCode = _WebSocketStatus.NO_STATUS_RECEIVED; | 86 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
212 String closeReason = ""; | 87 String closeReason = ""; |
213 | 88 |
214 EventSink _eventSink; | 89 EventSink _eventSink; |
215 | 90 |
216 final bool _serverSide; | 91 final bool _serverSide; |
217 final List _maskingBytes = new List(4); | 92 final List _maskingBytes = new List(4); |
218 final BytesBuilder _payload = new BytesBuilder(copy: false); | 93 final BytesBuilder _payload = new BytesBuilder(copy: false); |
219 | 94 |
220 _WebSocketProtocolTransformer([this._serverSide = false]); | 95 _WebSocketProtocolTransformer([this._serverSide = false]); |
221 | 96 |
(...skipping 219 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
441 break; | 316 break; |
442 } | 317 } |
443 _currentMessageType = _WebSocketMessageType.NONE; | 318 _currentMessageType = _WebSocketMessageType.NONE; |
444 } | 319 } |
445 _prepareForNextFrame(); | 320 _prepareForNextFrame(); |
446 } | 321 } |
447 | 322 |
448 void _controlFrameEnd() { | 323 void _controlFrameEnd() { |
449 switch (_opcode) { | 324 switch (_opcode) { |
450 case _WebSocketOpcode.CLOSE: | 325 case _WebSocketOpcode.CLOSE: |
451 closeCode = _WebSocketStatus.NO_STATUS_RECEIVED; | 326 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
452 var payload = _payload.takeBytes(); | 327 var payload = _payload.takeBytes(); |
453 if (payload.length > 0) { | 328 if (payload.length > 0) { |
454 if (payload.length == 1) { | 329 if (payload.length == 1) { |
455 throw new CompatibleWebSocketException("Protocol error"); | 330 throw new CompatibleWebSocketException("Protocol error"); |
456 } | 331 } |
457 closeCode = payload[0] << 8 | payload[1]; | 332 closeCode = payload[0] << 8 | payload[1]; |
458 if (closeCode == _WebSocketStatus.NO_STATUS_RECEIVED) { | 333 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { |
459 throw new CompatibleWebSocketException("Protocol error"); | 334 throw new CompatibleWebSocketException("Protocol error"); |
460 } | 335 } |
461 if (payload.length > 2) { | 336 if (payload.length > 2) { |
462 closeReason = UTF8.decode(payload.sublist(2)); | 337 closeReason = UTF8.decode(payload.sublist(2)); |
463 } | 338 } |
464 } | 339 } |
465 _state = CLOSED; | 340 _state = CLOSED; |
466 _eventSink.close(); | 341 _eventSink.close(); |
467 break; | 342 break; |
468 | 343 |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
502 } | 377 } |
503 | 378 |
504 | 379 |
505 class _WebSocketPong { | 380 class _WebSocketPong { |
506 final List<int> payload; | 381 final List<int> payload; |
507 _WebSocketPong([this.payload = null]); | 382 _WebSocketPong([this.payload = null]); |
508 } | 383 } |
509 | 384 |
510 // TODO(ajohnsen): Make this transformer reusable. | 385 // TODO(ajohnsen): Make this transformer reusable. |
511 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 386 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
512 final _WebSocketImpl webSocket; | 387 final WebSocketImpl webSocket; |
513 EventSink _eventSink; | 388 EventSink _eventSink; |
514 | 389 |
515 _WebSocketOutgoingTransformer(this.webSocket); | 390 _WebSocketOutgoingTransformer(this.webSocket); |
516 | 391 |
517 Stream bind(Stream stream) { | 392 Stream bind(Stream stream) { |
518 return new Stream.eventTransformed( | 393 return new Stream.eventTransformed( |
519 stream, | 394 stream, |
520 (EventSink eventSink) { | 395 (EventSink eventSink) { |
521 if (_eventSink != null) { | 396 if (_eventSink != null) { |
522 throw new StateError("WebSocket transformer already used"); | 397 throw new StateError("WebSocket transformer already used"); |
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
656 if (data == null) { | 531 if (data == null) { |
657 return [header]; | 532 return [header]; |
658 } else { | 533 } else { |
659 return [header, data]; | 534 return [header, data]; |
660 } | 535 } |
661 } | 536 } |
662 } | 537 } |
663 | 538 |
664 | 539 |
665 class _WebSocketConsumer implements StreamConsumer { | 540 class _WebSocketConsumer implements StreamConsumer { |
666 final _WebSocketImpl webSocket; | 541 final WebSocketImpl webSocket; |
667 final StreamSink<List<int>> sink; | 542 final StreamSink<List<int>> sink; |
668 StreamController _controller; | 543 StreamController _controller; |
669 StreamSubscription _subscription; | 544 StreamSubscription _subscription; |
670 bool _issuedPause = false; | 545 bool _issuedPause = false; |
671 bool _closed = false; | 546 bool _closed = false; |
672 Completer _closeCompleter = new Completer(); | 547 Completer _closeCompleter = new Completer(); |
673 Completer _completer; | 548 Completer _completer; |
674 | 549 |
675 _WebSocketConsumer(this.webSocket, this.sink); | 550 _WebSocketConsumer(this.webSocket, this.sink); |
676 | 551 |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
778 } | 653 } |
779 | 654 |
780 void closeSocket() { | 655 void closeSocket() { |
781 _closed = true; | 656 _closed = true; |
782 _cancel(); | 657 _cancel(); |
783 close(); | 658 close(); |
784 } | 659 } |
785 } | 660 } |
786 | 661 |
787 | 662 |
788 class _WebSocketImpl extends Stream implements CompatibleWebSocket { | 663 class WebSocketImpl extends Stream with _ServiceObject |
| 664 implements CompatibleWebSocket { |
| 665 // Use default Map so we keep order. |
| 666 static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>(); |
| 667 |
| 668 final String protocol; |
| 669 |
789 StreamController _controller; | 670 StreamController _controller; |
790 StreamSubscription _subscription; | 671 StreamSubscription _subscription; |
791 StreamController _sink; | 672 StreamSink _sink; |
792 | 673 |
793 final bool _serverSide; | 674 final bool _serverSide; |
794 int _readyState = _WebSocketState.CONNECTING; | 675 int _readyState = WebSocket.CONNECTING; |
795 bool _writeClosed = false; | 676 bool _writeClosed = false; |
796 int _closeCode; | 677 int _closeCode; |
797 String _closeReason; | 678 String _closeReason; |
798 Duration _pingInterval; | 679 Duration _pingInterval; |
799 Timer _pingTimer; | 680 Timer _pingTimer; |
800 _WebSocketConsumer _consumer; | 681 _WebSocketConsumer _consumer; |
801 | 682 |
802 int _outCloseCode; | 683 int _outCloseCode; |
803 String _outCloseReason; | 684 String _outCloseReason; |
804 Timer _closeTimer; | 685 Timer _closeTimer; |
805 | 686 |
806 _WebSocketImpl._fromSocket(Stream<List<int>> stream, | 687 WebSocketImpl.fromSocket(Stream<List<int>> stream, |
807 StreamSink<List<int>> sink, [this._serverSide = false]) { | 688 StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) { |
808 _consumer = new _WebSocketConsumer(this, sink); | 689 _consumer = new _WebSocketConsumer(this, sink); |
809 _sink = new StreamController(); | 690 _sink = new StreamSinkImpl(_consumer); |
810 _sink.stream.pipe(_consumer); | 691 _readyState = WebSocket.OPEN; |
811 _readyState = _WebSocketState.OPEN; | |
812 | 692 |
813 var transformer = new _WebSocketProtocolTransformer(_serverSide); | 693 var transformer = new _WebSocketProtocolTransformer(_serverSide); |
814 _subscription = stream.transform(transformer).listen( | 694 _subscription = stream.transform(transformer).listen( |
815 (data) { | 695 (data) { |
816 if (data is _WebSocketPing) { | 696 if (data is _WebSocketPing) { |
817 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | 697 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
818 } else if (data is _WebSocketPong) { | 698 } else if (data is _WebSocketPong) { |
819 // Simply set pingInterval, as it'll cancel any timers. | 699 // Simply set pingInterval, as it'll cancel any timers. |
820 pingInterval = _pingInterval; | 700 pingInterval = _pingInterval; |
821 } else { | 701 } else { |
822 _controller.add(data); | 702 _controller.add(data); |
823 } | 703 } |
824 }, | 704 }, |
825 onError: (error) { | 705 onError: (error) { |
826 if (_closeTimer != null) _closeTimer.cancel(); | 706 if (_closeTimer != null) _closeTimer.cancel(); |
827 if (error is FormatException) { | 707 if (error is FormatException) { |
828 _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | 708 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
829 } else { | 709 } else { |
830 _close(_WebSocketStatus.PROTOCOL_ERROR); | 710 _close(WebSocketStatus.PROTOCOL_ERROR); |
831 } | 711 } |
| 712 // An error happened, set the close code set above. |
| 713 _closeCode = _outCloseCode; |
| 714 _closeReason = _outCloseReason; |
832 _controller.close(); | 715 _controller.close(); |
833 }, | 716 }, |
834 onDone: () { | 717 onDone: () { |
835 if (_closeTimer != null) _closeTimer.cancel(); | 718 if (_closeTimer != null) _closeTimer.cancel(); |
836 if (_readyState == _WebSocketState.OPEN) { | 719 if (_readyState == WebSocket.OPEN) { |
837 _readyState = _WebSocketState.CLOSING; | 720 _readyState = WebSocket.CLOSING; |
838 if (!_isReservedStatusCode(transformer.closeCode)) { | 721 if (!_isReservedStatusCode(transformer.closeCode)) { |
839 _close(transformer.closeCode); | 722 _close(transformer.closeCode); |
840 } else { | 723 } else { |
841 _close(); | 724 _close(); |
842 } | 725 } |
843 _readyState = _WebSocketState.CLOSED; | 726 _readyState = WebSocket.CLOSED; |
844 } | 727 } |
| 728 // Protocol close, use close code from transformer. |
845 _closeCode = transformer.closeCode; | 729 _closeCode = transformer.closeCode; |
846 _closeReason = transformer.closeReason; | 730 _closeReason = transformer.closeReason; |
847 _controller.close(); | 731 _controller.close(); |
848 }, | 732 }, |
849 cancelOnError: true); | 733 cancelOnError: true); |
850 _subscription.pause(); | 734 _subscription.pause(); |
851 _controller = new StreamController(sync: true, | 735 _controller = new StreamController(sync: true, |
852 onListen: _subscription.resume, | 736 onListen: () => _subscription.resume(), |
| 737 onCancel: () { |
| 738 _subscription.cancel(); |
| 739 _subscription = null; |
| 740 }, |
853 onPause: _subscription.pause, | 741 onPause: _subscription.pause, |
854 onResume: _subscription.resume); | 742 onResume: _subscription.resume); |
| 743 |
| 744 _webSockets[_serviceId] = this; |
855 } | 745 } |
856 | 746 |
857 StreamSubscription listen(void onData(message), | 747 StreamSubscription listen(void onData(message), |
858 {Function onError, | 748 {Function onError, |
859 void onDone(), | 749 void onDone(), |
860 bool cancelOnError}) { | 750 bool cancelOnError}) { |
861 return _controller.stream.listen(onData, | 751 return _controller.stream.listen(onData, |
862 onError: onError, | 752 onError: onError, |
863 onDone: onDone, | 753 onDone: onDone, |
864 cancelOnError: cancelOnError); | 754 cancelOnError: cancelOnError); |
865 } | 755 } |
866 | 756 |
867 Duration get pingInterval => _pingInterval; | 757 Duration get pingInterval => _pingInterval; |
868 | 758 |
869 void set pingInterval(Duration interval) { | 759 void set pingInterval(Duration interval) { |
870 if (_writeClosed) return; | 760 if (_writeClosed) return; |
871 if (_pingTimer != null) _pingTimer.cancel(); | 761 if (_pingTimer != null) _pingTimer.cancel(); |
872 _pingInterval = interval; | 762 _pingInterval = interval; |
873 | 763 |
874 if (_pingInterval == null) return; | 764 if (_pingInterval == null) return; |
875 | 765 |
876 _pingTimer = new Timer(_pingInterval, () { | 766 _pingTimer = new Timer(_pingInterval, () { |
877 if (_writeClosed) return; | 767 if (_writeClosed) return; |
878 _consumer.add(new _WebSocketPing()); | 768 _consumer.add(new _WebSocketPing()); |
879 _pingTimer = new Timer(_pingInterval, () { | 769 _pingTimer = new Timer(_pingInterval, () { |
880 // No pong received. | 770 // No pong received. |
881 _close(_WebSocketStatus.GOING_AWAY); | 771 _close(WebSocketStatus.GOING_AWAY); |
882 }); | 772 }); |
883 }); | 773 }); |
884 } | 774 } |
885 | 775 |
| 776 int get readyState => _readyState; |
| 777 |
| 778 String get extensions => null; |
886 int get closeCode => _closeCode; | 779 int get closeCode => _closeCode; |
887 String get closeReason => _closeReason; | 780 String get closeReason => _closeReason; |
888 | 781 |
889 void add(data) => _sink.add(data); | 782 void add(data) => _sink.add(data); |
890 void addError(error, [StackTrace stackTrace]) => | 783 void addError(error, [StackTrace stackTrace]) => |
891 _sink.addError(error, stackTrace); | 784 _sink.addError(error, stackTrace); |
892 Future addStream(Stream stream) => _sink.addStream(stream); | 785 Future addStream(Stream stream) => _sink.addStream(stream); |
893 Future get done => _sink.done; | 786 Future get done => _sink.done; |
894 | 787 |
895 Future close([int code, String reason]) { | 788 Future close([int code, String reason]) { |
896 if (_isReservedStatusCode(code)) { | 789 if (_isReservedStatusCode(code)) { |
897 throw new CompatibleWebSocketException("Reserved status code $code"); | 790 throw new CompatibleWebSocketException("Reserved status code $code"); |
898 } | 791 } |
899 if (_outCloseCode == null) { | 792 if (_outCloseCode == null) { |
900 _outCloseCode = code; | 793 _outCloseCode = code; |
901 _outCloseReason = reason; | 794 _outCloseReason = reason; |
902 } | 795 } |
903 if (_closeTimer == null && !_controller.isClosed) { | 796 if (!_controller.isClosed) { |
904 // When closing the web-socket, we no longer accept data. | 797 // If a close has not yet been received from the other end then |
905 _closeTimer = new Timer(const Duration(seconds: 5), () { | 798 // 1) make sure to listen on the stream so the close frame will be |
906 _subscription.cancel(); | 799 // processed if received. |
907 _controller.close(); | 800 // 2) set a timer terminate the connection if a close frame is |
908 }); | 801 // not received. |
| 802 if (!_controller.hasListener && _subscription != null) { |
| 803 _controller.stream.drain().catchError((_) => {}); |
| 804 } |
| 805 if (_closeTimer == null) { |
| 806 // When closing the web-socket, we no longer accept data. |
| 807 _closeTimer = new Timer(const Duration(seconds: 5), () { |
| 808 // Reuse code and reason from the local close. |
| 809 _closeCode = _outCloseCode; |
| 810 _closeReason = _outCloseReason; |
| 811 if (_subscription != null) _subscription.cancel(); |
| 812 _controller.close(); |
| 813 _webSockets.remove(_serviceId); |
| 814 }); |
| 815 } |
909 } | 816 } |
910 return _sink.close(); | 817 return _sink.close(); |
911 } | 818 } |
912 | 819 |
913 void _close([int code, String reason]) { | 820 void _close([int code, String reason]) { |
914 if (_writeClosed) return; | 821 if (_writeClosed) return; |
915 if (_outCloseCode == null) { | 822 if (_outCloseCode == null) { |
916 _outCloseCode = code; | 823 _outCloseCode = code; |
917 _outCloseReason = reason; | 824 _outCloseReason = reason; |
918 } | 825 } |
919 _writeClosed = true; | 826 _writeClosed = true; |
920 _consumer.closeSocket(); | 827 _consumer.closeSocket(); |
| 828 _webSockets.remove(_serviceId); |
921 } | 829 } |
922 | 830 |
| 831 // The _toJSON, _serviceTypePath, and _serviceTypeName methods |
| 832 // have been deleted for http_parser. The methods were unused in WebSocket |
| 833 // code and produced warnings. |
| 834 |
923 static bool _isReservedStatusCode(int code) { | 835 static bool _isReservedStatusCode(int code) { |
924 return code != null && | 836 return code != null && |
925 (code < _WebSocketStatus.NORMAL_CLOSURE || | 837 (code < WebSocketStatus.NORMAL_CLOSURE || |
926 code == _WebSocketStatus.RESERVED_1004 || | 838 code == WebSocketStatus.RESERVED_1004 || |
927 code == _WebSocketStatus.NO_STATUS_RECEIVED || | 839 code == WebSocketStatus.NO_STATUS_RECEIVED || |
928 code == _WebSocketStatus.ABNORMAL_CLOSURE || | 840 code == WebSocketStatus.ABNORMAL_CLOSURE || |
929 (code > _WebSocketStatus.INTERNAL_SERVER_ERROR && | 841 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
930 code < _WebSocketStatus.RESERVED_1015) || | 842 code < WebSocketStatus.RESERVED_1015) || |
931 (code >= _WebSocketStatus.RESERVED_1015 && | 843 (code >= WebSocketStatus.RESERVED_1015 && |
932 code < 3000)); | 844 code < 3000)); |
933 } | 845 } |
934 } | 846 } |
935 | 847 |
| 848 // The following code is from sdk/lib/io/service_object.dart. |
| 849 |
| 850 int _nextServiceId = 1; |
| 851 |
| 852 // TODO(ajohnsen): Use other way of getting a uniq id. |
| 853 abstract class _ServiceObject { |
| 854 int __serviceId = 0; |
| 855 int get _serviceId { |
| 856 if (__serviceId == 0) __serviceId = _nextServiceId++; |
| 857 return __serviceId; |
| 858 } |
| 859 |
| 860 // The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and |
| 861 // _serviceType methods have been deleted for http_parser. The methods were |
| 862 // unused in WebSocket code and produced warnings. |
| 863 } |
OLD | NEW |