Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1523)

Unified Diff: lib/src/copy/web_socket_impl.dart

Issue 1947683006: Bring in the latest version of the SDK's WebSocket impl. (Closed) Base URL: git@github.com:dart-lang/web_socket_channel.git@master
Patch Set: Merge again Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/src/copy/web_socket.dart ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/copy/web_socket_impl.dart
diff --git a/lib/src/copy/web_socket_impl.dart b/lib/src/copy/web_socket_impl.dart
index c10d8e8926694d5fbad0bbc9e01cbcb344874f4e..38db9e3d604225d3d1d5d3d1b56a0b8cbfa927ae 100644
--- a/lib/src/copy/web_socket_impl.dart
+++ b/lib/src/copy/web_socket_impl.dart
@@ -10,7 +10,8 @@
// desired public API and to remove "dart:io" dependencies have been made.
//
// This is up-to-date as of sdk revision
-// 86227840d75d974feb238f8b3c59c038b99c05cf.
+// e41fb4cafd6052157dbc1490d437045240f4773f.
+
import 'dart:async';
import 'dart:convert';
import 'dart:math';
@@ -22,6 +23,11 @@ import 'io_sink.dart';
import 'web_socket.dart';
const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+const String _clientNoContextTakeover = "client_no_context_takeover";
+const String _serverNoContextTakeover = "server_no_context_takeover";
+const String _clientMaxWindowBits = "client_max_window_bits";
+const String _serverMaxWindowBits = "server_max_window_bits";
final _random = new Random();
@@ -32,7 +38,6 @@ class _WebSocketMessageType {
static const int BINARY = 2;
}
-
class _WebSocketOpcode {
static const int CONTINUATION = 0;
static const int TEXT = 1;
@@ -53,16 +58,29 @@ class _WebSocketOpcode {
}
/**
+ * Stores the header and integer value derived from negotiation of
+ * client_max_window_bits and server_max_window_bits. headerValue will be
+ * set in the Websocket response headers.
+ */
+class _CompressionMaxWindowBits {
+ String headerValue;
+ int maxWindowBits;
+ _CompressionMaxWindowBits([this.headerValue, this.maxWindowBits]);
+ String toString() => headerValue;
+}
+
+/**
* The web socket protocol transformer handles the protocol byte stream
* which is supplied through the [:handleData:]. As the protocol is processed,
* it'll output frame data as either a List<int> or String.
*
- * Important infomation about usage: Be sure you use cancelOnError, so the
- * socket will be closed when the processer encounter an error. Not using it
+ * Important information about usage: Be sure you use cancelOnError, so the
+ * socket will be closed when the processor encounter an error. Not using it
* will lead to undefined behaviour.
*/
// TODO(ajohnsen): make this transformer reusable?
-class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
+class _WebSocketProtocolTransformer
+ implements StreamTransformer<List<int>, dynamic>, EventSink<List<int>> {
static const int START = 0;
static const int LEN_FIRST = 1;
static const int LEN_REST = 2;
@@ -70,9 +88,15 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
static const int PAYLOAD = 4;
static const int CLOSED = 5;
static const int FAILURE = 6;
+ static const int FIN = 0x80;
+ static const int RSV1 = 0x40;
+ static const int RSV2 = 0x20;
+ static const int RSV3 = 0x10;
+ static const int OPCODE = 0xF;
int _state = START;
bool _fin = false;
+ bool _compressed = false;
int _opcode = -1;
int _len = -1;
bool _masked = false;
@@ -93,29 +117,28 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
_WebSocketProtocolTransformer([this._serverSide = false]);
Stream bind(Stream stream) {
- return new Stream.eventTransformed(
- stream,
- (EventSink eventSink) {
- if (_eventSink != null) {
- throw new StateError("WebSocket transformer already used.");
- }
- _eventSink = eventSink;
- return this;
- });
+ return new Stream.eventTransformed(stream, (EventSink eventSink) {
+ if (_eventSink != null) {
+ throw new StateError("WebSocket transformer already used.");
+ }
+ _eventSink = eventSink;
+ return this;
+ });
}
- void addError(Object error, [StackTrace stackTrace]) =>
- _eventSink.addError(error, stackTrace);
+ void addError(Object error, [StackTrace stackTrace]) {
+ _eventSink.addError(error, stackTrace);
+ }
- void close() => _eventSink.close();
+ void close() { _eventSink.close(); }
/**
* Process data received from the underlying communication channel.
*/
- void add(Uint8List buffer) {
- int count = buffer.length;
+ void add(List<int> bytes) {
+ var buffer = bytes is Uint8List ? bytes : new Uint8List.fromList(bytes);
int index = 0;
- int lastIndex = count;
+ int lastIndex = buffer.length;
if (_state == CLOSED) {
throw new WebSocketChannelException("Data on closed connection");
}
@@ -126,12 +149,23 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
int byte = buffer[index];
if (_state <= LEN_REST) {
if (_state == START) {
- _fin = (byte & 0x80) != 0;
- if ((byte & 0x70) != 0) {
- // The RSV1, RSV2 bits RSV3 must be all zero.
+ _fin = (byte & FIN) != 0;
+
+ if((byte & (RSV2 | RSV3)) != 0) {
+ // The RSV2, RSV3 bits must both be zero.
throw new WebSocketChannelException("Protocol error");
}
- _opcode = (byte & 0xF);
+
+ _opcode = (byte & OPCODE);
+
+ if (_opcode != _WebSocketOpcode.CONTINUATION) {
+ if ((byte & RSV1) != 0) {
+ _compressed = true;
+ } else {
+ _compressed = false;
+ }
+ }
+
if (_opcode <= _WebSocketOpcode.BINARY) {
if (_opcode == _WebSocketOpcode.CONTINUATION) {
if (_currentMessageType == _WebSocketMessageType.NONE) {
@@ -139,14 +173,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
}
} else {
assert(_opcode == _WebSocketOpcode.TEXT ||
- _opcode == _WebSocketOpcode.BINARY);
+ _opcode == _WebSocketOpcode.BINARY);
if (_currentMessageType != _WebSocketMessageType.NONE) {
throw new WebSocketChannelException("Protocol error");
}
_currentMessageType = _opcode;
}
} else if (_opcode >= _WebSocketOpcode.CLOSE &&
- _opcode <= _WebSocketOpcode.PONG) {
+ _opcode <= _WebSocketOpcode.PONG) {
// Control frames cannot be fragmented.
if (!_fin) throw new WebSocketChannelException("Protocol error");
} else {
@@ -195,15 +229,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
_unmask(index, payloadLength, buffer);
}
// Control frame and data frame share _payloads.
- _payload.add(
- new Uint8List.view(buffer.buffer, index, payloadLength));
+ _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
index += payloadLength;
if (_isControlFrame()) {
if (_remainingPayloadBytes == 0) _controlFrameEnd();
} else {
if (_currentMessageType != _WebSocketMessageType.TEXT &&
_currentMessageType != _WebSocketMessageType.BINARY) {
- throw new WebSocketChannelException("Protocol error");
+ throw new WebSocketChannelException("Protocol error");
}
if (_remainingPayloadBytes == 0) _messageFrameEnd();
}
@@ -238,8 +271,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
}
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
- Int32x4List blockBuffer = new Int32x4List.view(
- buffer.buffer, index, blockCount);
+ Int32x4List blockBuffer =
+ new Int32x4List.view(buffer.buffer, index, blockCount);
for (int i = 0; i < blockBuffer.length; i++) {
blockBuffer[i] ^= blockMask;
}
@@ -305,12 +338,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
void _messageFrameEnd() {
if (_fin) {
+ var bytes = _payload.takeBytes();
+
switch (_currentMessageType) {
case _WebSocketMessageType.TEXT:
- _eventSink.add(UTF8.decode(_payload.takeBytes()));
+ _eventSink.add(UTF8.decode(bytes));
break;
case _WebSocketMessageType.BINARY:
- _eventSink.add(_payload.takeBytes());
+ _eventSink.add(bytes);
break;
}
_currentMessageType = _WebSocketMessageType.NONE;
@@ -352,8 +387,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
bool _isControlFrame() {
return _opcode == _WebSocketOpcode.CLOSE ||
- _opcode == _WebSocketOpcode.PING ||
- _opcode == _WebSocketOpcode.PONG;
+ _opcode == _WebSocketOpcode.PING ||
+ _opcode == _WebSocketOpcode.PONG;
}
void _prepareForNextFrame() {
@@ -368,35 +403,32 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
}
}
-
class _WebSocketPing {
final List<int> payload;
_WebSocketPing([this.payload = null]);
}
-
class _WebSocketPong {
final List<int> payload;
_WebSocketPong([this.payload = null]);
}
// TODO(ajohnsen): Make this transformer reusable.
-class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
+class _WebSocketOutgoingTransformer
+ implements StreamTransformer<dynamic, List<int>>, EventSink {
final WebSocketImpl webSocket;
- EventSink _eventSink;
+ EventSink<List<int>> _eventSink;
_WebSocketOutgoingTransformer(this.webSocket);
- Stream bind(Stream stream) {
- return new Stream.eventTransformed(
- stream,
- (EventSink eventSink) {
- if (_eventSink != null) {
- throw new StateError("WebSocket transformer already used");
- }
- _eventSink = eventSink;
- return this;
- });
+ Stream<List<int>> bind(Stream stream) {
+ return new Stream.eventTransformed(stream, (eventSink) {
+ if (_eventSink != null) {
+ throw new StateError("WebSocket transformer already used");
+ }
+ _eventSink = eventSink;
+ return this;
+ });
}
void add(message) {
@@ -415,11 +447,12 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
opcode = _WebSocketOpcode.TEXT;
data = UTF8.encode(message);
} else {
- if (message is !List<int>) {
+ if (message is List<int>) {
+ data = message;
+ opcode = _WebSocketOpcode.BINARY;
+ } else {
throw new ArgumentError(message);
}
- opcode = _WebSocketOpcode.BINARY;
- data = message;
}
} else {
opcode = _WebSocketOpcode.TEXT;
@@ -427,8 +460,9 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
addFrame(opcode, data);
}
- void addError(Object error, [StackTrace stackTrace]) =>
- _eventSink.addError(error, stackTrace);
+ void addError(Object error, [StackTrace stackTrace]) {
+ _eventSink.addError(error, stackTrace);
+ }
void close() {
int code = webSocket._outCloseCode;
@@ -446,11 +480,17 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
_eventSink.close();
}
- void addFrame(int opcode, List<int> data) =>
- createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
+ void addFrame(int opcode, List<int> data) => createFrame(
+ opcode,
+ data,
+ webSocket._serverSide,
+ false).forEach((e) {
+ _eventSink.add(e);
+ });
- static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
- bool mask = !serverSide; // Masking not implemented for server.
+ static Iterable<List<int>> createFrame(
+ int opcode, List<int> data, bool serverSide, bool compressed) {
+ bool mask = !serverSide; // Masking not implemented for server.
int dataLength = data == null ? 0 : data.length;
// Determine the header size.
int headerSize = (mask) ? 6 : 2;
@@ -461,8 +501,13 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
}
Uint8List header = new Uint8List(headerSize);
int index = 0;
+
// Set FIN and opcode.
- header[index++] = 0x80 | opcode;
+ var hoc = _WebSocketProtocolTransformer.FIN
+ | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
+ | (opcode & _WebSocketProtocolTransformer.OPCODE);
+
+ header[index++] = hoc;
// Determine size and position of length field.
int lengthBytes = 1;
if (dataLength > 65535) {
@@ -495,8 +540,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
list = new Uint8List(data.length);
for (int i = 0; i < data.length; i++) {
if (data[i] < 0 || 255 < data[i]) {
- throw new ArgumentError(
- "List element is not a byte value "
+ throw new ArgumentError("List element is not a byte value "
"(value ${data[i]} at index $i)");
}
list[i] = data[i];
@@ -512,8 +556,8 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
mask = (mask << 8) | maskBytes[i];
}
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
- Int32x4List blockBuffer = new Int32x4List.view(
- list.buffer, 0, blockCount);
+ Int32x4List blockBuffer =
+ new Int32x4List.view(list.buffer, 0, blockCount);
for (int i = 0; i < blockBuffer.length; i++) {
blockBuffer[i] ^= blockMask;
}
@@ -534,7 +578,6 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
}
}
-
class _WebSocketConsumer implements StreamConsumer {
final WebSocketImpl webSocket;
final StreamSink<List<int>> sink;
@@ -579,28 +622,28 @@ class _WebSocketConsumer implements StreamConsumer {
_ensureController() {
if (_controller != null) return;
- _controller = new StreamController(sync: true,
- onPause: _onPause,
- onResume: _onResume,
- onCancel: _onListen);
- var stream = _controller.stream.transform(
- new _WebSocketOutgoingTransformer(webSocket));
- sink.addStream(stream)
- .then((_) {
- _done();
- _closeCompleter.complete(webSocket);
- }, onError: (error, StackTrace stackTrace) {
- _closed = true;
- _cancel();
- if (error is ArgumentError) {
- if (!_done(error, stackTrace)) {
- _closeCompleter.completeError(error, stackTrace);
- }
- } else {
- _done();
- _closeCompleter.complete(webSocket);
- }
- });
+ _controller = new StreamController(
+ sync: true,
+ onPause: _onPause,
+ onResume: _onResume,
+ onCancel: _onListen);
+ var stream = _controller.stream
+ .transform(new _WebSocketOutgoingTransformer(webSocket));
+ sink.addStream(stream).then((_) {
+ _done();
+ _closeCompleter.complete(webSocket);
+ }, onError: (error, StackTrace stackTrace) {
+ _closed = true;
+ _cancel();
+ if (error is ArgumentError) {
+ if (!_done(error, stackTrace)) {
+ _closeCompleter.completeError(error, stackTrace);
+ }
+ } else {
+ _done();
+ _closeCompleter.complete(webSocket);
+ }
+ });
}
bool _done([error, StackTrace stackTrace]) {
@@ -621,13 +664,9 @@ class _WebSocketConsumer implements StreamConsumer {
}
_ensureController();
_completer = new Completer();
- _subscription = stream.listen(
- (data) {
- _controller.add(data);
- },
- onDone: _done,
- onError: _done,
- cancelOnError: true);
+ _subscription = stream.listen((data) {
+ _controller.add(data);
+ }, onDone: _done, onError: _done, cancelOnError: true);
if (_issuedPause) {
_subscription.pause();
_issuedPause = false;
@@ -657,10 +696,11 @@ class _WebSocketConsumer implements StreamConsumer {
}
}
-
class WebSocketImpl extends Stream with _ServiceObject implements StreamSink {
// Use default Map so we keep order.
static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>();
+ static const int DEFAULT_WINDOW_BITS = 15;
+ static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
final String protocol;
@@ -681,74 +721,64 @@ class WebSocketImpl extends Stream with _ServiceObject implements StreamSink {
String _outCloseReason;
Timer _closeTimer;
- WebSocketImpl.fromSocket(Stream<List<int>> stream,
- StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) {
+ WebSocketImpl.fromSocket(
+ Stream<List<int>> stream, StreamSink<List<int>> sink, this.protocol,
+ [this._serverSide = false]) {
_consumer = new _WebSocketConsumer(this, sink);
_sink = new StreamSinkImpl(_consumer);
_readyState = WebSocket.OPEN;
var transformer = new _WebSocketProtocolTransformer(_serverSide);
- _subscription = stream.transform(transformer).listen(
- (data) {
- if (data is _WebSocketPing) {
- if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
- } else if (data is _WebSocketPong) {
- // Simply set pingInterval, as it'll cancel any timers.
- pingInterval = _pingInterval;
- } else {
- _controller.add(data);
- }
- },
- onError: (error) {
- if (_closeTimer != null) _closeTimer.cancel();
- if (error is FormatException) {
- _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
- } else {
- _close(WebSocketStatus.PROTOCOL_ERROR);
- }
- // An error happened, set the close code set above.
- _closeCode = _outCloseCode;
- _closeReason = _outCloseReason;
- _controller.close();
- },
- onDone: () {
- if (_closeTimer != null) _closeTimer.cancel();
- if (_readyState == WebSocket.OPEN) {
- _readyState = WebSocket.CLOSING;
- if (!_isReservedStatusCode(transformer.closeCode)) {
- _close(transformer.closeCode);
- } else {
- _close();
- }
- _readyState = WebSocket.CLOSED;
- }
- // Protocol close, use close code from transformer.
- _closeCode = transformer.closeCode;
- _closeReason = transformer.closeReason;
- _controller.close();
- },
- cancelOnError: true);
+ _subscription = stream.transform(transformer).listen((data) {
+ if (data is _WebSocketPing) {
+ if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
+ } else if (data is _WebSocketPong) {
+ // Simply set pingInterval, as it'll cancel any timers.
+ pingInterval = _pingInterval;
+ } else {
+ _controller.add(data);
+ }
+ }, onError: (error, stackTrace) {
+ if (_closeTimer != null) _closeTimer.cancel();
+ if (error is FormatException) {
+ _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
+ } else {
+ _close(WebSocketStatus.PROTOCOL_ERROR);
+ }
+ // An error happened, set the close code set above.
+ _closeCode = _outCloseCode;
+ _closeReason = _outCloseReason;
+ _controller.close();
+ }, onDone: () {
+ if (_closeTimer != null) _closeTimer.cancel();
+ if (_readyState == WebSocket.OPEN) {
+ _readyState = WebSocket.CLOSING;
+ if (!_isReservedStatusCode(transformer.closeCode)) {
+ _close(transformer.closeCode, transformer.closeReason);
+ } else {
+ _close();
+ }
+ _readyState = WebSocket.CLOSED;
+ }
+ // Protocol close, use close code from transformer.
+ _closeCode = transformer.closeCode;
+ _closeReason = transformer.closeReason;
+ _controller.close();
+ }, cancelOnError: true);
_subscription.pause();
- _controller = new StreamController(sync: true,
- onListen: () => _subscription.resume(),
- onCancel: () {
- _subscription.cancel();
- _subscription = null;
- },
- onPause: _subscription.pause,
- onResume: _subscription.resume);
+ _controller = new StreamController(
+ sync: true, onListen: () => _subscription.resume(), onCancel: () {
+ _subscription.cancel();
+ _subscription = null;
+ }, onPause: _subscription.pause, onResume: _subscription.resume);
_webSockets[_serviceId] = this;
}
StreamSubscription listen(void onData(message),
- {Function onError,
- void onDone(),
- bool cancelOnError}) {
+ {Function onError, void onDone(), bool cancelOnError}) {
return _controller.stream.listen(onData,
- onError: onError,
- onDone: onDone,
- cancelOnError: cancelOnError);
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
Duration get pingInterval => _pingInterval;
@@ -776,9 +806,10 @@ class WebSocketImpl extends Stream with _ServiceObject implements StreamSink {
int get closeCode => _closeCode;
String get closeReason => _closeReason;
- void add(data) => _sink.add(data);
- void addError(error, [StackTrace stackTrace]) =>
- _sink.addError(error, stackTrace);
+ void add(data) { _sink.add(data); }
+ void addError(error, [StackTrace stackTrace]) {
+ _sink.addError(error, stackTrace);
+ }
Future addStream(Stream stream) => _sink.addStream(stream);
Future get done => _sink.done;
@@ -825,20 +856,19 @@ class WebSocketImpl extends Stream with _ServiceObject implements StreamSink {
_webSockets.remove(_serviceId);
}
- // The _toJSON, _serviceTypePath, and _serviceTypeName methods
- // have been deleted for http_parser. The methods were unused in WebSocket
- // code and produced warnings.
+ // The _toJSON, _serviceTypePath, and _serviceTypeName methods have been
+ // deleted for web_socket_channel. The methods were unused in WebSocket code
+ // and produced warnings.
static bool _isReservedStatusCode(int code) {
return code != null &&
- (code < WebSocketStatus.NORMAL_CLOSURE ||
+ (code < WebSocketStatus.NORMAL_CLOSURE ||
code == WebSocketStatus.RESERVED_1004 ||
code == WebSocketStatus.NO_STATUS_RECEIVED ||
code == WebSocketStatus.ABNORMAL_CLOSURE ||
(code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
- code < WebSocketStatus.RESERVED_1015) ||
- (code >= WebSocketStatus.RESERVED_1015 &&
- code < 3000));
+ code < WebSocketStatus.RESERVED_1015) ||
+ (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
}
}
« no previous file with comments | « lib/src/copy/web_socket.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698