Chromium Code Reviews| Index: sdk/lib/io/websocket_impl.dart |
| diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart |
| index cc68544e536233a5e43d63a699e01d4d58ce5b4e..eb18b7c7b4bba56f903ced5a2d0597990ee5fbc7 100644 |
| --- a/sdk/lib/io/websocket_impl.dart |
| +++ b/sdk/lib/io/websocket_impl.dart |
| @@ -13,7 +13,6 @@ class _WebSocketMessageType { |
| static const int BINARY = 2; |
| } |
| - |
| class _WebSocketOpcode { |
| static const int CONTINUATION = 0; |
| static const int TEXT = 1; |
| @@ -38,8 +37,8 @@ class _WebSocketOpcode { |
| * which is supplied through the [:handleData:]. As the protocol is processed, |
| * it'll output frame data as either a List<int> or String. |
| * |
| - * Important infomation about usage: Be sure you use cancelOnError, so the |
| - * socket will be closed when the processer encounter an error. Not using it |
| + * Important information about usage: Be sure you use cancelOnError, so the |
| + * socket will be closed when the processor encounter an error. Not using it |
| * will lead to undefined behaviour. |
| */ |
| // TODO(ajohnsen): make this transformer reusable? |
| @@ -51,9 +50,15 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| static const int PAYLOAD = 4; |
| static const int CLOSED = 5; |
| static const int FAILURE = 6; |
| + static const int FIN = 0x80; |
| + static const int RSV1 = 0x40; |
| + static const int RSV2 = 0x20; |
| + static const int RSV3 = 0x10; |
| + static const int OPCODE = 0xF; |
| int _state = START; |
| bool _fin = false; |
| + bool _compressed = false; |
| int _opcode = -1; |
| int _len = -1; |
| bool _masked = false; |
| @@ -71,18 +76,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| final List _maskingBytes = new List(4); |
| final BytesBuilder _payload = new BytesBuilder(copy: false); |
| - _WebSocketProtocolTransformer([this._serverSide = false]); |
| + _WebSocketPerMessageDeflate _deflate; |
| + _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); |
| Stream bind(Stream stream) { |
| - return new Stream.eventTransformed( |
| - stream, |
| - (EventSink eventSink) { |
| - if (_eventSink != null) { |
| - throw new StateError("WebSocket transformer already used."); |
| - } |
| - _eventSink = eventSink; |
| - return this; |
| - }); |
| + return new Stream.eventTransformed(stream, (EventSink eventSink) { |
| + if (_eventSink != null) { |
| + throw new StateError("WebSocket transformer already used."); |
| + } |
| + _eventSink = eventSink; |
| + return this; |
| + }); |
| } |
| void addError(Object error, [StackTrace stackTrace]) => |
| @@ -108,11 +112,20 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| if (_state <= LEN_REST) { |
| if (_state == START) { |
| _fin = (byte & 0x80) != 0; |
| - if ((byte & 0x70) != 0) { |
| + var rsv = (byte & 0x70) >> 4; |
|
Søren Gjesse
2015/08/24 08:21:50
I don't think this temp is required - see below.
|
| + |
| + if (rsv != 0 && rsv != 4) { |
|
Søren Gjesse
2015/08/24 08:21:50
Please use
if ((byte & (RSV1 | RSV2)) != 0)
to
|
| // The RSV1, RSV2 bits RSV3 must be all zero. |
|
Søren Gjesse
2015/08/24 08:21:50
Please remove RSV3 from this comment.
|
| throw new WebSocketException("Protocol error"); |
| } |
| - _opcode = (byte & 0xF); |
| + |
| + if (rsv == 4) { |
|
Søren Gjesse
2015/08/24 08:21:50
Please use
if ((byte & RSV3) != 0) {
to test a
|
| + _compressed = true; |
| + } else { |
| + _compressed = false; |
| + } |
| + _opcode = (byte & 0x0F); |
|
Søren Gjesse
2015/08/24 08:21:50
Please use the new constant OPCODE here.
|
| + |
| if (_opcode <= _WebSocketOpcode.BINARY) { |
| if (_opcode == _WebSocketOpcode.CONTINUATION) { |
| if (_currentMessageType == _WebSocketMessageType.NONE) { |
| @@ -120,14 +133,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| } |
| } else { |
| assert(_opcode == _WebSocketOpcode.TEXT || |
| - _opcode == _WebSocketOpcode.BINARY); |
| + _opcode == _WebSocketOpcode.BINARY); |
| if (_currentMessageType != _WebSocketMessageType.NONE) { |
| throw new WebSocketException("Protocol error"); |
| } |
| _currentMessageType = _opcode; |
| } |
| } else if (_opcode >= _WebSocketOpcode.CLOSE && |
| - _opcode <= _WebSocketOpcode.PONG) { |
| + _opcode <= _WebSocketOpcode.PONG) { |
| // Control frames cannot be fragmented. |
| if (!_fin) throw new WebSocketException("Protocol error"); |
| } else { |
| @@ -176,15 +189,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| _unmask(index, payloadLength, buffer); |
| } |
| // Control frame and data frame share _payloads. |
| - _payload.add( |
| - new Uint8List.view(buffer.buffer, index, payloadLength)); |
| + _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); |
| index += payloadLength; |
| if (_isControlFrame()) { |
| if (_remainingPayloadBytes == 0) _controlFrameEnd(); |
| } else { |
| if (_currentMessageType != _WebSocketMessageType.TEXT && |
| _currentMessageType != _WebSocketMessageType.BINARY) { |
| - throw new WebSocketException("Protocol error"); |
| + throw new WebSocketException("Protocol error"); |
| } |
| if (_remainingPayloadBytes == 0) _messageFrameEnd(); |
| } |
| @@ -219,8 +231,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; |
| } |
| Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
| - Int32x4List blockBuffer = new Int32x4List.view( |
| - buffer.buffer, index, blockCount); |
| + Int32x4List blockBuffer = |
| + new Int32x4List.view(buffer.buffer, index, blockCount); |
| for (int i = 0; i < blockBuffer.length; i++) { |
| blockBuffer[i] ^= blockMask; |
| } |
| @@ -284,12 +296,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| void _messageFrameEnd() { |
| if (_fin) { |
| + var bytes = _payload.takeBytes(); |
| + if (_deflate != null && _compressed) { |
| + bytes = _deflate.processIncomingMessage(bytes); |
| + } |
| + |
| switch (_currentMessageType) { |
| case _WebSocketMessageType.TEXT: |
| - _eventSink.add(UTF8.decode(_payload.takeBytes())); |
| + _eventSink.add(UTF8.decode(bytes)); |
| break; |
| case _WebSocketMessageType.BINARY: |
| - _eventSink.add(_payload.takeBytes()); |
| + _eventSink.add(bytes); |
| break; |
| } |
| _currentMessageType = _WebSocketMessageType.NONE; |
| @@ -331,8 +348,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| bool _isControlFrame() { |
| return _opcode == _WebSocketOpcode.CLOSE || |
| - _opcode == _WebSocketOpcode.PING || |
| - _opcode == _WebSocketOpcode.PONG; |
| + _opcode == _WebSocketOpcode.PING || |
| + _opcode == _WebSocketOpcode.PONG; |
| } |
| void _prepareForNextFrame() { |
| @@ -347,31 +364,29 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| } |
| } |
| - |
| class _WebSocketPing { |
| final List<int> payload; |
| _WebSocketPing([this.payload = null]); |
| } |
| - |
| class _WebSocketPong { |
| final List<int> payload; |
| _WebSocketPong([this.payload = null]); |
| } |
| - |
| class _WebSocketTransformerImpl implements WebSocketTransformer { |
| final StreamController<WebSocket> _controller = |
| new StreamController<WebSocket>(sync: true); |
| final Function _protocolSelector; |
| + final CompressionOptions _compression; |
| - _WebSocketTransformerImpl(this._protocolSelector); |
| + _WebSocketTransformerImpl(this._protocolSelector, this._compression); |
| Stream<WebSocket> bind(Stream<HttpRequest> stream) { |
| stream.listen((request) { |
| - _upgrade(request, _protocolSelector) |
| - .then((WebSocket webSocket) => _controller.add(webSocket)) |
| - .catchError(_controller.addError); |
| + _upgrade(request, _protocolSelector, _compression) |
| + .then((WebSocket webSocket) => _controller.add(webSocket)) |
| + .catchError(_controller.addError); |
| }, onDone: () { |
| _controller.close(); |
| }); |
| @@ -379,13 +394,14 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
| return _controller.stream; |
| } |
| - static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { |
| + static Future<WebSocket> _upgrade( |
| + HttpRequest request, _protocolSelector, CompressionOptions compression) { |
| var response = request.response; |
| if (!_isUpgradeRequest(request)) { |
| // Send error response. |
| response |
| - ..statusCode = HttpStatus.BAD_REQUEST |
| - ..close(); |
| + ..statusCode = HttpStatus.BAD_REQUEST |
| + ..close(); |
| return new Future.error( |
| new WebSocketException("Invalid WebSocket upgrade request")); |
| } |
| @@ -393,9 +409,9 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
| Future upgrade(String protocol) { |
| // Send the upgrade response. |
| response |
| - ..statusCode = HttpStatus.SWITCHING_PROTOCOLS |
| - ..headers.add(HttpHeaders.CONNECTION, "Upgrade") |
| - ..headers.add(HttpHeaders.UPGRADE, "websocket"); |
| + ..statusCode = HttpStatus.SWITCHING_PROTOCOLS |
| + ..headers.add(HttpHeaders.CONNECTION, "Upgrade") |
| + ..headers.add(HttpHeaders.UPGRADE, "websocket"); |
| String key = request.headers.value("Sec-WebSocket-Key"); |
| _SHA1 sha1 = new _SHA1(); |
| sha1.add("$key$_webSocketGUID".codeUnits); |
| @@ -404,10 +420,13 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
| if (protocol != null) { |
| response.headers.add("Sec-WebSocket-Protocol", protocol); |
| } |
| + |
| + var deflate = _negotiateCompression(request, response, compression); |
| + |
| response.headers.contentLength = 0; |
| - return response.detachSocket() |
| - .then((socket) => new _WebSocketImpl._fromSocket( |
| - socket, protocol, true)); |
| + return response.detachSocket().then((socket) => |
| + new _WebSocketImpl._fromSocket( |
| + socket, protocol, compression, true, deflate)); |
| } |
| var protocols = request.headers['Sec-WebSocket-Protocol']; |
| @@ -416,26 +435,58 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
| // consisting of multiple protocols. To unify all of them, first join |
| // the lists with ', ' and then tokenize. |
| protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); |
| - return new Future(() => _protocolSelector(protocols)) |
| - .then((protocol) { |
| - if (protocols.indexOf(protocol) < 0) { |
| - throw new WebSocketException( |
| - "Selected protocol is not in the list of available protocols"); |
| - } |
| - return protocol; |
| - }) |
| - .catchError((error) { |
| - response |
| - ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR |
| - ..close(); |
| - throw error; |
| - }) |
| - .then(upgrade); |
| + return new Future(() => _protocolSelector(protocols)).then((protocol) { |
| + if (protocols.indexOf(protocol) < 0) { |
| + throw new WebSocketException( |
| + "Selected protocol is not in the list of available protocols"); |
| + } |
| + return protocol; |
| + }).catchError((error) { |
| + response |
| + ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR |
| + ..close(); |
| + throw error; |
| + }).then(upgrade); |
| } else { |
| return upgrade(null); |
| } |
| } |
| + static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request, |
| + HttpResponse response, CompressionOptions compression) { |
| + var extensionHeader = request.headers.value("Sec-WebSocket-Extensions"); |
| + |
| + if (extensionHeader == null) { |
| + extensionHeader = ""; |
| + } |
| + |
| + Iterable<List<String>> extensions = |
| + extensionHeader.split(",").map((it) => it.split("; ")); |
|
Søren Gjesse
2015/08/24 08:21:50
Both the splitting using "," and "; " seems fragil
|
| + |
| + var perMessageDeflate = extensions.firstWhere( |
| + (x) => x[0] == _WebSocketImpl.PER_MESSAGE_DEFLATE, |
| + orElse: () => null); |
| + if (compression.enabled && perMessageDeflate != null) { |
| + var info = compression._createHeader(perMessageDeflate); |
| + |
| + response.headers.add("Sec-WebSocket-Extensions", info[0]); |
| + var serverNoContextTakeover = |
| + perMessageDeflate.contains("server_no_context_takeover"); |
| + var clientNoContextTakeover = |
| + perMessageDeflate.contains("client_no_context_takeover"); |
| + var deflate = new _WebSocketPerMessageDeflate( |
| + serverNoContextTakeover: serverNoContextTakeover, |
| + clientNoContextTakeover: clientNoContextTakeover, |
| + serverMaxWindowBits: info[1], |
| + clientMaxWindowBits: info[1], |
| + serverSide: true); |
| + |
| + return deflate; |
| + } |
| + |
| + return null; |
| + } |
| + |
| static bool _isUpgradeRequest(HttpRequest request) { |
| if (request.method != "GET") { |
| return false; |
| @@ -464,24 +515,122 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
| } |
| } |
| +class _WebSocketPerMessageDeflate { |
| + bool serverNoContextTakeover; |
| + bool clientNoContextTakeover; |
| + int clientMaxWindowBits; |
| + int serverMaxWindowBits; |
| + bool serverSide; |
| + |
| + _Filter decoder; |
| + _Filter encoder; |
| + |
| + _WebSocketPerMessageDeflate( |
| + {this.clientMaxWindowBits, |
|
Søren Gjesse
2015/08/24 08:21:50
nit: Please indent like this (one space more for t
|
| + this.serverMaxWindowBits, |
| + this.serverNoContextTakeover: false, |
| + this.clientNoContextTakeover: false, |
| + this.serverSide: false}) { |
| + if (clientMaxWindowBits == null) { |
| + clientMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS; |
| + } |
| + |
| + if (serverMaxWindowBits == null) { |
| + serverMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS; |
| + } |
| + } |
| + |
| + void _ensureDecoder() { |
| + if (decoder == null) { |
| + decoder = _Filter._newZLibInflateFilter( |
| + serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true); |
| + } |
| + } |
| + |
| + void _ensureEncoder() { |
| + if (encoder == null) { |
| + encoder = _Filter._newZLibDeflateFilter( |
| + false, |
| + ZLibOption.DEFAULT_LEVEL, |
| + serverSide ? serverMaxWindowBits : clientMaxWindowBits, |
| + ZLibOption.DEFAULT_MEM_LEVEL, |
| + ZLibOption.STRATEGY_DEFAULT, |
| + null, |
| + true); |
| + } |
| + } |
| + |
| + List<int> processIncomingMessage(List<int> msg) { |
| + _ensureDecoder(); |
| + |
| + var data = []; |
| + data.addAll(msg); |
| + data.addAll(const [0x00, 0x00, 0xff, 0xff]); |
| + |
| + decoder.process(data, 0, data.length); |
| + var reuse = |
| + !(serverSide ? clientNoContextTakeover : serverNoContextTakeover); |
| + var result = []; |
| + var out; |
| + |
| + while ((out = decoder.processed(flush: reuse)) != null) { |
| + result.addAll(out); |
| + } |
| + |
| + decoder.processed(flush: reuse); |
| + |
| + if (!reuse) { |
| + decoder.end(); |
| + decoder = null; |
| + } |
| + return result; |
| + } |
| + |
| + List<int> processOutgoingMessage(List<int> msg) { |
| + _ensureEncoder(); |
| + var reuse = |
| + !(serverSide ? serverNoContextTakeover : clientNoContextTakeover); |
| + var result = []; |
| + var out; |
| + |
| + encoder.process(msg, 0, msg.length); |
| + |
| + while ((out = encoder.processed(flush: reuse)) != null) { |
| + result.addAll(out); |
| + } |
| + |
| + if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) { |
| + encoder.end(); |
| + encoder = null; |
| + } |
| + |
| + if (result.length > 4) { |
| + result = result.sublist(0, result.length - 4); |
| + } |
| + |
| + return result; |
| + } |
| +} |
| // TODO(ajohnsen): Make this transformer reusable. |
| class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| final _WebSocketImpl webSocket; |
| EventSink _eventSink; |
| - _WebSocketOutgoingTransformer(this.webSocket); |
| + _WebSocketPerMessageDeflate _deflateHelper; |
| + |
| + _WebSocketOutgoingTransformer(this.webSocket) { |
| + _deflateHelper = webSocket._deflate; |
| + } |
| Stream bind(Stream stream) { |
| - return new Stream.eventTransformed( |
| - stream, |
| - (EventSink eventSink) { |
| - if (_eventSink != null) { |
| - throw new StateError("WebSocket transformer already used"); |
| - } |
| - _eventSink = eventSink; |
| - return this; |
| - }); |
| + return new Stream.eventTransformed(stream, (EventSink eventSink) { |
| + if (_eventSink != null) { |
| + throw new StateError("WebSocket transformer already used"); |
| + } |
| + _eventSink = eventSink; |
| + return this; |
| + }); |
| } |
| void add(message) { |
| @@ -500,12 +649,16 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| opcode = _WebSocketOpcode.TEXT; |
| data = UTF8.encode(message); |
| } else { |
| - if (message is !List<int>) { |
| + if (message is! List<int>) { |
| throw new ArgumentError(message); |
| } |
| opcode = _WebSocketOpcode.BINARY; |
| data = message; |
| } |
| + |
| + if (_deflateHelper != null) { |
| + data = _deflateHelper.processOutgoingMessage(data); |
| + } |
| } else { |
| opcode = _WebSocketOpcode.TEXT; |
| } |
| @@ -531,11 +684,19 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| _eventSink.close(); |
| } |
| - void addFrame(int opcode, List<int> data) => |
| - createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
| + void addFrame(int opcode, List<int> data) => createFrame( |
| + opcode, |
| + data, |
| + webSocket._serverSide, |
| + _deflateHelper != null && |
| + (opcode == _WebSocketOpcode.TEXT || |
| + opcode == _WebSocketOpcode.BINARY)).forEach((e) { |
| + _eventSink.add(e); |
| + }); |
| - static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |
| - bool mask = !serverSide; // Masking not implemented for server. |
| + static Iterable createFrame( |
| + int opcode, List<int> data, bool serverSide, bool compressed) { |
| + bool mask = !serverSide; // Masking not implemented for server. |
| int dataLength = data == null ? 0 : data.length; |
| // Determine the header size. |
| int headerSize = (mask) ? 6 : 2; |
| @@ -546,11 +707,18 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| } |
| Uint8List header = new Uint8List(headerSize); |
| int index = 0; |
| + |
| + var rsv = 0; |
| + if (compressed) { |
| + rsv = 4; |
| + } |
| + |
| // Set FIN and opcode. |
| - header[index++] = 0x80 | opcode; |
| + var hoc = (1 << 7) | (rsv % 8) << 4 | (opcode % 128); |
| + |
| + header[index++] = hoc; |
|
Søren Gjesse
2015/08/24 08:21:50
Just set this byte like this (no need to the rsv a
|
| // Determine size and position of length field. |
| int lengthBytes = 1; |
| - int firstLengthByte = 1; |
| if (dataLength > 65535) { |
| header[index++] = 127; |
| lengthBytes = 8; |
| @@ -580,8 +748,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| list = new Uint8List(data.length); |
| for (int i = 0; i < data.length; i++) { |
| if (data[i] < 0 || 255 < data[i]) { |
| - throw new ArgumentError( |
| - "List element is not a byte value " |
| + throw new ArgumentError("List element is not a byte value " |
| "(value ${data[i]} at index $i)"); |
| } |
| list[i] = data[i]; |
| @@ -597,8 +764,8 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| mask = (mask << 8) | maskBytes[i]; |
| } |
| Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
| - Int32x4List blockBuffer = new Int32x4List.view( |
| - list.buffer, 0, blockCount); |
| + Int32x4List blockBuffer = |
| + new Int32x4List.view(list.buffer, 0, blockCount); |
| for (int i = 0; i < blockBuffer.length; i++) { |
| blockBuffer[i] ^= blockMask; |
| } |
| @@ -619,7 +786,6 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| } |
| } |
| - |
| class _WebSocketConsumer implements StreamConsumer { |
| final _WebSocketImpl webSocket; |
| final Socket socket; |
| @@ -664,28 +830,28 @@ class _WebSocketConsumer implements StreamConsumer { |
| _ensureController() { |
| if (_controller != null) return; |
| - _controller = new StreamController(sync: true, |
| - onPause: _onPause, |
| - onResume: _onResume, |
| - onCancel: _onListen); |
| - var stream = _controller.stream.transform( |
| - new _WebSocketOutgoingTransformer(webSocket)); |
| - socket.addStream(stream) |
| - .then((_) { |
| - _done(); |
| - _closeCompleter.complete(webSocket); |
| - }, onError: (error, StackTrace stackTrace) { |
| - _closed = true; |
| - _cancel(); |
| - if (error is ArgumentError) { |
| - if (!_done(error, stackTrace)) { |
| - _closeCompleter.completeError(error, stackTrace); |
| - } |
| - } else { |
| - _done(); |
| - _closeCompleter.complete(webSocket); |
| - } |
| - }); |
| + _controller = new StreamController( |
| + sync: true, |
| + onPause: _onPause, |
| + onResume: _onResume, |
| + onCancel: _onListen); |
| + var stream = _controller.stream |
| + .transform(new _WebSocketOutgoingTransformer(webSocket)); |
| + socket.addStream(stream).then((_) { |
| + _done(); |
| + _closeCompleter.complete(webSocket); |
| + }, onError: (error, StackTrace stackTrace) { |
| + _closed = true; |
| + _cancel(); |
| + if (error is ArgumentError) { |
| + if (!_done(error, stackTrace)) { |
| + _closeCompleter.completeError(error, stackTrace); |
| + } |
| + } else { |
| + _done(); |
| + _closeCompleter.complete(webSocket); |
| + } |
| + }); |
| } |
| bool _done([error, StackTrace stackTrace]) { |
| @@ -706,13 +872,9 @@ class _WebSocketConsumer implements StreamConsumer { |
| } |
| _ensureController(); |
| _completer = new Completer(); |
| - _subscription = stream.listen( |
| - (data) { |
| - _controller.add(data); |
| - }, |
| - onDone: _done, |
| - onError: _done, |
| - cancelOnError: true); |
| + _subscription = stream.listen((data) { |
| + _controller.add(data); |
| + }, onDone: _done, onError: _done, cancelOnError: true); |
| if (_issuedPause) { |
| _subscription.pause(); |
| _issuedPause = false; |
| @@ -742,10 +904,11 @@ class _WebSocketConsumer implements StreamConsumer { |
| } |
| } |
| - |
| class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
| // Use default Map so we keep order. |
| static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); |
| + static const int DEFAULT_WINDOW_BITS = 15; |
| + static const String PER_MESSAGE_DEFLATE = "permessage-deflate"; |
| final String protocol; |
| @@ -766,11 +929,13 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
| int _outCloseCode; |
| String _outCloseReason; |
| Timer _closeTimer; |
| + _WebSocketPerMessageDeflate _deflate; |
| static final HttpClient _httpClient = new HttpClient(); |
| static Future<WebSocket> connect( |
| - String url, Iterable<String> protocols, Map<String, dynamic> headers) { |
| + String url, Iterable<String> protocols, Map<String, dynamic> headers, |
| + {CompressionOptions compression: CompressionOptions.DEFAULT}) { |
| Uri uri = Uri.parse(url); |
| if (uri.scheme != "ws" && uri.scheme != "wss") { |
| throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); |
| @@ -784,144 +949,189 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
| } |
| String nonce = _CryptoUtils.bytesToBase64(nonceData); |
| - uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", |
| - userInfo: uri.userInfo, |
| - host: uri.host, |
| - port: uri.port, |
| - path: uri.path, |
| - query: uri.query, |
| - fragment: uri.fragment); |
| - return _httpClient.openUrl("GET", uri) |
| - .then((request) { |
| - if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
| - // If the URL contains user information use that for basic |
| - // authorization. |
| - String auth = |
| - _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); |
| - request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
| - } |
| - if (headers != null) { |
| - headers.forEach((field, value) => request.headers.add(field, value)); |
| - } |
| - // Setup the initial handshake. |
| - request.headers |
| - ..set(HttpHeaders.CONNECTION, "Upgrade") |
| - ..set(HttpHeaders.UPGRADE, "websocket") |
| - ..set("Sec-WebSocket-Key", nonce) |
| - ..set("Cache-Control", "no-cache") |
| - ..set("Sec-WebSocket-Version", "13"); |
| - if (protocols != null) { |
| - request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); |
| - } |
| - return request.close(); |
| - }) |
| - .then((response) { |
| - void error(String message) { |
| - // Flush data. |
| - response.detachSocket().then((socket) { |
| - socket.destroy(); |
| - }); |
| - throw new WebSocketException(message); |
| - } |
| - if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || |
| - response.headers[HttpHeaders.CONNECTION] == null || |
| - !response.headers[HttpHeaders.CONNECTION].any( |
| - (value) => value.toLowerCase() == "upgrade") || |
| - response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != |
| - "websocket") { |
| - error("Connection to '$uri' was not upgraded to websocket"); |
| - } |
| - String accept = response.headers.value("Sec-WebSocket-Accept"); |
| - if (accept == null) { |
| - error("Response did not contain a 'Sec-WebSocket-Accept' header"); |
| + uri = new Uri( |
| + scheme: uri.scheme == "wss" ? "https" : "http", |
| + userInfo: uri.userInfo, |
| + host: uri.host, |
| + port: uri.port, |
| + path: uri.path, |
| + query: uri.query, |
| + fragment: uri.fragment); |
| + return _httpClient.openUrl("GET", uri).then((request) { |
| + if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
| + // If the URL contains user information use that for basic |
| + // authorization. |
| + String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); |
| + request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
| + } |
| + if (headers != null) { |
| + headers.forEach((field, value) => request.headers.add(field, value)); |
| + } |
| + // Setup the initial handshake. |
| + request.headers |
| + ..set(HttpHeaders.CONNECTION, "Upgrade") |
| + ..set(HttpHeaders.UPGRADE, "websocket") |
| + ..set("Sec-WebSocket-Key", nonce) |
| + ..set("Cache-Control", "no-cache") |
| + ..set("Sec-WebSocket-Version", "13"); |
| + if (protocols != null) { |
| + request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); |
| + } |
| + |
| + request.headers |
| + .add("Sec-WebSocket-Extensions", compression._createHeader()); |
| + |
| + return request.close(); |
| + }).then((response) { |
| + void error(String message) { |
| + // Flush data. |
| + response.detachSocket().then((socket) { |
| + socket.destroy(); |
| + }); |
| + throw new WebSocketException(message); |
| + } |
| + if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || |
| + response.headers[HttpHeaders.CONNECTION] == null || |
| + !response.headers[HttpHeaders.CONNECTION] |
| + .any((value) => value.toLowerCase() == "upgrade") || |
| + response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != |
| + "websocket") { |
| + error("Connection to '$uri' was not upgraded to websocket"); |
| + } |
| + String accept = response.headers.value("Sec-WebSocket-Accept"); |
| + if (accept == null) { |
| + error("Response did not contain a 'Sec-WebSocket-Accept' header"); |
| + } |
| + _SHA1 sha1 = new _SHA1(); |
| + sha1.add("$nonce$_webSocketGUID".codeUnits); |
| + List<int> expectedAccept = sha1.close(); |
| + List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); |
| + if (expectedAccept.length != receivedAccept.length) { |
| + error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); |
| + } |
| + for (int i = 0; i < expectedAccept.length; i++) { |
| + if (expectedAccept[i] != receivedAccept[i]) { |
| + error("Bad response 'Sec-WebSocket-Accept' header"); |
| } |
| - _SHA1 sha1 = new _SHA1(); |
| - sha1.add("$nonce$_webSocketGUID".codeUnits); |
| - List<int> expectedAccept = sha1.close(); |
| - List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); |
| - if (expectedAccept.length != receivedAccept.length) { |
| - error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); |
| + } |
| + var protocol = response.headers.value('Sec-WebSocket-Protocol'); |
| + |
| + _WebSocketPerMessageDeflate deflate = |
| + negotiateClientCompression(response, compression); |
| + |
| + return response.detachSocket().then((socket) => |
| + new _WebSocketImpl._fromSocket( |
| + socket, protocol, compression, false, deflate)); |
| + }); |
| + } |
| + |
| + static _WebSocketPerMessageDeflate negotiateClientCompression( |
| + HttpClientResponse response, CompressionOptions compression) { |
| + String extensionHeader = response.headers.value('Sec-WebSocket-Extensions'); |
| + |
| + if (extensionHeader == null) { |
| + extensionHeader = ""; |
| + } |
| + |
| + Iterable<List<String>> extensions = |
| + extensionHeader.split(", ").map((it) => it.split("; ")); |
|
Søren Gjesse
2015/08/24 08:21:50
Please see above (comment on _negoatiateCompressio
|
| + |
| + if (compression.enabled && |
| + extensions.any((x) => x[0] == PER_MESSAGE_DEFLATE)) { |
| + var opts = extensions.firstWhere((x) => x[0] == PER_MESSAGE_DEFLATE); |
| + var serverNoContextTakeover = opts.contains("server_no_context_takeover"); |
| + var clientNoContextTakeover = opts.contains("client_no_context_takeover"); |
| + |
| + int getWindowBits(String type) { |
| + var o = opts.firstWhere((x) => x.startsWith("${type}_max_window_bits="), |
| + orElse: () => null); |
| + |
| + if (o == null) { |
| + return DEFAULT_WINDOW_BITS; |
| } |
| - for (int i = 0; i < expectedAccept.length; i++) { |
| - if (expectedAccept[i] != receivedAccept[i]) { |
| - error("Bad response 'Sec-WebSocket-Accept' header"); |
| - } |
| + |
| + try { |
| + o = o.substring("${type}_max_window_bits=".length); |
| + o = int.parse(o); |
| + } catch (e) { |
| + return DEFAULT_WINDOW_BITS; |
| } |
| - var protocol = response.headers.value('Sec-WebSocket-Protocol'); |
| - return response.detachSocket() |
| - .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); |
| - }); |
| + |
| + return o; |
| + } |
| + |
| + return new _WebSocketPerMessageDeflate( |
| + clientMaxWindowBits: getWindowBits("client"), |
| + serverMaxWindowBits: getWindowBits("server"), |
| + clientNoContextTakeover: clientNoContextTakeover, |
| + serverNoContextTakeover: serverNoContextTakeover); |
| + } |
| + |
| + return null; |
| } |
| - _WebSocketImpl._fromSocket(this._socket, this.protocol, |
| - [this._serverSide = false]) { |
| + _WebSocketImpl._fromSocket( |
| + this._socket, this.protocol, CompressionOptions compression, |
| + [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) { |
| _consumer = new _WebSocketConsumer(this, _socket); |
| _sink = new _StreamSinkImpl(_consumer); |
| _readyState = WebSocket.OPEN; |
| - |
| - var transformer = new _WebSocketProtocolTransformer(_serverSide); |
| - _subscription = _socket.transform(transformer).listen( |
| - (data) { |
| - if (data is _WebSocketPing) { |
| - if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
| - } else if (data is _WebSocketPong) { |
| - // Simply set pingInterval, as it'll cancel any timers. |
| - pingInterval = _pingInterval; |
| - } else { |
| - _controller.add(data); |
| - } |
| - }, |
| - onError: (error) { |
| - if (_closeTimer != null) _closeTimer.cancel(); |
| - if (error is FormatException) { |
| - _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
| - } else { |
| - _close(WebSocketStatus.PROTOCOL_ERROR); |
| - } |
| - // An error happened, set the close code set above. |
| - _closeCode = _outCloseCode; |
| - _closeReason = _outCloseReason; |
| - _controller.close(); |
| - }, |
| - onDone: () { |
| - if (_closeTimer != null) _closeTimer.cancel(); |
| - if (_readyState == WebSocket.OPEN) { |
| - _readyState = WebSocket.CLOSING; |
| - if (!_isReservedStatusCode(transformer.closeCode)) { |
| - _close(transformer.closeCode, transformer.closeReason); |
| - } else { |
| - _close(); |
| - } |
| - _readyState = WebSocket.CLOSED; |
| - } |
| - // Protocol close, use close code from transformer. |
| - _closeCode = transformer.closeCode; |
| - _closeReason = transformer.closeReason; |
| - _controller.close(); |
| - }, |
| - cancelOnError: true); |
| + _deflate = deflate; |
| + |
| + var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate); |
| + _subscription = _socket.transform(transformer).listen((data) { |
| + if (data is _WebSocketPing) { |
| + if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
| + } else if (data is _WebSocketPong) { |
| + // Simply set pingInterval, as it'll cancel any timers. |
| + pingInterval = _pingInterval; |
| + } else { |
| + _controller.add(data); |
| + } |
| + }, onError: (error, stackTrace) { |
| + if (_closeTimer != null) _closeTimer.cancel(); |
| + if (error is FormatException) { |
| + _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
| + } else { |
| + _close(WebSocketStatus.PROTOCOL_ERROR); |
| + } |
| + // An error happened, set the close code set above. |
| + _closeCode = _outCloseCode; |
| + _closeReason = _outCloseReason; |
| + _controller.close(); |
| + }, onDone: () { |
| + if (_closeTimer != null) _closeTimer.cancel(); |
| + if (_readyState == WebSocket.OPEN) { |
| + _readyState = WebSocket.CLOSING; |
| + if (!_isReservedStatusCode(transformer.closeCode)) { |
| + _close(transformer.closeCode, transformer.closeReason); |
| + } else { |
| + _close(); |
| + } |
| + _readyState = WebSocket.CLOSED; |
| + } |
| + // Protocol close, use close code from transformer. |
| + _closeCode = transformer.closeCode; |
| + _closeReason = transformer.closeReason; |
| + _controller.close(); |
| + }, cancelOnError: true); |
| _subscription.pause(); |
| - _controller = new StreamController(sync: true, |
| - onListen: _subscription.resume, |
| - onCancel: () { |
| - _subscription.cancel(); |
| - _subscription = null; |
| - }, |
| - onPause: _subscription.pause, |
| - onResume: _subscription.resume); |
| + _controller = new StreamController( |
| + sync: true, onListen: _subscription.resume, onCancel: () { |
| + _subscription.cancel(); |
| + _subscription = null; |
| + }, onPause: _subscription.pause, onResume: _subscription.resume); |
| _webSockets[_serviceId] = this; |
| - try { _socket._owner = this; } catch (_) {} |
| + try { |
| + _socket._owner = this; |
| + } catch (_) {} |
| } |
| StreamSubscription listen(void onData(message), |
| - {Function onError, |
| - void onDone(), |
| - bool cancelOnError}) { |
| + {Function onError, void onDone(), bool cancelOnError}) { |
| return _controller.stream.listen(onData, |
| - onError: onError, |
| - onDone: onDone, |
| - cancelOnError: cancelOnError); |
| + onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| } |
| Duration get pingInterval => _pingInterval; |
| @@ -1027,13 +1237,12 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
| static bool _isReservedStatusCode(int code) { |
| return code != null && |
| - (code < WebSocketStatus.NORMAL_CLOSURE || |
| + (code < WebSocketStatus.NORMAL_CLOSURE || |
| code == WebSocketStatus.RESERVED_1004 || |
| code == WebSocketStatus.NO_STATUS_RECEIVED || |
| code == WebSocketStatus.ABNORMAL_CLOSURE || |
| (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
| - code < WebSocketStatus.RESERVED_1015) || |
| - (code >= WebSocketStatus.RESERVED_1015 && |
| - code < 3000)); |
| + code < WebSocketStatus.RESERVED_1015) || |
| + (code >= WebSocketStatus.RESERVED_1015 && code < 3000)); |
| } |
| } |