Index: lib/src/copy/web_socket_impl.dart |
diff --git a/lib/src/web_socket.dart b/lib/src/copy/web_socket_impl.dart |
similarity index 76% |
copy from lib/src/web_socket.dart |
copy to lib/src/copy/web_socket_impl.dart |
index 968c70b0e13dd0606b2e75309425c41b84fc5cf6..a53a2891d95f2d10e5a4ddb4da0544805bab43db 100644 |
--- a/lib/src/web_socket.dart |
+++ b/lib/src/copy/web_socket_impl.dart |
@@ -1,154 +1,29 @@ |
-// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-library http_parser.web_socket; |
+// The following code is copied from sdk/lib/io/websocket_impl.dart. The |
+// "dart:io" implementation isn't used directly to support non-"dart:io" |
+// applications. |
+// |
+// Because it's copied directly, only modifications necessary to support the |
+// desired public API and to remove "dart:io" dependencies have been made. |
+// |
+// This is up-to-date as of sdk revision |
+// 86227840d75d974feb238f8b3c59c038b99c05cf. |
+library http_parser.copy.web_socket_impl; |
import 'dart:async'; |
import 'dart:convert'; |
import 'dart:math'; |
import 'dart:typed_data'; |
-import 'package:crypto/crypto.dart'; |
- |
+import '../web_socket.dart'; |
import 'bytes_builder.dart'; |
+import 'io_sink.dart'; |
+import 'web_socket.dart'; |
-/// An implementation of the WebSocket protocol that's not specific to "dart:io" |
-/// or to any particular HTTP API. |
-/// |
-/// Because this is HTTP-API-agnostic, it doesn't handle the initial [WebSocket |
-/// handshake][]. This needs to be handled manually by the user of the code. |
-/// Once that's been done, [new CompatibleWebSocket] can be called with the |
-/// underlying socket and it will handle the remainder of the protocol. |
-/// |
-/// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 |
-abstract class CompatibleWebSocket implements Stream, StreamSink { |
- /// The interval for sending ping signals. |
- /// |
- /// If a ping message is not answered by a pong message from the peer, the |
- /// `WebSocket` is assumed disconnected and the connection is closed with a |
- /// [WebSocketStatus.GOING_AWAY] close code. When a ping signal is sent, the |
- /// pong message must be received within [pingInterval]. |
- /// |
- /// There are never two outstanding pings at any given time, and the next ping |
- /// timer starts when the pong is received. |
- /// |
- /// By default, the [pingInterval] is `null`, indicating that ping messages |
- /// are disabled. |
- Duration pingInterval; |
- |
- /// The [close code][] set when the WebSocket connection is closed. |
- /// |
- /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 |
- /// |
- /// Before the connection has been closed, this will be `null`. |
- int get closeCode; |
- |
- /// The [close reason][] set when the WebSocket connection is closed. |
- /// |
- /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 |
- /// |
- /// Before the connection has been closed, this will be `null`. |
- String get closeReason; |
- |
- /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of |
- /// the [initial handshake]. |
- /// |
- /// The return value should be sent back to the client in a |
- /// `Sec-WebSocket-Accept` header. |
- /// |
- /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2 |
- static String signKey(String key) { |
- var hash = new SHA1(); |
- // We use [codeUnits] here rather than UTF-8-decoding the string because |
- // [key] is expected to be base64 encoded, and so will be pure ASCII. |
- hash.add((key + _webSocketGUID).codeUnits); |
- return CryptoUtils.bytesToBase64(hash.close()); |
- } |
- |
- /// Creates a new WebSocket handling messaging across an existing socket. |
- /// |
- /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][] |
- /// must have already been completed on the socket before this is called. |
- /// |
- /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io" |
- /// `Socket`), it will be used for both sending and receiving data. Otherwise, |
- /// it will be used for receiving data and [sink] will be used for sending it. |
- /// |
- /// If this is a WebSocket server, [serverSide] should be `true` (the |
- /// default); if it's a client, [serverSide] should be `false`. |
- /// |
- /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 |
- factory CompatibleWebSocket(Stream<List<int>> stream, |
- {StreamSink<List<int>> sink, bool serverSide: true}) { |
- if (sink == null) { |
- if (stream is! StreamSink) { |
- throw new ArgumentError("If stream isn't also a StreamSink, sink must " |
- "be passed explicitly."); |
- } |
- sink = stream as StreamSink; |
- } |
- |
- return new _WebSocketImpl._fromSocket(stream, sink, serverSide); |
- } |
- |
- /// Closes the web socket connection. |
- /// |
- /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent |
- /// to the remote peer, respectively. If they are omitted, the peer will see |
- /// a "no status received" code with no reason. |
- /// |
- /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 |
- /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 |
- Future close([int closeCode, String closeReason]); |
-} |
- |
-/// An exception thrown by [CompatibleWebSocket]. |
-class CompatibleWebSocketException implements Exception { |
- final String message; |
- |
- CompatibleWebSocketException([this.message]); |
- |
- String toString() => message == null |
- ? "CompatibleWebSocketException" : |
- "CompatibleWebSocketException: $message"; |
-} |
- |
-// The following code is copied from sdk/lib/io/websocket_impl.dart. The |
-// "dart:io" implementation isn't used directly both to support non-"dart:io" |
-// applications, and because it's incompatible with non-"dart:io" HTTP requests |
-// (issue 18172). |
-// |
-// Because it's copied directly, only modifications necessary to support the |
-// desired public API and to remove "dart:io" dependencies have been made. |
- |
-/** |
- * Web socket status codes used when closing a web socket connection. |
- */ |
-abstract class _WebSocketStatus { |
- static const int NORMAL_CLOSURE = 1000; |
- static const int GOING_AWAY = 1001; |
- static const int PROTOCOL_ERROR = 1002; |
- static const int UNSUPPORTED_DATA = 1003; |
- static const int RESERVED_1004 = 1004; |
- static const int NO_STATUS_RECEIVED = 1005; |
- static const int ABNORMAL_CLOSURE = 1006; |
- static const int INVALID_FRAME_PAYLOAD_DATA = 1007; |
- static const int POLICY_VIOLATION = 1008; |
- static const int MESSAGE_TOO_BIG = 1009; |
- static const int MISSING_MANDATORY_EXTENSION = 1010; |
- static const int INTERNAL_SERVER_ERROR = 1011; |
- static const int RESERVED_1015 = 1015; |
-} |
- |
-abstract class _WebSocketState { |
- static const int CONNECTING = 0; |
- static const int OPEN = 1; |
- static const int CLOSING = 2; |
- static const int CLOSED = 3; |
-} |
- |
-const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
+const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
final _random = new Random(); |
@@ -208,7 +83,7 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
int _remainingPayloadBytes = -1; |
int _unmaskingIndex = 0; |
int _currentMessageType = _WebSocketMessageType.NONE; |
- int closeCode = _WebSocketStatus.NO_STATUS_RECEIVED; |
+ int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
String closeReason = ""; |
EventSink _eventSink; |
@@ -448,14 +323,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
void _controlFrameEnd() { |
switch (_opcode) { |
case _WebSocketOpcode.CLOSE: |
- closeCode = _WebSocketStatus.NO_STATUS_RECEIVED; |
+ closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
var payload = _payload.takeBytes(); |
if (payload.length > 0) { |
if (payload.length == 1) { |
throw new CompatibleWebSocketException("Protocol error"); |
} |
closeCode = payload[0] << 8 | payload[1]; |
- if (closeCode == _WebSocketStatus.NO_STATUS_RECEIVED) { |
+ if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { |
throw new CompatibleWebSocketException("Protocol error"); |
} |
if (payload.length > 2) { |
@@ -509,7 +384,7 @@ class _WebSocketPong { |
// TODO(ajohnsen): Make this transformer reusable. |
class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
- final _WebSocketImpl webSocket; |
+ final WebSocketImpl webSocket; |
EventSink _eventSink; |
_WebSocketOutgoingTransformer(this.webSocket); |
@@ -663,7 +538,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
class _WebSocketConsumer implements StreamConsumer { |
- final _WebSocketImpl webSocket; |
+ final WebSocketImpl webSocket; |
final StreamSink<List<int>> sink; |
StreamController _controller; |
StreamSubscription _subscription; |
@@ -785,13 +660,19 @@ class _WebSocketConsumer implements StreamConsumer { |
} |
-class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
+class WebSocketImpl extends Stream with _ServiceObject |
+ implements CompatibleWebSocket { |
+ // Use default Map so we keep order. |
+ static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>(); |
+ |
+ final String protocol; |
+ |
StreamController _controller; |
StreamSubscription _subscription; |
- StreamController _sink; |
+ StreamSink _sink; |
final bool _serverSide; |
- int _readyState = _WebSocketState.CONNECTING; |
+ int _readyState = WebSocket.CONNECTING; |
bool _writeClosed = false; |
int _closeCode; |
String _closeReason; |
@@ -803,12 +684,11 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
String _outCloseReason; |
Timer _closeTimer; |
- _WebSocketImpl._fromSocket(Stream<List<int>> stream, |
- StreamSink<List<int>> sink, [this._serverSide = false]) { |
+ WebSocketImpl.fromSocket(Stream<List<int>> stream, |
+ StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) { |
_consumer = new _WebSocketConsumer(this, sink); |
- _sink = new StreamController(); |
- _sink.stream.pipe(_consumer); |
- _readyState = _WebSocketState.OPEN; |
+ _sink = new StreamSinkImpl(_consumer); |
+ _readyState = WebSocket.OPEN; |
var transformer = new _WebSocketProtocolTransformer(_serverSide); |
_subscription = stream.transform(transformer).listen( |
@@ -825,23 +705,27 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
onError: (error) { |
if (_closeTimer != null) _closeTimer.cancel(); |
if (error is FormatException) { |
- _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
+ _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
} else { |
- _close(_WebSocketStatus.PROTOCOL_ERROR); |
+ _close(WebSocketStatus.PROTOCOL_ERROR); |
} |
+ // An error happened, set the close code set above. |
+ _closeCode = _outCloseCode; |
+ _closeReason = _outCloseReason; |
_controller.close(); |
}, |
onDone: () { |
if (_closeTimer != null) _closeTimer.cancel(); |
- if (_readyState == _WebSocketState.OPEN) { |
- _readyState = _WebSocketState.CLOSING; |
+ if (_readyState == WebSocket.OPEN) { |
+ _readyState = WebSocket.CLOSING; |
if (!_isReservedStatusCode(transformer.closeCode)) { |
_close(transformer.closeCode); |
} else { |
_close(); |
} |
- _readyState = _WebSocketState.CLOSED; |
+ _readyState = WebSocket.CLOSED; |
} |
+ // Protocol close, use close code from transformer. |
_closeCode = transformer.closeCode; |
_closeReason = transformer.closeReason; |
_controller.close(); |
@@ -849,9 +733,15 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
cancelOnError: true); |
_subscription.pause(); |
_controller = new StreamController(sync: true, |
- onListen: _subscription.resume, |
+ onListen: () => _subscription.resume(), |
+ onCancel: () { |
+ _subscription.cancel(); |
+ _subscription = null; |
+ }, |
onPause: _subscription.pause, |
onResume: _subscription.resume); |
+ |
+ _webSockets[_serviceId] = this; |
} |
StreamSubscription listen(void onData(message), |
@@ -878,11 +768,14 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
_consumer.add(new _WebSocketPing()); |
_pingTimer = new Timer(_pingInterval, () { |
// No pong received. |
- _close(_WebSocketStatus.GOING_AWAY); |
+ _close(WebSocketStatus.GOING_AWAY); |
}); |
}); |
} |
+ int get readyState => _readyState; |
+ |
+ String get extensions => null; |
int get closeCode => _closeCode; |
String get closeReason => _closeReason; |
@@ -900,12 +793,26 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
_outCloseCode = code; |
_outCloseReason = reason; |
} |
- if (_closeTimer == null && !_controller.isClosed) { |
- // When closing the web-socket, we no longer accept data. |
- _closeTimer = new Timer(const Duration(seconds: 5), () { |
- _subscription.cancel(); |
- _controller.close(); |
- }); |
+ if (!_controller.isClosed) { |
+ // If a close has not yet been received from the other end then |
+ // 1) make sure to listen on the stream so the close frame will be |
+ // processed if received. |
+ // 2) set a timer terminate the connection if a close frame is |
+ // not received. |
+ if (!_controller.hasListener && _subscription != null) { |
+ _controller.stream.drain().catchError((_) => {}); |
+ } |
+ if (_closeTimer == null) { |
+ // When closing the web-socket, we no longer accept data. |
+ _closeTimer = new Timer(const Duration(seconds: 5), () { |
+ // Reuse code and reason from the local close. |
+ _closeCode = _outCloseCode; |
+ _closeReason = _outCloseReason; |
+ if (_subscription != null) _subscription.cancel(); |
+ _controller.close(); |
+ _webSockets.remove(_serviceId); |
+ }); |
+ } |
} |
return _sink.close(); |
} |
@@ -918,18 +825,39 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
} |
_writeClosed = true; |
_consumer.closeSocket(); |
+ _webSockets.remove(_serviceId); |
} |
+ // The _toJSON, _serviceTypePath, and _serviceTypeName methods |
+ // have been deleted for http_parser. The methods were unused in WebSocket |
+ // code and produced warnings. |
+ |
static bool _isReservedStatusCode(int code) { |
return code != null && |
- (code < _WebSocketStatus.NORMAL_CLOSURE || |
- code == _WebSocketStatus.RESERVED_1004 || |
- code == _WebSocketStatus.NO_STATUS_RECEIVED || |
- code == _WebSocketStatus.ABNORMAL_CLOSURE || |
- (code > _WebSocketStatus.INTERNAL_SERVER_ERROR && |
- code < _WebSocketStatus.RESERVED_1015) || |
- (code >= _WebSocketStatus.RESERVED_1015 && |
+ (code < WebSocketStatus.NORMAL_CLOSURE || |
+ code == WebSocketStatus.RESERVED_1004 || |
+ code == WebSocketStatus.NO_STATUS_RECEIVED || |
+ code == WebSocketStatus.ABNORMAL_CLOSURE || |
+ (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
+ code < WebSocketStatus.RESERVED_1015) || |
+ (code >= WebSocketStatus.RESERVED_1015 && |
code < 3000)); |
} |
} |
+// The following code is from sdk/lib/io/service_object.dart. |
+ |
+int _nextServiceId = 1; |
+ |
+// TODO(ajohnsen): Use other way of getting a uniq id. |
+abstract class _ServiceObject { |
+ int __serviceId = 0; |
+ int get _serviceId { |
+ if (__serviceId == 0) __serviceId = _nextServiceId++; |
+ return __serviceId; |
+ } |
+ |
+ // The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and |
+ // _serviceType methods have been deleted for http_parser. The methods were |
+ // unused in WebSocket code and produced warnings. |
+} |