Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(36)

Unified Diff: lib/src/copy/web_socket_impl.dart

Issue 1225403008: Bring in latest dart:io WebSocket code. (Closed) Base URL: git@github.com:dart-lang/http_parser@master
Patch Set: pubspec + changelog Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/src/copy/web_socket.dart ('k') | lib/src/web_socket.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/copy/web_socket_impl.dart
diff --git a/lib/src/web_socket.dart b/lib/src/copy/web_socket_impl.dart
similarity index 76%
copy from lib/src/web_socket.dart
copy to lib/src/copy/web_socket_impl.dart
index 968c70b0e13dd0606b2e75309425c41b84fc5cf6..a53a2891d95f2d10e5a4ddb4da0544805bab43db 100644
--- a/lib/src/web_socket.dart
+++ b/lib/src/copy/web_socket_impl.dart
@@ -1,154 +1,29 @@
-// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-library http_parser.web_socket;
+// The following code is copied from sdk/lib/io/websocket_impl.dart. The
+// "dart:io" implementation isn't used directly to support non-"dart:io"
+// applications.
+//
+// Because it's copied directly, only modifications necessary to support the
+// desired public API and to remove "dart:io" dependencies have been made.
+//
+// This is up-to-date as of sdk revision
+// 86227840d75d974feb238f8b3c59c038b99c05cf.
+library http_parser.copy.web_socket_impl;
import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
-import 'package:crypto/crypto.dart';
-
+import '../web_socket.dart';
import 'bytes_builder.dart';
+import 'io_sink.dart';
+import 'web_socket.dart';
-/// An implementation of the WebSocket protocol that's not specific to "dart:io"
-/// or to any particular HTTP API.
-///
-/// Because this is HTTP-API-agnostic, it doesn't handle the initial [WebSocket
-/// handshake][]. This needs to be handled manually by the user of the code.
-/// Once that's been done, [new CompatibleWebSocket] can be called with the
-/// underlying socket and it will handle the remainder of the protocol.
-///
-/// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
-abstract class CompatibleWebSocket implements Stream, StreamSink {
- /// The interval for sending ping signals.
- ///
- /// If a ping message is not answered by a pong message from the peer, the
- /// `WebSocket` is assumed disconnected and the connection is closed with a
- /// [WebSocketStatus.GOING_AWAY] close code. When a ping signal is sent, the
- /// pong message must be received within [pingInterval].
- ///
- /// There are never two outstanding pings at any given time, and the next ping
- /// timer starts when the pong is received.
- ///
- /// By default, the [pingInterval] is `null`, indicating that ping messages
- /// are disabled.
- Duration pingInterval;
-
- /// The [close code][] set when the WebSocket connection is closed.
- ///
- /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
- ///
- /// Before the connection has been closed, this will be `null`.
- int get closeCode;
-
- /// The [close reason][] set when the WebSocket connection is closed.
- ///
- /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
- ///
- /// Before the connection has been closed, this will be `null`.
- String get closeReason;
-
- /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of
- /// the [initial handshake].
- ///
- /// The return value should be sent back to the client in a
- /// `Sec-WebSocket-Accept` header.
- ///
- /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2
- static String signKey(String key) {
- var hash = new SHA1();
- // We use [codeUnits] here rather than UTF-8-decoding the string because
- // [key] is expected to be base64 encoded, and so will be pure ASCII.
- hash.add((key + _webSocketGUID).codeUnits);
- return CryptoUtils.bytesToBase64(hash.close());
- }
-
- /// Creates a new WebSocket handling messaging across an existing socket.
- ///
- /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][]
- /// must have already been completed on the socket before this is called.
- ///
- /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io"
- /// `Socket`), it will be used for both sending and receiving data. Otherwise,
- /// it will be used for receiving data and [sink] will be used for sending it.
- ///
- /// If this is a WebSocket server, [serverSide] should be `true` (the
- /// default); if it's a client, [serverSide] should be `false`.
- ///
- /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
- factory CompatibleWebSocket(Stream<List<int>> stream,
- {StreamSink<List<int>> sink, bool serverSide: true}) {
- if (sink == null) {
- if (stream is! StreamSink) {
- throw new ArgumentError("If stream isn't also a StreamSink, sink must "
- "be passed explicitly.");
- }
- sink = stream as StreamSink;
- }
-
- return new _WebSocketImpl._fromSocket(stream, sink, serverSide);
- }
-
- /// Closes the web socket connection.
- ///
- /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent
- /// to the remote peer, respectively. If they are omitted, the peer will see
- /// a "no status received" code with no reason.
- ///
- /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
- /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
- Future close([int closeCode, String closeReason]);
-}
-
-/// An exception thrown by [CompatibleWebSocket].
-class CompatibleWebSocketException implements Exception {
- final String message;
-
- CompatibleWebSocketException([this.message]);
-
- String toString() => message == null
- ? "CompatibleWebSocketException" :
- "CompatibleWebSocketException: $message";
-}
-
-// The following code is copied from sdk/lib/io/websocket_impl.dart. The
-// "dart:io" implementation isn't used directly both to support non-"dart:io"
-// applications, and because it's incompatible with non-"dart:io" HTTP requests
-// (issue 18172).
-//
-// Because it's copied directly, only modifications necessary to support the
-// desired public API and to remove "dart:io" dependencies have been made.
-
-/**
- * Web socket status codes used when closing a web socket connection.
- */
-abstract class _WebSocketStatus {
- static const int NORMAL_CLOSURE = 1000;
- static const int GOING_AWAY = 1001;
- static const int PROTOCOL_ERROR = 1002;
- static const int UNSUPPORTED_DATA = 1003;
- static const int RESERVED_1004 = 1004;
- static const int NO_STATUS_RECEIVED = 1005;
- static const int ABNORMAL_CLOSURE = 1006;
- static const int INVALID_FRAME_PAYLOAD_DATA = 1007;
- static const int POLICY_VIOLATION = 1008;
- static const int MESSAGE_TOO_BIG = 1009;
- static const int MISSING_MANDATORY_EXTENSION = 1010;
- static const int INTERNAL_SERVER_ERROR = 1011;
- static const int RESERVED_1015 = 1015;
-}
-
-abstract class _WebSocketState {
- static const int CONNECTING = 0;
- static const int OPEN = 1;
- static const int CLOSING = 2;
- static const int CLOSED = 3;
-}
-
-const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
final _random = new Random();
@@ -208,7 +83,7 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
int _remainingPayloadBytes = -1;
int _unmaskingIndex = 0;
int _currentMessageType = _WebSocketMessageType.NONE;
- int closeCode = _WebSocketStatus.NO_STATUS_RECEIVED;
+ int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
String closeReason = "";
EventSink _eventSink;
@@ -448,14 +323,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
void _controlFrameEnd() {
switch (_opcode) {
case _WebSocketOpcode.CLOSE:
- closeCode = _WebSocketStatus.NO_STATUS_RECEIVED;
+ closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
var payload = _payload.takeBytes();
if (payload.length > 0) {
if (payload.length == 1) {
throw new CompatibleWebSocketException("Protocol error");
}
closeCode = payload[0] << 8 | payload[1];
- if (closeCode == _WebSocketStatus.NO_STATUS_RECEIVED) {
+ if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) {
throw new CompatibleWebSocketException("Protocol error");
}
if (payload.length > 2) {
@@ -509,7 +384,7 @@ class _WebSocketPong {
// TODO(ajohnsen): Make this transformer reusable.
class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
- final _WebSocketImpl webSocket;
+ final WebSocketImpl webSocket;
EventSink _eventSink;
_WebSocketOutgoingTransformer(this.webSocket);
@@ -663,7 +538,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
class _WebSocketConsumer implements StreamConsumer {
- final _WebSocketImpl webSocket;
+ final WebSocketImpl webSocket;
final StreamSink<List<int>> sink;
StreamController _controller;
StreamSubscription _subscription;
@@ -785,13 +660,19 @@ class _WebSocketConsumer implements StreamConsumer {
}
-class _WebSocketImpl extends Stream implements CompatibleWebSocket {
+class WebSocketImpl extends Stream with _ServiceObject
+ implements CompatibleWebSocket {
+ // Use default Map so we keep order.
+ static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>();
+
+ final String protocol;
+
StreamController _controller;
StreamSubscription _subscription;
- StreamController _sink;
+ StreamSink _sink;
final bool _serverSide;
- int _readyState = _WebSocketState.CONNECTING;
+ int _readyState = WebSocket.CONNECTING;
bool _writeClosed = false;
int _closeCode;
String _closeReason;
@@ -803,12 +684,11 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket {
String _outCloseReason;
Timer _closeTimer;
- _WebSocketImpl._fromSocket(Stream<List<int>> stream,
- StreamSink<List<int>> sink, [this._serverSide = false]) {
+ WebSocketImpl.fromSocket(Stream<List<int>> stream,
+ StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) {
_consumer = new _WebSocketConsumer(this, sink);
- _sink = new StreamController();
- _sink.stream.pipe(_consumer);
- _readyState = _WebSocketState.OPEN;
+ _sink = new StreamSinkImpl(_consumer);
+ _readyState = WebSocket.OPEN;
var transformer = new _WebSocketProtocolTransformer(_serverSide);
_subscription = stream.transform(transformer).listen(
@@ -825,23 +705,27 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket {
onError: (error) {
if (_closeTimer != null) _closeTimer.cancel();
if (error is FormatException) {
- _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
+ _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
} else {
- _close(_WebSocketStatus.PROTOCOL_ERROR);
+ _close(WebSocketStatus.PROTOCOL_ERROR);
}
+ // An error happened, set the close code set above.
+ _closeCode = _outCloseCode;
+ _closeReason = _outCloseReason;
_controller.close();
},
onDone: () {
if (_closeTimer != null) _closeTimer.cancel();
- if (_readyState == _WebSocketState.OPEN) {
- _readyState = _WebSocketState.CLOSING;
+ if (_readyState == WebSocket.OPEN) {
+ _readyState = WebSocket.CLOSING;
if (!_isReservedStatusCode(transformer.closeCode)) {
_close(transformer.closeCode);
} else {
_close();
}
- _readyState = _WebSocketState.CLOSED;
+ _readyState = WebSocket.CLOSED;
}
+ // Protocol close, use close code from transformer.
_closeCode = transformer.closeCode;
_closeReason = transformer.closeReason;
_controller.close();
@@ -849,9 +733,15 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket {
cancelOnError: true);
_subscription.pause();
_controller = new StreamController(sync: true,
- onListen: _subscription.resume,
+ onListen: () => _subscription.resume(),
+ onCancel: () {
+ _subscription.cancel();
+ _subscription = null;
+ },
onPause: _subscription.pause,
onResume: _subscription.resume);
+
+ _webSockets[_serviceId] = this;
}
StreamSubscription listen(void onData(message),
@@ -878,11 +768,14 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket {
_consumer.add(new _WebSocketPing());
_pingTimer = new Timer(_pingInterval, () {
// No pong received.
- _close(_WebSocketStatus.GOING_AWAY);
+ _close(WebSocketStatus.GOING_AWAY);
});
});
}
+ int get readyState => _readyState;
+
+ String get extensions => null;
int get closeCode => _closeCode;
String get closeReason => _closeReason;
@@ -900,12 +793,26 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket {
_outCloseCode = code;
_outCloseReason = reason;
}
- if (_closeTimer == null && !_controller.isClosed) {
- // When closing the web-socket, we no longer accept data.
- _closeTimer = new Timer(const Duration(seconds: 5), () {
- _subscription.cancel();
- _controller.close();
- });
+ if (!_controller.isClosed) {
+ // If a close has not yet been received from the other end then
+ // 1) make sure to listen on the stream so the close frame will be
+ // processed if received.
+ // 2) set a timer terminate the connection if a close frame is
+ // not received.
+ if (!_controller.hasListener && _subscription != null) {
+ _controller.stream.drain().catchError((_) => {});
+ }
+ if (_closeTimer == null) {
+ // When closing the web-socket, we no longer accept data.
+ _closeTimer = new Timer(const Duration(seconds: 5), () {
+ // Reuse code and reason from the local close.
+ _closeCode = _outCloseCode;
+ _closeReason = _outCloseReason;
+ if (_subscription != null) _subscription.cancel();
+ _controller.close();
+ _webSockets.remove(_serviceId);
+ });
+ }
}
return _sink.close();
}
@@ -918,18 +825,39 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket {
}
_writeClosed = true;
_consumer.closeSocket();
+ _webSockets.remove(_serviceId);
}
+ // The _toJSON, _serviceTypePath, and _serviceTypeName methods
+ // have been deleted for http_parser. The methods were unused in WebSocket
+ // code and produced warnings.
+
static bool _isReservedStatusCode(int code) {
return code != null &&
- (code < _WebSocketStatus.NORMAL_CLOSURE ||
- code == _WebSocketStatus.RESERVED_1004 ||
- code == _WebSocketStatus.NO_STATUS_RECEIVED ||
- code == _WebSocketStatus.ABNORMAL_CLOSURE ||
- (code > _WebSocketStatus.INTERNAL_SERVER_ERROR &&
- code < _WebSocketStatus.RESERVED_1015) ||
- (code >= _WebSocketStatus.RESERVED_1015 &&
+ (code < WebSocketStatus.NORMAL_CLOSURE ||
+ code == WebSocketStatus.RESERVED_1004 ||
+ code == WebSocketStatus.NO_STATUS_RECEIVED ||
+ code == WebSocketStatus.ABNORMAL_CLOSURE ||
+ (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
+ code < WebSocketStatus.RESERVED_1015) ||
+ (code >= WebSocketStatus.RESERVED_1015 &&
code < 3000));
}
}
+// The following code is from sdk/lib/io/service_object.dart.
+
+int _nextServiceId = 1;
+
+// TODO(ajohnsen): Use other way of getting a uniq id.
+abstract class _ServiceObject {
+ int __serviceId = 0;
+ int get _serviceId {
+ if (__serviceId == 0) __serviceId = _nextServiceId++;
+ return __serviceId;
+ }
+
+ // The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and
+ // _serviceType methods have been deleted for http_parser. The methods were
+ // unused in WebSocket code and produced warnings.
+}
« no previous file with comments | « lib/src/copy/web_socket.dart ('k') | lib/src/web_socket.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698