Index: lib/src/copy/web_socket_impl.dart |
diff --git a/lib/src/copy/web_socket_impl.dart b/lib/src/copy/web_socket_impl.dart |
index c10d8e8926694d5fbad0bbc9e01cbcb344874f4e..38db9e3d604225d3d1d5d3d1b56a0b8cbfa927ae 100644 |
--- a/lib/src/copy/web_socket_impl.dart |
+++ b/lib/src/copy/web_socket_impl.dart |
@@ -10,7 +10,8 @@ |
// desired public API and to remove "dart:io" dependencies have been made. |
// |
// This is up-to-date as of sdk revision |
-// 86227840d75d974feb238f8b3c59c038b99c05cf. |
+// e41fb4cafd6052157dbc1490d437045240f4773f. |
+ |
import 'dart:async'; |
import 'dart:convert'; |
import 'dart:math'; |
@@ -22,6 +23,11 @@ import 'io_sink.dart'; |
import 'web_socket.dart'; |
const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
+const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
+const String _clientNoContextTakeover = "client_no_context_takeover"; |
+const String _serverNoContextTakeover = "server_no_context_takeover"; |
+const String _clientMaxWindowBits = "client_max_window_bits"; |
+const String _serverMaxWindowBits = "server_max_window_bits"; |
final _random = new Random(); |
@@ -32,7 +38,6 @@ class _WebSocketMessageType { |
static const int BINARY = 2; |
} |
- |
class _WebSocketOpcode { |
static const int CONTINUATION = 0; |
static const int TEXT = 1; |
@@ -53,16 +58,29 @@ class _WebSocketOpcode { |
} |
/** |
+ * Stores the header and integer value derived from negotiation of |
+ * client_max_window_bits and server_max_window_bits. headerValue will be |
+ * set in the Websocket response headers. |
+ */ |
+class _CompressionMaxWindowBits { |
+ String headerValue; |
+ int maxWindowBits; |
+ _CompressionMaxWindowBits([this.headerValue, this.maxWindowBits]); |
+ String toString() => headerValue; |
+} |
+ |
+/** |
* The web socket protocol transformer handles the protocol byte stream |
* which is supplied through the [:handleData:]. As the protocol is processed, |
* it'll output frame data as either a List<int> or String. |
* |
- * Important infomation about usage: Be sure you use cancelOnError, so the |
- * socket will be closed when the processer encounter an error. Not using it |
+ * Important information about usage: Be sure you use cancelOnError, so the |
+ * socket will be closed when the processor encounter an error. Not using it |
* will lead to undefined behaviour. |
*/ |
// TODO(ajohnsen): make this transformer reusable? |
-class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
+class _WebSocketProtocolTransformer |
+ implements StreamTransformer<List<int>, dynamic>, EventSink<List<int>> { |
static const int START = 0; |
static const int LEN_FIRST = 1; |
static const int LEN_REST = 2; |
@@ -70,9 +88,15 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
static const int PAYLOAD = 4; |
static const int CLOSED = 5; |
static const int FAILURE = 6; |
+ static const int FIN = 0x80; |
+ static const int RSV1 = 0x40; |
+ static const int RSV2 = 0x20; |
+ static const int RSV3 = 0x10; |
+ static const int OPCODE = 0xF; |
int _state = START; |
bool _fin = false; |
+ bool _compressed = false; |
int _opcode = -1; |
int _len = -1; |
bool _masked = false; |
@@ -93,29 +117,28 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
_WebSocketProtocolTransformer([this._serverSide = false]); |
Stream bind(Stream stream) { |
- return new Stream.eventTransformed( |
- stream, |
- (EventSink eventSink) { |
- if (_eventSink != null) { |
- throw new StateError("WebSocket transformer already used."); |
- } |
- _eventSink = eventSink; |
- return this; |
- }); |
+ return new Stream.eventTransformed(stream, (EventSink eventSink) { |
+ if (_eventSink != null) { |
+ throw new StateError("WebSocket transformer already used."); |
+ } |
+ _eventSink = eventSink; |
+ return this; |
+ }); |
} |
- void addError(Object error, [StackTrace stackTrace]) => |
- _eventSink.addError(error, stackTrace); |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ _eventSink.addError(error, stackTrace); |
+ } |
- void close() => _eventSink.close(); |
+ void close() { _eventSink.close(); } |
/** |
* Process data received from the underlying communication channel. |
*/ |
- void add(Uint8List buffer) { |
- int count = buffer.length; |
+ void add(List<int> bytes) { |
+ var buffer = bytes is Uint8List ? bytes : new Uint8List.fromList(bytes); |
int index = 0; |
- int lastIndex = count; |
+ int lastIndex = buffer.length; |
if (_state == CLOSED) { |
throw new WebSocketChannelException("Data on closed connection"); |
} |
@@ -126,12 +149,23 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
int byte = buffer[index]; |
if (_state <= LEN_REST) { |
if (_state == START) { |
- _fin = (byte & 0x80) != 0; |
- if ((byte & 0x70) != 0) { |
- // The RSV1, RSV2 bits RSV3 must be all zero. |
+ _fin = (byte & FIN) != 0; |
+ |
+ if((byte & (RSV2 | RSV3)) != 0) { |
+ // The RSV2, RSV3 bits must both be zero. |
throw new WebSocketChannelException("Protocol error"); |
} |
- _opcode = (byte & 0xF); |
+ |
+ _opcode = (byte & OPCODE); |
+ |
+ if (_opcode != _WebSocketOpcode.CONTINUATION) { |
+ if ((byte & RSV1) != 0) { |
+ _compressed = true; |
+ } else { |
+ _compressed = false; |
+ } |
+ } |
+ |
if (_opcode <= _WebSocketOpcode.BINARY) { |
if (_opcode == _WebSocketOpcode.CONTINUATION) { |
if (_currentMessageType == _WebSocketMessageType.NONE) { |
@@ -139,14 +173,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
} |
} else { |
assert(_opcode == _WebSocketOpcode.TEXT || |
- _opcode == _WebSocketOpcode.BINARY); |
+ _opcode == _WebSocketOpcode.BINARY); |
if (_currentMessageType != _WebSocketMessageType.NONE) { |
throw new WebSocketChannelException("Protocol error"); |
} |
_currentMessageType = _opcode; |
} |
} else if (_opcode >= _WebSocketOpcode.CLOSE && |
- _opcode <= _WebSocketOpcode.PONG) { |
+ _opcode <= _WebSocketOpcode.PONG) { |
// Control frames cannot be fragmented. |
if (!_fin) throw new WebSocketChannelException("Protocol error"); |
} else { |
@@ -195,15 +229,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
_unmask(index, payloadLength, buffer); |
} |
// Control frame and data frame share _payloads. |
- _payload.add( |
- new Uint8List.view(buffer.buffer, index, payloadLength)); |
+ _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); |
index += payloadLength; |
if (_isControlFrame()) { |
if (_remainingPayloadBytes == 0) _controlFrameEnd(); |
} else { |
if (_currentMessageType != _WebSocketMessageType.TEXT && |
_currentMessageType != _WebSocketMessageType.BINARY) { |
- throw new WebSocketChannelException("Protocol error"); |
+ throw new WebSocketChannelException("Protocol error"); |
} |
if (_remainingPayloadBytes == 0) _messageFrameEnd(); |
} |
@@ -238,8 +271,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; |
} |
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
- Int32x4List blockBuffer = new Int32x4List.view( |
- buffer.buffer, index, blockCount); |
+ Int32x4List blockBuffer = |
+ new Int32x4List.view(buffer.buffer, index, blockCount); |
for (int i = 0; i < blockBuffer.length; i++) { |
blockBuffer[i] ^= blockMask; |
} |
@@ -305,12 +338,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
void _messageFrameEnd() { |
if (_fin) { |
+ var bytes = _payload.takeBytes(); |
+ |
switch (_currentMessageType) { |
case _WebSocketMessageType.TEXT: |
- _eventSink.add(UTF8.decode(_payload.takeBytes())); |
+ _eventSink.add(UTF8.decode(bytes)); |
break; |
case _WebSocketMessageType.BINARY: |
- _eventSink.add(_payload.takeBytes()); |
+ _eventSink.add(bytes); |
break; |
} |
_currentMessageType = _WebSocketMessageType.NONE; |
@@ -352,8 +387,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
bool _isControlFrame() { |
return _opcode == _WebSocketOpcode.CLOSE || |
- _opcode == _WebSocketOpcode.PING || |
- _opcode == _WebSocketOpcode.PONG; |
+ _opcode == _WebSocketOpcode.PING || |
+ _opcode == _WebSocketOpcode.PONG; |
} |
void _prepareForNextFrame() { |
@@ -368,35 +403,32 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
} |
} |
- |
class _WebSocketPing { |
final List<int> payload; |
_WebSocketPing([this.payload = null]); |
} |
- |
class _WebSocketPong { |
final List<int> payload; |
_WebSocketPong([this.payload = null]); |
} |
// TODO(ajohnsen): Make this transformer reusable. |
-class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
+class _WebSocketOutgoingTransformer |
+ implements StreamTransformer<dynamic, List<int>>, EventSink { |
final WebSocketImpl webSocket; |
- EventSink _eventSink; |
+ EventSink<List<int>> _eventSink; |
_WebSocketOutgoingTransformer(this.webSocket); |
- Stream bind(Stream stream) { |
- return new Stream.eventTransformed( |
- stream, |
- (EventSink eventSink) { |
- if (_eventSink != null) { |
- throw new StateError("WebSocket transformer already used"); |
- } |
- _eventSink = eventSink; |
- return this; |
- }); |
+ Stream<List<int>> bind(Stream stream) { |
+ return new Stream.eventTransformed(stream, (eventSink) { |
+ if (_eventSink != null) { |
+ throw new StateError("WebSocket transformer already used"); |
+ } |
+ _eventSink = eventSink; |
+ return this; |
+ }); |
} |
void add(message) { |
@@ -415,11 +447,12 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
opcode = _WebSocketOpcode.TEXT; |
data = UTF8.encode(message); |
} else { |
- if (message is !List<int>) { |
+ if (message is List<int>) { |
+ data = message; |
+ opcode = _WebSocketOpcode.BINARY; |
+ } else { |
throw new ArgumentError(message); |
} |
- opcode = _WebSocketOpcode.BINARY; |
- data = message; |
} |
} else { |
opcode = _WebSocketOpcode.TEXT; |
@@ -427,8 +460,9 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
addFrame(opcode, data); |
} |
- void addError(Object error, [StackTrace stackTrace]) => |
- _eventSink.addError(error, stackTrace); |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ _eventSink.addError(error, stackTrace); |
+ } |
void close() { |
int code = webSocket._outCloseCode; |
@@ -446,11 +480,17 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
_eventSink.close(); |
} |
- void addFrame(int opcode, List<int> data) => |
- createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
+ void addFrame(int opcode, List<int> data) => createFrame( |
+ opcode, |
+ data, |
+ webSocket._serverSide, |
+ false).forEach((e) { |
+ _eventSink.add(e); |
+ }); |
- static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |
- bool mask = !serverSide; // Masking not implemented for server. |
+ static Iterable<List<int>> createFrame( |
+ int opcode, List<int> data, bool serverSide, bool compressed) { |
+ bool mask = !serverSide; // Masking not implemented for server. |
int dataLength = data == null ? 0 : data.length; |
// Determine the header size. |
int headerSize = (mask) ? 6 : 2; |
@@ -461,8 +501,13 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
} |
Uint8List header = new Uint8List(headerSize); |
int index = 0; |
+ |
// Set FIN and opcode. |
- header[index++] = 0x80 | opcode; |
+ var hoc = _WebSocketProtocolTransformer.FIN |
+ | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0) |
+ | (opcode & _WebSocketProtocolTransformer.OPCODE); |
+ |
+ header[index++] = hoc; |
// Determine size and position of length field. |
int lengthBytes = 1; |
if (dataLength > 65535) { |
@@ -495,8 +540,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
list = new Uint8List(data.length); |
for (int i = 0; i < data.length; i++) { |
if (data[i] < 0 || 255 < data[i]) { |
- throw new ArgumentError( |
- "List element is not a byte value " |
+ throw new ArgumentError("List element is not a byte value " |
"(value ${data[i]} at index $i)"); |
} |
list[i] = data[i]; |
@@ -512,8 +556,8 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
mask = (mask << 8) | maskBytes[i]; |
} |
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
- Int32x4List blockBuffer = new Int32x4List.view( |
- list.buffer, 0, blockCount); |
+ Int32x4List blockBuffer = |
+ new Int32x4List.view(list.buffer, 0, blockCount); |
for (int i = 0; i < blockBuffer.length; i++) { |
blockBuffer[i] ^= blockMask; |
} |
@@ -534,7 +578,6 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
} |
} |
- |
class _WebSocketConsumer implements StreamConsumer { |
final WebSocketImpl webSocket; |
final StreamSink<List<int>> sink; |
@@ -579,28 +622,28 @@ class _WebSocketConsumer implements StreamConsumer { |
_ensureController() { |
if (_controller != null) return; |
- _controller = new StreamController(sync: true, |
- onPause: _onPause, |
- onResume: _onResume, |
- onCancel: _onListen); |
- var stream = _controller.stream.transform( |
- new _WebSocketOutgoingTransformer(webSocket)); |
- sink.addStream(stream) |
- .then((_) { |
- _done(); |
- _closeCompleter.complete(webSocket); |
- }, onError: (error, StackTrace stackTrace) { |
- _closed = true; |
- _cancel(); |
- if (error is ArgumentError) { |
- if (!_done(error, stackTrace)) { |
- _closeCompleter.completeError(error, stackTrace); |
- } |
- } else { |
- _done(); |
- _closeCompleter.complete(webSocket); |
- } |
- }); |
+ _controller = new StreamController( |
+ sync: true, |
+ onPause: _onPause, |
+ onResume: _onResume, |
+ onCancel: _onListen); |
+ var stream = _controller.stream |
+ .transform(new _WebSocketOutgoingTransformer(webSocket)); |
+ sink.addStream(stream).then((_) { |
+ _done(); |
+ _closeCompleter.complete(webSocket); |
+ }, onError: (error, StackTrace stackTrace) { |
+ _closed = true; |
+ _cancel(); |
+ if (error is ArgumentError) { |
+ if (!_done(error, stackTrace)) { |
+ _closeCompleter.completeError(error, stackTrace); |
+ } |
+ } else { |
+ _done(); |
+ _closeCompleter.complete(webSocket); |
+ } |
+ }); |
} |
bool _done([error, StackTrace stackTrace]) { |
@@ -621,13 +664,9 @@ class _WebSocketConsumer implements StreamConsumer { |
} |
_ensureController(); |
_completer = new Completer(); |
- _subscription = stream.listen( |
- (data) { |
- _controller.add(data); |
- }, |
- onDone: _done, |
- onError: _done, |
- cancelOnError: true); |
+ _subscription = stream.listen((data) { |
+ _controller.add(data); |
+ }, onDone: _done, onError: _done, cancelOnError: true); |
if (_issuedPause) { |
_subscription.pause(); |
_issuedPause = false; |
@@ -657,10 +696,11 @@ class _WebSocketConsumer implements StreamConsumer { |
} |
} |
- |
class WebSocketImpl extends Stream with _ServiceObject implements StreamSink { |
// Use default Map so we keep order. |
static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>(); |
+ static const int DEFAULT_WINDOW_BITS = 15; |
+ static const String PER_MESSAGE_DEFLATE = "permessage-deflate"; |
final String protocol; |
@@ -681,74 +721,64 @@ class WebSocketImpl extends Stream with _ServiceObject implements StreamSink { |
String _outCloseReason; |
Timer _closeTimer; |
- WebSocketImpl.fromSocket(Stream<List<int>> stream, |
- StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) { |
+ WebSocketImpl.fromSocket( |
+ Stream<List<int>> stream, StreamSink<List<int>> sink, this.protocol, |
+ [this._serverSide = false]) { |
_consumer = new _WebSocketConsumer(this, sink); |
_sink = new StreamSinkImpl(_consumer); |
_readyState = WebSocket.OPEN; |
var transformer = new _WebSocketProtocolTransformer(_serverSide); |
- _subscription = stream.transform(transformer).listen( |
- (data) { |
- if (data is _WebSocketPing) { |
- if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
- } else if (data is _WebSocketPong) { |
- // Simply set pingInterval, as it'll cancel any timers. |
- pingInterval = _pingInterval; |
- } else { |
- _controller.add(data); |
- } |
- }, |
- onError: (error) { |
- if (_closeTimer != null) _closeTimer.cancel(); |
- if (error is FormatException) { |
- _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
- } else { |
- _close(WebSocketStatus.PROTOCOL_ERROR); |
- } |
- // An error happened, set the close code set above. |
- _closeCode = _outCloseCode; |
- _closeReason = _outCloseReason; |
- _controller.close(); |
- }, |
- onDone: () { |
- if (_closeTimer != null) _closeTimer.cancel(); |
- if (_readyState == WebSocket.OPEN) { |
- _readyState = WebSocket.CLOSING; |
- if (!_isReservedStatusCode(transformer.closeCode)) { |
- _close(transformer.closeCode); |
- } else { |
- _close(); |
- } |
- _readyState = WebSocket.CLOSED; |
- } |
- // Protocol close, use close code from transformer. |
- _closeCode = transformer.closeCode; |
- _closeReason = transformer.closeReason; |
- _controller.close(); |
- }, |
- cancelOnError: true); |
+ _subscription = stream.transform(transformer).listen((data) { |
+ if (data is _WebSocketPing) { |
+ if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
+ } else if (data is _WebSocketPong) { |
+ // Simply set pingInterval, as it'll cancel any timers. |
+ pingInterval = _pingInterval; |
+ } else { |
+ _controller.add(data); |
+ } |
+ }, onError: (error, stackTrace) { |
+ if (_closeTimer != null) _closeTimer.cancel(); |
+ if (error is FormatException) { |
+ _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
+ } else { |
+ _close(WebSocketStatus.PROTOCOL_ERROR); |
+ } |
+ // An error happened, set the close code set above. |
+ _closeCode = _outCloseCode; |
+ _closeReason = _outCloseReason; |
+ _controller.close(); |
+ }, onDone: () { |
+ if (_closeTimer != null) _closeTimer.cancel(); |
+ if (_readyState == WebSocket.OPEN) { |
+ _readyState = WebSocket.CLOSING; |
+ if (!_isReservedStatusCode(transformer.closeCode)) { |
+ _close(transformer.closeCode, transformer.closeReason); |
+ } else { |
+ _close(); |
+ } |
+ _readyState = WebSocket.CLOSED; |
+ } |
+ // Protocol close, use close code from transformer. |
+ _closeCode = transformer.closeCode; |
+ _closeReason = transformer.closeReason; |
+ _controller.close(); |
+ }, cancelOnError: true); |
_subscription.pause(); |
- _controller = new StreamController(sync: true, |
- onListen: () => _subscription.resume(), |
- onCancel: () { |
- _subscription.cancel(); |
- _subscription = null; |
- }, |
- onPause: _subscription.pause, |
- onResume: _subscription.resume); |
+ _controller = new StreamController( |
+ sync: true, onListen: () => _subscription.resume(), onCancel: () { |
+ _subscription.cancel(); |
+ _subscription = null; |
+ }, onPause: _subscription.pause, onResume: _subscription.resume); |
_webSockets[_serviceId] = this; |
} |
StreamSubscription listen(void onData(message), |
- {Function onError, |
- void onDone(), |
- bool cancelOnError}) { |
+ {Function onError, void onDone(), bool cancelOnError}) { |
return _controller.stream.listen(onData, |
- onError: onError, |
- onDone: onDone, |
- cancelOnError: cancelOnError); |
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
} |
Duration get pingInterval => _pingInterval; |
@@ -776,9 +806,10 @@ class WebSocketImpl extends Stream with _ServiceObject implements StreamSink { |
int get closeCode => _closeCode; |
String get closeReason => _closeReason; |
- void add(data) => _sink.add(data); |
- void addError(error, [StackTrace stackTrace]) => |
- _sink.addError(error, stackTrace); |
+ void add(data) { _sink.add(data); } |
+ void addError(error, [StackTrace stackTrace]) { |
+ _sink.addError(error, stackTrace); |
+ } |
Future addStream(Stream stream) => _sink.addStream(stream); |
Future get done => _sink.done; |
@@ -825,20 +856,19 @@ class WebSocketImpl extends Stream with _ServiceObject implements StreamSink { |
_webSockets.remove(_serviceId); |
} |
- // The _toJSON, _serviceTypePath, and _serviceTypeName methods |
- // have been deleted for http_parser. The methods were unused in WebSocket |
- // code and produced warnings. |
+ // The _toJSON, _serviceTypePath, and _serviceTypeName methods have been |
+ // deleted for web_socket_channel. The methods were unused in WebSocket code |
+ // and produced warnings. |
static bool _isReservedStatusCode(int code) { |
return code != null && |
- (code < WebSocketStatus.NORMAL_CLOSURE || |
+ (code < WebSocketStatus.NORMAL_CLOSURE || |
code == WebSocketStatus.RESERVED_1004 || |
code == WebSocketStatus.NO_STATUS_RECEIVED || |
code == WebSocketStatus.ABNORMAL_CLOSURE || |
(code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
- code < WebSocketStatus.RESERVED_1015) || |
- (code >= WebSocketStatus.RESERVED_1015 && |
- code < 3000)); |
+ code < WebSocketStatus.RESERVED_1015) || |
+ (code >= WebSocketStatus.RESERVED_1015 && code < 3000)); |
} |
} |