Index: sdk/lib/io/websocket_impl.dart |
diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart |
index cc68544e536233a5e43d63a699e01d4d58ce5b4e..bcb800d84aa80e8f4ca0d812f641b231012f9336 100644 |
--- a/sdk/lib/io/websocket_impl.dart |
+++ b/sdk/lib/io/websocket_impl.dart |
@@ -5,6 +5,10 @@ |
part of dart.io; |
const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
+const String _clientNoContextTakeover = "client_no_context_takeover"; |
+const String _serverNoContextTakeover = "server_no_context_takeover"; |
+const String _clientMaxWindowBits = "client_max_window_bits"; |
+const String _serverMaxWindowBits = "server_max_window_bits"; |
// Matches _WebSocketOpcode. |
class _WebSocketMessageType { |
@@ -13,7 +17,6 @@ class _WebSocketMessageType { |
static const int BINARY = 2; |
} |
- |
class _WebSocketOpcode { |
static const int CONTINUATION = 0; |
static const int TEXT = 1; |
@@ -38,8 +41,8 @@ class _WebSocketOpcode { |
* which is supplied through the [:handleData:]. As the protocol is processed, |
* it'll output frame data as either a List<int> or String. |
* |
- * Important infomation about usage: Be sure you use cancelOnError, so the |
- * socket will be closed when the processer encounter an error. Not using it |
+ * Important information about usage: Be sure you use cancelOnError, so the |
+ * socket will be closed when the processor encounter an error. Not using it |
* will lead to undefined behaviour. |
*/ |
// TODO(ajohnsen): make this transformer reusable? |
@@ -51,9 +54,15 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
static const int PAYLOAD = 4; |
static const int CLOSED = 5; |
static const int FAILURE = 6; |
+ static const int FIN = 0x80; |
+ static const int RSV1 = 0x40; |
+ static const int RSV2 = 0x20; |
+ static const int RSV3 = 0x10; |
+ static const int OPCODE = 0xF; |
int _state = START; |
bool _fin = false; |
+ bool _compressed = false; |
int _opcode = -1; |
int _len = -1; |
bool _masked = false; |
@@ -71,18 +80,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
final List _maskingBytes = new List(4); |
final BytesBuilder _payload = new BytesBuilder(copy: false); |
- _WebSocketProtocolTransformer([this._serverSide = false]); |
+ _WebSocketPerMessageDeflate _deflate; |
+ _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); |
Stream bind(Stream stream) { |
- return new Stream.eventTransformed( |
- stream, |
- (EventSink eventSink) { |
- if (_eventSink != null) { |
- throw new StateError("WebSocket transformer already used."); |
- } |
- _eventSink = eventSink; |
- return this; |
- }); |
+ return new Stream.eventTransformed(stream, (EventSink eventSink) { |
+ if (_eventSink != null) { |
+ throw new StateError("WebSocket transformer already used."); |
+ } |
+ _eventSink = eventSink; |
+ return this; |
+ }); |
} |
void addError(Object error, [StackTrace stackTrace]) => |
@@ -94,9 +102,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
* Process data received from the underlying communication channel. |
*/ |
void add(Uint8List buffer) { |
- int count = buffer.length; |
int index = 0; |
- int lastIndex = count; |
+ int lastIndex = buffer.length; |
if (_state == CLOSED) { |
throw new WebSocketException("Data on closed connection"); |
} |
@@ -107,12 +114,20 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
int byte = buffer[index]; |
if (_state <= LEN_REST) { |
if (_state == START) { |
- _fin = (byte & 0x80) != 0; |
- if ((byte & 0x70) != 0) { |
- // The RSV1, RSV2 bits RSV3 must be all zero. |
+ _fin = (byte & FIN) != 0; |
+ |
+ if((byte & (RSV2 | RSV3)) != 0) { |
+ // The RSV2, RSV3 bits must both be zero. |
throw new WebSocketException("Protocol error"); |
} |
- _opcode = (byte & 0xF); |
+ |
+ if ((byte & RSV1) != 0) { |
+ _compressed = true; |
+ } else { |
+ _compressed = false; |
+ } |
+ _opcode = (byte & OPCODE); |
+ |
if (_opcode <= _WebSocketOpcode.BINARY) { |
if (_opcode == _WebSocketOpcode.CONTINUATION) { |
if (_currentMessageType == _WebSocketMessageType.NONE) { |
@@ -120,14 +135,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
} |
} else { |
assert(_opcode == _WebSocketOpcode.TEXT || |
- _opcode == _WebSocketOpcode.BINARY); |
+ _opcode == _WebSocketOpcode.BINARY); |
if (_currentMessageType != _WebSocketMessageType.NONE) { |
throw new WebSocketException("Protocol error"); |
} |
_currentMessageType = _opcode; |
} |
} else if (_opcode >= _WebSocketOpcode.CLOSE && |
- _opcode <= _WebSocketOpcode.PONG) { |
+ _opcode <= _WebSocketOpcode.PONG) { |
// Control frames cannot be fragmented. |
if (!_fin) throw new WebSocketException("Protocol error"); |
} else { |
@@ -176,15 +191,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
_unmask(index, payloadLength, buffer); |
} |
// Control frame and data frame share _payloads. |
- _payload.add( |
- new Uint8List.view(buffer.buffer, index, payloadLength)); |
+ _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); |
index += payloadLength; |
if (_isControlFrame()) { |
if (_remainingPayloadBytes == 0) _controlFrameEnd(); |
} else { |
if (_currentMessageType != _WebSocketMessageType.TEXT && |
_currentMessageType != _WebSocketMessageType.BINARY) { |
- throw new WebSocketException("Protocol error"); |
+ throw new WebSocketException("Protocol error"); |
} |
if (_remainingPayloadBytes == 0) _messageFrameEnd(); |
} |
@@ -219,8 +233,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; |
} |
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
- Int32x4List blockBuffer = new Int32x4List.view( |
- buffer.buffer, index, blockCount); |
+ Int32x4List blockBuffer = |
+ new Int32x4List.view(buffer.buffer, index, blockCount); |
for (int i = 0; i < blockBuffer.length; i++) { |
blockBuffer[i] ^= blockMask; |
} |
@@ -284,12 +298,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
void _messageFrameEnd() { |
if (_fin) { |
+ var bytes = _payload.takeBytes(); |
+ if (_deflate != null && _compressed) { |
+ bytes = _deflate.processIncomingMessage(bytes); |
+ } |
+ |
switch (_currentMessageType) { |
case _WebSocketMessageType.TEXT: |
- _eventSink.add(UTF8.decode(_payload.takeBytes())); |
+ _eventSink.add(UTF8.decode(bytes)); |
break; |
case _WebSocketMessageType.BINARY: |
- _eventSink.add(_payload.takeBytes()); |
+ _eventSink.add(bytes); |
break; |
} |
_currentMessageType = _WebSocketMessageType.NONE; |
@@ -331,8 +350,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
bool _isControlFrame() { |
return _opcode == _WebSocketOpcode.CLOSE || |
- _opcode == _WebSocketOpcode.PING || |
- _opcode == _WebSocketOpcode.PONG; |
+ _opcode == _WebSocketOpcode.PING || |
+ _opcode == _WebSocketOpcode.PONG; |
} |
void _prepareForNextFrame() { |
@@ -347,31 +366,29 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
} |
} |
- |
class _WebSocketPing { |
final List<int> payload; |
_WebSocketPing([this.payload = null]); |
} |
- |
class _WebSocketPong { |
final List<int> payload; |
_WebSocketPong([this.payload = null]); |
} |
- |
class _WebSocketTransformerImpl implements WebSocketTransformer { |
final StreamController<WebSocket> _controller = |
new StreamController<WebSocket>(sync: true); |
final Function _protocolSelector; |
+ final CompressionOptions _compression; |
- _WebSocketTransformerImpl(this._protocolSelector); |
+ _WebSocketTransformerImpl(this._protocolSelector, this._compression); |
Stream<WebSocket> bind(Stream<HttpRequest> stream) { |
stream.listen((request) { |
- _upgrade(request, _protocolSelector) |
- .then((WebSocket webSocket) => _controller.add(webSocket)) |
- .catchError(_controller.addError); |
+ _upgrade(request, _protocolSelector, _compression) |
+ .then((WebSocket webSocket) => _controller.add(webSocket)) |
+ .catchError(_controller.addError); |
}, onDone: () { |
_controller.close(); |
}); |
@@ -379,13 +396,14 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
return _controller.stream; |
} |
- static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { |
+ static Future<WebSocket> _upgrade( |
+ HttpRequest request, _protocolSelector, CompressionOptions compression) { |
var response = request.response; |
if (!_isUpgradeRequest(request)) { |
// Send error response. |
response |
- ..statusCode = HttpStatus.BAD_REQUEST |
- ..close(); |
+ ..statusCode = HttpStatus.BAD_REQUEST |
+ ..close(); |
return new Future.error( |
new WebSocketException("Invalid WebSocket upgrade request")); |
} |
@@ -393,9 +411,9 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
Future upgrade(String protocol) { |
// Send the upgrade response. |
response |
- ..statusCode = HttpStatus.SWITCHING_PROTOCOLS |
- ..headers.add(HttpHeaders.CONNECTION, "Upgrade") |
- ..headers.add(HttpHeaders.UPGRADE, "websocket"); |
+ ..statusCode = HttpStatus.SWITCHING_PROTOCOLS |
+ ..headers.add(HttpHeaders.CONNECTION, "Upgrade") |
+ ..headers.add(HttpHeaders.UPGRADE, "websocket"); |
String key = request.headers.value("Sec-WebSocket-Key"); |
_SHA1 sha1 = new _SHA1(); |
sha1.add("$key$_webSocketGUID".codeUnits); |
@@ -404,10 +422,13 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
if (protocol != null) { |
response.headers.add("Sec-WebSocket-Protocol", protocol); |
} |
+ |
+ var deflate = _negotiateCompression(request, response, compression); |
+ |
response.headers.contentLength = 0; |
- return response.detachSocket() |
- .then((socket) => new _WebSocketImpl._fromSocket( |
- socket, protocol, true)); |
+ return response.detachSocket().then((socket) => |
+ new _WebSocketImpl._fromSocket( |
+ socket, protocol, compression, true, deflate)); |
} |
var protocols = request.headers['Sec-WebSocket-Protocol']; |
@@ -416,26 +437,53 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
// consisting of multiple protocols. To unify all of them, first join |
// the lists with ', ' and then tokenize. |
protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); |
- return new Future(() => _protocolSelector(protocols)) |
- .then((protocol) { |
- if (protocols.indexOf(protocol) < 0) { |
- throw new WebSocketException( |
- "Selected protocol is not in the list of available protocols"); |
- } |
- return protocol; |
- }) |
- .catchError((error) { |
- response |
- ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR |
- ..close(); |
- throw error; |
- }) |
- .then(upgrade); |
+ return new Future(() => _protocolSelector(protocols)).then((protocol) { |
+ if (protocols.indexOf(protocol) < 0) { |
+ throw new WebSocketException( |
+ "Selected protocol is not in the list of available protocols"); |
+ } |
+ return protocol; |
+ }).catchError((error) { |
+ response |
+ ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR |
+ ..close(); |
+ throw error; |
+ }).then(upgrade); |
} else { |
return upgrade(null); |
} |
} |
+ static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request, |
+ HttpResponse response, CompressionOptions compression) { |
+ var extensionHeader = request.headers.value("Sec-WebSocket-Extensions"); |
+ |
+ if (extensionHeader == null) { |
+ extensionHeader = ""; |
+ } |
+ |
+ var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); |
+ if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) { |
+ var info = compression._createHeader(hv); |
+ |
+ response.headers.add("Sec-WebSocket-Extensions", info[0]); |
+ var serverNoContextTakeover = |
+ hv.parameters.containsKey(_serverNoContextTakeover); |
+ var clientNoContextTakeover = |
+ hv.parameters.containsKey(_clientNoContextTakeover); |
+ var deflate = new _WebSocketPerMessageDeflate( |
+ serverNoContextTakeover: serverNoContextTakeover, |
+ clientNoContextTakeover: clientNoContextTakeover, |
+ serverMaxWindowBits: info[1], |
+ clientMaxWindowBits: info[1], |
+ serverSide: true); |
+ |
+ return deflate; |
+ } |
+ |
+ return null; |
+ } |
+ |
static bool _isUpgradeRequest(HttpRequest request) { |
if (request.method != "GET") { |
return false; |
@@ -464,24 +512,127 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
} |
} |
+class _WebSocketPerMessageDeflate { |
+ bool serverNoContextTakeover; |
+ bool clientNoContextTakeover; |
+ int clientMaxWindowBits; |
+ int serverMaxWindowBits; |
+ bool serverSide; |
+ |
+ _Filter decoder; |
+ _Filter encoder; |
+ |
+ _WebSocketPerMessageDeflate( |
+ {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, |
+ this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, |
+ this.serverNoContextTakeover: false, |
+ this.clientNoContextTakeover: false, |
+ this.serverSide: false}); |
+ |
+ void _ensureDecoder() { |
+ if (decoder == null) { |
+ decoder = _Filter._newZLibInflateFilter( |
+ serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true); |
+ } |
+ } |
+ |
+ void _ensureEncoder() { |
+ if (encoder == null) { |
+ encoder = _Filter._newZLibDeflateFilter( |
+ false, |
+ ZLibOption.DEFAULT_LEVEL, |
+ serverSide ? serverMaxWindowBits : clientMaxWindowBits, |
+ ZLibOption.DEFAULT_MEM_LEVEL, |
+ ZLibOption.STRATEGY_DEFAULT, |
+ null, |
+ true); |
+ } |
+ } |
+ |
+ Uint8List processIncomingMessage(List<int> msg) { |
+ _ensureDecoder(); |
+ |
+ var data = []; |
+ data.addAll(msg); |
+ data.addAll(const [0x00, 0x00, 0xff, 0xff]); |
+ |
+ decoder.process(data, 0, data.length); |
+ var reuse = |
+ !(serverSide ? clientNoContextTakeover : serverNoContextTakeover); |
+ var result = []; |
+ var out; |
+ |
+ while ((out = decoder.processed(flush: reuse)) != null) { |
+ result.addAll(out); |
+ } |
+ |
+ decoder.processed(flush: reuse); |
+ |
+ if (!reuse) { |
+ decoder.end(); |
+ decoder = null; |
+ } |
+ return new Uint8List.fromList(result); |
+ } |
+ |
+ List<int> processOutgoingMessage(List<int> msg) { |
+ _ensureEncoder(); |
+ var reuse = |
+ !(serverSide ? serverNoContextTakeover : clientNoContextTakeover); |
+ var result = []; |
+ Uint8List buffer; |
+ var out; |
+ |
+ if (msg is! Uint8List) { |
+ for (var i = 0; i < msg.length; i++) { |
+ if (msg[i] < 0 || 255 < msg[i]) { |
+ throw new ArgumentError("List element is not a byte value " |
+ "(value ${msg[i]} at index $i)"); |
+ } |
+ } |
+ buffer = new Uint8List.fromList(msg); |
+ } else { |
+ buffer = msg; |
+ } |
+ |
+ encoder.process(buffer, 0, buffer.length); |
+ |
+ while ((out = encoder.processed(flush: reuse)) != null) { |
+ result.addAll(out); |
+ } |
+ |
+ if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) { |
+ encoder.end(); |
+ encoder = null; |
+ } |
+ |
+ if (result.length > 4) { |
+ result = result.sublist(0, result.length - 4); |
+ } |
+ |
+ return result; |
+ } |
+} |
// TODO(ajohnsen): Make this transformer reusable. |
class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
final _WebSocketImpl webSocket; |
EventSink _eventSink; |
- _WebSocketOutgoingTransformer(this.webSocket); |
+ _WebSocketPerMessageDeflate _deflateHelper; |
+ |
+ _WebSocketOutgoingTransformer(this.webSocket) { |
+ _deflateHelper = webSocket._deflate; |
+ } |
Stream bind(Stream stream) { |
- return new Stream.eventTransformed( |
- stream, |
- (EventSink eventSink) { |
- if (_eventSink != null) { |
- throw new StateError("WebSocket transformer already used"); |
- } |
- _eventSink = eventSink; |
- return this; |
- }); |
+ return new Stream.eventTransformed(stream, (EventSink eventSink) { |
+ if (_eventSink != null) { |
+ throw new StateError("WebSocket transformer already used"); |
+ } |
+ _eventSink = eventSink; |
+ return this; |
+ }); |
} |
void add(message) { |
@@ -500,12 +651,16 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
opcode = _WebSocketOpcode.TEXT; |
data = UTF8.encode(message); |
} else { |
- if (message is !List<int>) { |
+ if (message is! List<int>) { |
throw new ArgumentError(message); |
} |
opcode = _WebSocketOpcode.BINARY; |
data = message; |
} |
+ |
+ if (_deflateHelper != null) { |
+ data = _deflateHelper.processOutgoingMessage(data); |
+ } |
} else { |
opcode = _WebSocketOpcode.TEXT; |
} |
@@ -531,11 +686,19 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
_eventSink.close(); |
} |
- void addFrame(int opcode, List<int> data) => |
- createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
+ void addFrame(int opcode, List<int> data) => createFrame( |
+ opcode, |
+ data, |
+ webSocket._serverSide, |
+ _deflateHelper != null && |
+ (opcode == _WebSocketOpcode.TEXT || |
+ opcode == _WebSocketOpcode.BINARY)).forEach((e) { |
+ _eventSink.add(e); |
+ }); |
- static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |
- bool mask = !serverSide; // Masking not implemented for server. |
+ static Iterable createFrame( |
+ int opcode, List<int> data, bool serverSide, bool compressed) { |
+ bool mask = !serverSide; // Masking not implemented for server. |
int dataLength = data == null ? 0 : data.length; |
// Determine the header size. |
int headerSize = (mask) ? 6 : 2; |
@@ -546,11 +709,15 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
} |
Uint8List header = new Uint8List(headerSize); |
int index = 0; |
+ |
// Set FIN and opcode. |
- header[index++] = 0x80 | opcode; |
+ var hoc = _WebSocketProtocolTransformer.FIN |
+ | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0) |
+ | (opcode & _WebSocketProtocolTransformer.OPCODE); |
+ |
+ header[index++] = hoc; |
// Determine size and position of length field. |
int lengthBytes = 1; |
- int firstLengthByte = 1; |
if (dataLength > 65535) { |
header[index++] = 127; |
lengthBytes = 8; |
@@ -580,8 +747,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
list = new Uint8List(data.length); |
for (int i = 0; i < data.length; i++) { |
if (data[i] < 0 || 255 < data[i]) { |
- throw new ArgumentError( |
- "List element is not a byte value " |
+ throw new ArgumentError("List element is not a byte value " |
"(value ${data[i]} at index $i)"); |
} |
list[i] = data[i]; |
@@ -597,8 +763,8 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
mask = (mask << 8) | maskBytes[i]; |
} |
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
- Int32x4List blockBuffer = new Int32x4List.view( |
- list.buffer, 0, blockCount); |
+ Int32x4List blockBuffer = |
+ new Int32x4List.view(list.buffer, 0, blockCount); |
for (int i = 0; i < blockBuffer.length; i++) { |
blockBuffer[i] ^= blockMask; |
} |
@@ -619,7 +785,6 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
} |
} |
- |
class _WebSocketConsumer implements StreamConsumer { |
final _WebSocketImpl webSocket; |
final Socket socket; |
@@ -664,28 +829,28 @@ class _WebSocketConsumer implements StreamConsumer { |
_ensureController() { |
if (_controller != null) return; |
- _controller = new StreamController(sync: true, |
- onPause: _onPause, |
- onResume: _onResume, |
- onCancel: _onListen); |
- var stream = _controller.stream.transform( |
- new _WebSocketOutgoingTransformer(webSocket)); |
- socket.addStream(stream) |
- .then((_) { |
- _done(); |
- _closeCompleter.complete(webSocket); |
- }, onError: (error, StackTrace stackTrace) { |
- _closed = true; |
- _cancel(); |
- if (error is ArgumentError) { |
- if (!_done(error, stackTrace)) { |
- _closeCompleter.completeError(error, stackTrace); |
- } |
- } else { |
- _done(); |
- _closeCompleter.complete(webSocket); |
- } |
- }); |
+ _controller = new StreamController( |
+ sync: true, |
+ onPause: _onPause, |
+ onResume: _onResume, |
+ onCancel: _onListen); |
+ var stream = _controller.stream |
+ .transform(new _WebSocketOutgoingTransformer(webSocket)); |
+ socket.addStream(stream).then((_) { |
+ _done(); |
+ _closeCompleter.complete(webSocket); |
+ }, onError: (error, StackTrace stackTrace) { |
+ _closed = true; |
+ _cancel(); |
+ if (error is ArgumentError) { |
+ if (!_done(error, stackTrace)) { |
+ _closeCompleter.completeError(error, stackTrace); |
+ } |
+ } else { |
+ _done(); |
+ _closeCompleter.complete(webSocket); |
+ } |
+ }); |
} |
bool _done([error, StackTrace stackTrace]) { |
@@ -706,13 +871,9 @@ class _WebSocketConsumer implements StreamConsumer { |
} |
_ensureController(); |
_completer = new Completer(); |
- _subscription = stream.listen( |
- (data) { |
- _controller.add(data); |
- }, |
- onDone: _done, |
- onError: _done, |
- cancelOnError: true); |
+ _subscription = stream.listen((data) { |
+ _controller.add(data); |
+ }, onDone: _done, onError: _done, cancelOnError: true); |
if (_issuedPause) { |
_subscription.pause(); |
_issuedPause = false; |
@@ -742,10 +903,11 @@ class _WebSocketConsumer implements StreamConsumer { |
} |
} |
- |
class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
// Use default Map so we keep order. |
static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); |
+ static const int DEFAULT_WINDOW_BITS = 15; |
+ static const String PER_MESSAGE_DEFLATE = "permessage-deflate"; |
final String protocol; |
@@ -766,11 +928,13 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
int _outCloseCode; |
String _outCloseReason; |
Timer _closeTimer; |
+ _WebSocketPerMessageDeflate _deflate; |
static final HttpClient _httpClient = new HttpClient(); |
static Future<WebSocket> connect( |
- String url, Iterable<String> protocols, Map<String, dynamic> headers) { |
+ String url, Iterable<String> protocols, Map<String, dynamic> headers, |
+ {CompressionOptions compression: CompressionOptions.DEFAULT}) { |
Uri uri = Uri.parse(url); |
if (uri.scheme != "ws" && uri.scheme != "wss") { |
throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); |
@@ -784,144 +948,182 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
} |
String nonce = _CryptoUtils.bytesToBase64(nonceData); |
- uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", |
- userInfo: uri.userInfo, |
- host: uri.host, |
- port: uri.port, |
- path: uri.path, |
- query: uri.query, |
- fragment: uri.fragment); |
- return _httpClient.openUrl("GET", uri) |
- .then((request) { |
- if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
- // If the URL contains user information use that for basic |
- // authorization. |
- String auth = |
- _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); |
- request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
- } |
- if (headers != null) { |
- headers.forEach((field, value) => request.headers.add(field, value)); |
- } |
- // Setup the initial handshake. |
+ uri = new Uri( |
+ scheme: uri.scheme == "wss" ? "https" : "http", |
+ userInfo: uri.userInfo, |
+ host: uri.host, |
+ port: uri.port, |
+ path: uri.path, |
+ query: uri.query, |
+ fragment: uri.fragment); |
+ return _httpClient.openUrl("GET", uri).then((request) { |
+ if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
+ // If the URL contains user information use that for basic |
+ // authorization. |
+ String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); |
+ request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
+ } |
+ if (headers != null) { |
+ headers.forEach((field, value) => request.headers.add(field, value)); |
+ } |
+ // Setup the initial handshake. |
+ request.headers |
+ ..set(HttpHeaders.CONNECTION, "Upgrade") |
+ ..set(HttpHeaders.UPGRADE, "websocket") |
+ ..set("Sec-WebSocket-Key", nonce) |
+ ..set("Cache-Control", "no-cache") |
+ ..set("Sec-WebSocket-Version", "13"); |
+ if (protocols != null) { |
+ request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); |
+ } |
+ |
+ if (compression.enabled) { |
request.headers |
- ..set(HttpHeaders.CONNECTION, "Upgrade") |
- ..set(HttpHeaders.UPGRADE, "websocket") |
- ..set("Sec-WebSocket-Key", nonce) |
- ..set("Cache-Control", "no-cache") |
- ..set("Sec-WebSocket-Version", "13"); |
- if (protocols != null) { |
- request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); |
- } |
- return request.close(); |
- }) |
- .then((response) { |
- void error(String message) { |
- // Flush data. |
- response.detachSocket().then((socket) { |
- socket.destroy(); |
- }); |
- throw new WebSocketException(message); |
- } |
- if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || |
- response.headers[HttpHeaders.CONNECTION] == null || |
- !response.headers[HttpHeaders.CONNECTION].any( |
- (value) => value.toLowerCase() == "upgrade") || |
- response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != |
- "websocket") { |
- error("Connection to '$uri' was not upgraded to websocket"); |
- } |
- String accept = response.headers.value("Sec-WebSocket-Accept"); |
- if (accept == null) { |
- error("Response did not contain a 'Sec-WebSocket-Accept' header"); |
- } |
- _SHA1 sha1 = new _SHA1(); |
- sha1.add("$nonce$_webSocketGUID".codeUnits); |
- List<int> expectedAccept = sha1.close(); |
- List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); |
- if (expectedAccept.length != receivedAccept.length) { |
- error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); |
+ .add("Sec-WebSocket-Extensions", compression._createHeader()); |
+ } |
+ |
+ return request.close(); |
+ }).then((response) { |
+ void error(String message) { |
+ // Flush data. |
+ response.detachSocket().then((socket) { |
+ socket.destroy(); |
+ }); |
+ throw new WebSocketException(message); |
+ } |
+ if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || |
+ response.headers[HttpHeaders.CONNECTION] == null || |
+ !response.headers[HttpHeaders.CONNECTION] |
+ .any((value) => value.toLowerCase() == "upgrade") || |
+ response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != |
+ "websocket") { |
+ error("Connection to '$uri' was not upgraded to websocket"); |
+ } |
+ String accept = response.headers.value("Sec-WebSocket-Accept"); |
+ if (accept == null) { |
+ error("Response did not contain a 'Sec-WebSocket-Accept' header"); |
+ } |
+ _SHA1 sha1 = new _SHA1(); |
+ sha1.add("$nonce$_webSocketGUID".codeUnits); |
+ List<int> expectedAccept = sha1.close(); |
+ List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); |
+ if (expectedAccept.length != receivedAccept.length) { |
+ error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); |
+ } |
+ for (int i = 0; i < expectedAccept.length; i++) { |
+ if (expectedAccept[i] != receivedAccept[i]) { |
+ error("Bad response 'Sec-WebSocket-Accept' header"); |
} |
- for (int i = 0; i < expectedAccept.length; i++) { |
- if (expectedAccept[i] != receivedAccept[i]) { |
- error("Bad response 'Sec-WebSocket-Accept' header"); |
- } |
+ } |
+ var protocol = response.headers.value('Sec-WebSocket-Protocol'); |
+ |
+ _WebSocketPerMessageDeflate deflate = |
+ negotiateClientCompression(response, compression); |
+ |
+ return response.detachSocket().then((socket) => |
+ new _WebSocketImpl._fromSocket( |
+ socket, protocol, compression, false, deflate)); |
+ }); |
+ } |
+ |
+ static _WebSocketPerMessageDeflate negotiateClientCompression( |
+ HttpClientResponse response, CompressionOptions compression) { |
+ String extensionHeader = response.headers.value('Sec-WebSocket-Extensions'); |
+ |
+ if (extensionHeader == null) { |
+ extensionHeader = ""; |
+ } |
+ |
+ var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); |
+ |
+ if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) { |
+ var serverNoContextTakeover = |
+ hv.parameters.containsKey(_serverNoContextTakeover); |
+ var clientNoContextTakeover = |
+ hv.parameters.containsKey(_clientNoContextTakeover); |
+ |
+ int getWindowBits(String type) { |
+ var o = hv.parameters[type]; |
+ if (o == null) { |
+ return DEFAULT_WINDOW_BITS; |
} |
- var protocol = response.headers.value('Sec-WebSocket-Protocol'); |
- return response.detachSocket() |
- .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); |
- }); |
+ |
+ o = int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS); |
+ return o; |
+ } |
+ |
+ return new _WebSocketPerMessageDeflate( |
+ clientMaxWindowBits: getWindowBits(_clientMaxWindowBits), |
+ serverMaxWindowBits: getWindowBits(_serverMaxWindowBits), |
+ clientNoContextTakeover: clientNoContextTakeover, |
+ serverNoContextTakeover: serverNoContextTakeover); |
+ } |
+ |
+ return null; |
} |
- _WebSocketImpl._fromSocket(this._socket, this.protocol, |
- [this._serverSide = false]) { |
+ _WebSocketImpl._fromSocket( |
+ this._socket, this.protocol, CompressionOptions compression, |
+ [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) { |
_consumer = new _WebSocketConsumer(this, _socket); |
_sink = new _StreamSinkImpl(_consumer); |
_readyState = WebSocket.OPEN; |
- |
- var transformer = new _WebSocketProtocolTransformer(_serverSide); |
- _subscription = _socket.transform(transformer).listen( |
- (data) { |
- if (data is _WebSocketPing) { |
- if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
- } else if (data is _WebSocketPong) { |
- // Simply set pingInterval, as it'll cancel any timers. |
- pingInterval = _pingInterval; |
- } else { |
- _controller.add(data); |
- } |
- }, |
- onError: (error) { |
- if (_closeTimer != null) _closeTimer.cancel(); |
- if (error is FormatException) { |
- _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
- } else { |
- _close(WebSocketStatus.PROTOCOL_ERROR); |
- } |
- // An error happened, set the close code set above. |
- _closeCode = _outCloseCode; |
- _closeReason = _outCloseReason; |
- _controller.close(); |
- }, |
- onDone: () { |
- if (_closeTimer != null) _closeTimer.cancel(); |
- if (_readyState == WebSocket.OPEN) { |
- _readyState = WebSocket.CLOSING; |
- if (!_isReservedStatusCode(transformer.closeCode)) { |
- _close(transformer.closeCode, transformer.closeReason); |
- } else { |
- _close(); |
- } |
- _readyState = WebSocket.CLOSED; |
- } |
- // Protocol close, use close code from transformer. |
- _closeCode = transformer.closeCode; |
- _closeReason = transformer.closeReason; |
- _controller.close(); |
- }, |
- cancelOnError: true); |
+ _deflate = deflate; |
+ |
+ var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate); |
+ _subscription = _socket.transform(transformer).listen((data) { |
+ if (data is _WebSocketPing) { |
+ if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
+ } else if (data is _WebSocketPong) { |
+ // Simply set pingInterval, as it'll cancel any timers. |
+ pingInterval = _pingInterval; |
+ } else { |
+ _controller.add(data); |
+ } |
+ }, onError: (error, stackTrace) { |
+ if (_closeTimer != null) _closeTimer.cancel(); |
+ if (error is FormatException) { |
+ _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
+ } else { |
+ _close(WebSocketStatus.PROTOCOL_ERROR); |
+ } |
+ // An error happened, set the close code set above. |
+ _closeCode = _outCloseCode; |
+ _closeReason = _outCloseReason; |
+ _controller.close(); |
+ }, onDone: () { |
+ if (_closeTimer != null) _closeTimer.cancel(); |
+ if (_readyState == WebSocket.OPEN) { |
+ _readyState = WebSocket.CLOSING; |
+ if (!_isReservedStatusCode(transformer.closeCode)) { |
+ _close(transformer.closeCode, transformer.closeReason); |
+ } else { |
+ _close(); |
+ } |
+ _readyState = WebSocket.CLOSED; |
+ } |
+ // Protocol close, use close code from transformer. |
+ _closeCode = transformer.closeCode; |
+ _closeReason = transformer.closeReason; |
+ _controller.close(); |
+ }, cancelOnError: true); |
_subscription.pause(); |
- _controller = new StreamController(sync: true, |
- onListen: _subscription.resume, |
- onCancel: () { |
- _subscription.cancel(); |
- _subscription = null; |
- }, |
- onPause: _subscription.pause, |
- onResume: _subscription.resume); |
+ _controller = new StreamController( |
+ sync: true, onListen: _subscription.resume, onCancel: () { |
+ _subscription.cancel(); |
+ _subscription = null; |
+ }, onPause: _subscription.pause, onResume: _subscription.resume); |
_webSockets[_serviceId] = this; |
- try { _socket._owner = this; } catch (_) {} |
+ try { |
+ _socket._owner = this; |
+ } catch (_) {} |
} |
StreamSubscription listen(void onData(message), |
- {Function onError, |
- void onDone(), |
- bool cancelOnError}) { |
+ {Function onError, void onDone(), bool cancelOnError}) { |
return _controller.stream.listen(onData, |
- onError: onError, |
- onDone: onDone, |
- cancelOnError: cancelOnError); |
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
} |
Duration get pingInterval => _pingInterval; |
@@ -1027,13 +1229,12 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
static bool _isReservedStatusCode(int code) { |
return code != null && |
- (code < WebSocketStatus.NORMAL_CLOSURE || |
+ (code < WebSocketStatus.NORMAL_CLOSURE || |
code == WebSocketStatus.RESERVED_1004 || |
code == WebSocketStatus.NO_STATUS_RECEIVED || |
code == WebSocketStatus.ABNORMAL_CLOSURE || |
(code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
- code < WebSocketStatus.RESERVED_1015) || |
- (code >= WebSocketStatus.RESERVED_1015 && |
- code < 3000)); |
+ code < WebSocketStatus.RESERVED_1015) || |
+ (code >= WebSocketStatus.RESERVED_1015 && code < 3000)); |
} |
} |