Chromium Code Reviews| Index: sdk/lib/io/websocket_impl.dart |
| diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart |
| index 265760690923f502512435c8e6d8df3d1fada22a..183500c6510808ffda8144d58c3fe99a8cd6a72a 100644 |
| --- a/sdk/lib/io/websocket_impl.dart |
| +++ b/sdk/lib/io/websocket_impl.dart |
| @@ -38,8 +38,8 @@ class _WebSocketOpcode { |
| * which is supplied through the [:handleData:]. As the protocol is processed, |
| * it'll output frame data as either a List<int> or String. |
| * |
| - * Important infomation about usage: Be sure you use cancelOnError, so the |
| - * socket will be closed when the processer encounter an error. Not using it |
| + * Important information about usage: Be sure you use cancelOnError, so the |
| + * socket will be closed when the processor encounter an error. Not using it |
| * will lead to undefined behaviour. |
| */ |
| // TODO(ajohnsen): make this transformer reusable? |
| @@ -54,6 +54,7 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| int _state = START; |
| bool _fin = false; |
| + bool _compressed = false; |
| int _opcode = -1; |
| int _len = -1; |
| bool _masked = false; |
| @@ -71,7 +72,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| final List _maskingBytes = new List(4); |
| final BytesBuilder _payload = new BytesBuilder(copy: false); |
| - _WebSocketProtocolTransformer([this._serverSide = false]); |
| + _WebSocketPerMessageDeflate _deflate; |
| + _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); |
| Stream bind(Stream stream) { |
| return new Stream.eventTransformed( |
| @@ -108,11 +110,18 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| if (_state <= LEN_REST) { |
| if (_state == START) { |
| _fin = (byte & 0x80) != 0; |
|
Søren Gjesse
2015/06/25 15:41:27
Please add constants for FIN, RSV1, RSV2 and RSV3
|
| - if ((byte & 0x70) != 0) { |
| - // The RSV1, RSV2 bits RSV3 must be all zero. |
| + |
| + if ((byte & 0x40) != 0) { |
| + _compressed = true; |
| + } |
| + |
| + if ((byte & 0x20) != 0 || (byte & 0x10) != 0) { |
| + // The RSV2 and RSV3 bits must be all zero. |
| throw new WebSocketException("Protocol error"); |
| } |
| + |
| _opcode = (byte & 0xF); |
|
Søren Gjesse
2015/06/25 15:41:27
Please add a constant for the opcode mask.
|
| + |
| if (_opcode <= _WebSocketOpcode.BINARY) { |
| if (_opcode == _WebSocketOpcode.CONTINUATION) { |
| if (_currentMessageType == _WebSocketMessageType.NONE) { |
| @@ -284,12 +293,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| void _messageFrameEnd() { |
| if (_fin) { |
| + var bytes = _payload.takeBytes(); |
| + if (_deflate != null && _compressed) { |
| + bytes = _deflate.processIncomingMessage(bytes); |
| + } |
| + |
| switch (_currentMessageType) { |
| case _WebSocketMessageType.TEXT: |
| - _eventSink.add(UTF8.decode(_payload.takeBytes())); |
| + _eventSink.add(UTF8.decode(bytes)); |
| break; |
| case _WebSocketMessageType.BINARY: |
| - _eventSink.add(_payload.takeBytes()); |
| + _eventSink.add(bytes); |
| break; |
| } |
| _currentMessageType = _WebSocketMessageType.NONE; |
| @@ -364,12 +378,13 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
| final StreamController<WebSocket> _controller = |
| new StreamController<WebSocket>(sync: true); |
| final Function _protocolSelector; |
| + final CompressionOptions _compression; |
| - _WebSocketTransformerImpl(this._protocolSelector); |
| + _WebSocketTransformerImpl(this._protocolSelector, this._compression); |
| Stream<WebSocket> bind(Stream<HttpRequest> stream) { |
| stream.listen((request) { |
| - _upgrade(request, _protocolSelector) |
| + _upgrade(request, _protocolSelector, _compression) |
| .then((WebSocket webSocket) => _controller.add(webSocket)) |
| .catchError(_controller.addError); |
| }, onDone: () { |
| @@ -379,7 +394,8 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
| return _controller.stream; |
| } |
| - static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { |
| + static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector, |
| + CompressionOptions compression) { |
| var response = request.response; |
| if (!_isUpgradeRequest(request)) { |
| // Send error response. |
| @@ -404,10 +420,24 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
| if (protocol != null) { |
| response.headers.add("Sec-WebSocket-Protocol", protocol); |
| } |
| + |
|
Søren Gjesse
2015/06/25 15:41:27
Extract server side negotiation into a separate me
|
| + var extensionHeader = request.headers.value("Sec-WebSocket-Extensions"); |
| + |
| + if (extensionHeader == null) { |
| + extensionHeader = ""; |
| + } |
| + |
| + Iterable<List<String>> extensions = extensionHeader.split(",").map((it) => it.split("; ")); |
| + |
| + if (compression.enabled && extensions.any((x) => x[0] == "permessage-deflate")) { |
| + var opts = extensions.firstWhere((x) => x[0] == "permessage-deflate"); |
| + response.headers.add("Sec-WebSocket-Extensions", compression._createHeader(opts)); |
| + } |
| + |
| response.headers.contentLength = 0; |
| return response.detachSocket() |
| .then((socket) => new _WebSocketImpl._fromSocket( |
|
Søren Gjesse
2015/06/25 15:41:27
Compression not supported for server (no additiona
|
| - socket, protocol, true)); |
| + socket, protocol, compression, true)); |
| } |
| var protocols = request.headers['Sec-WebSocket-Protocol']; |
| @@ -464,12 +494,64 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
| } |
| } |
| +class _WebSocketPerMessageDeflate { |
| + bool noContextTakeover; |
| + int clientMaxWindowBits; |
| + int serverMaxWindowBits; |
| + bool serverSide; |
| + |
| + ZLibDecoder decoder; |
| + ZLibEncoder encoder; |
| + |
| + _WebSocketPerMessageDeflate({this.clientMaxWindowBits, |
|
Søren Gjesse
2015/06/25 15:41:27
No need to use optional named argument for interna
|
| + this.serverMaxWindowBits, this.noContextTakeover, |
| + this.serverSide: false}) { |
| + if (clientMaxWindowBits == null) { |
|
Søren Gjesse
2015/06/25 15:41:27
Make the caller always pass a non null value - it
|
| + clientMaxWindowBits = 15; |
|
Søren Gjesse
2015/06/25 15:41:27
Create a constant for this (maybe in CompressionOp
|
| + } |
| + |
| + if (serverMaxWindowBits == null) { |
| + serverMaxWindowBits = 15; |
| + } |
| + } |
| + |
| + void _ensureDecoder() { |
| + if (noContextTakeover || decoder == null) { |
| + decoder = new ZLibDecoder(windowBits: serverSide ? |
| + clientMaxWindowBits : serverMaxWindowBits); |
| + } |
| + } |
| + |
| + void _ensureEncoder() { |
| + if (noContextTakeover || encoder == null) { |
| + encoder = new ZLibEncoder(windowBits: serverSide ? |
| + serverMaxWindowBits : clientMaxWindowBits); |
| + } |
| + } |
| + |
| + List<int> processIncomingMessage(List<int> msg) { |
| + _ensureDecoder(); |
| + var builder = new BytesBuilder(); |
| + builder.add(msg); |
| + builder.add(const [0x00, 0x00, 0xff, 0xff]); |
|
Søren Gjesse
2015/06/25 15:41:27
For the noContextTakeover case set decoder to null
|
| + return decoder.convert(builder.takeBytes()); |
| + } |
| + |
| + List<int> processOutgoingMessage(List<int> msg) { |
| + _ensureEncoder(); |
| + var c = encoder.convert(msg); |
| + c = c.sublist(0, c.length - 4); |
|
Søren Gjesse
2015/06/25 15:41:27
Ditto.
|
| + return c; |
| + } |
| +} |
| // TODO(ajohnsen): Make this transformer reusable. |
| class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| final _WebSocketImpl webSocket; |
| EventSink _eventSink; |
| + _WebSocketPerMessageDeflate _deflateHelper; |
| + |
| _WebSocketOutgoingTransformer(this.webSocket); |
| Stream bind(Stream stream) { |
| @@ -499,12 +581,24 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| if (message is String) { |
| opcode = _WebSocketOpcode.TEXT; |
| data = UTF8.encode(message); |
| + |
| + if (_deflateHelper != null) { |
| + data = _deflateHelper.processOutgoingMessage(data); |
| + } |
| } else { |
| if (message is !List<int>) { |
| throw new ArgumentError(message); |
| } |
| opcode = _WebSocketOpcode.BINARY; |
| data = message; |
| + |
| + if (_deflateHelper != null) { |
| + data = _deflateHelper.processOutgoingMessage(data); |
| + } |
| + } |
| + |
| + if (_deflateHelper != null) { |
| + data = _deflateHelper.processOutgoingMessage(data); |
| } |
| } else { |
| opcode = _WebSocketOpcode.TEXT; |
| @@ -532,9 +626,12 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| } |
| void addFrame(int opcode, List<int> data) => |
| - createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
| + createFrame(opcode, data, webSocket._serverSide, webSocket._deflate != null) |
| + .forEach((e) { |
| + _eventSink.add(e); |
| + }); |
| - static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |
| + static Iterable createFrame(int opcode, List<int> data, bool serverSide, bool compressed) { |
| bool mask = !serverSide; // Masking not implemented for server. |
| int dataLength = data == null ? 0 : data.length; |
| // Determine the header size. |
| @@ -547,10 +644,19 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| Uint8List header = new Uint8List(headerSize); |
| int index = 0; |
| // Set FIN and opcode. |
| - header[index++] = 0x80 | opcode; |
| + var hoc = 0; |
| + |
| + hoc |= 0x80; |
| + |
| + if (compressed) { |
| + hoc |= 0x40; |
| + } |
| + |
| + hoc |= opcode & 0xF; |
| + |
| + header[index++] = hoc; |
| // Determine size and position of length field. |
| int lengthBytes = 1; |
| - int firstLengthByte = 1; |
| if (dataLength > 65535) { |
| header[index++] = 127; |
| lengthBytes = 8; |
| @@ -766,11 +872,13 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
| int _outCloseCode; |
| String _outCloseReason; |
| Timer _closeTimer; |
| + _WebSocketPerMessageDeflate _deflate; |
| static final HttpClient _httpClient = new HttpClient(); |
| static Future<WebSocket> connect( |
| - String url, Iterable<String> protocols, Map<String, dynamic> headers) { |
| + String url, Iterable<String> protocols, Map<String, dynamic> headers, |
| + {CompressionOptions compression: CompressionOptions.DEFAULT}) { |
| Uri uri = Uri.parse(url); |
| if (uri.scheme != "ws" && uri.scheme != "wss") { |
| throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); |
| @@ -809,10 +917,14 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
| ..set(HttpHeaders.UPGRADE, "websocket") |
| ..set("Sec-WebSocket-Key", nonce) |
| ..set("Cache-Control", "no-cache") |
| - ..set("Sec-WebSocket-Version", "13"); |
| + ..set("Sec-WebSocket-Version", "13") |
| + ..set("Sec-WebSocket-Extensions", "permessage-deflate"); |
|
Søren Gjesse
2015/06/25 15:41:27
Isn't this already handled by adding Sec-WebSocket
|
| if (protocols != null) { |
| request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); |
| } |
| + |
| + request.headers.add("Sec-WebSocket-Extensions", compression._createHeader()); |
| + |
| return request.close(); |
| }) |
| .then((response) { |
| @@ -848,18 +960,62 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
| } |
| } |
| var protocol = response.headers.value('Sec-WebSocket-Protocol'); |
| + |
|
Søren Gjesse
2015/06/25 15:41:27
Please extract this into a separate method called
|
| + String extensionHeader = response.headers.value('Sec-WebSocket-Extensions'); |
| + |
| + if (extensionHeader == null) { |
| + extensionHeader = ""; |
| + } |
| + |
| + Iterable<List<String>> extensions = extensionHeader |
| + .split(", ") |
| + .map((it) => it.split("; ")); |
| + |
| + _WebSocketPerMessageDeflate deflate; |
| + |
| + if (compression.enabled && extensions.any((x) => x[0] == "permessage-deflate")) { |
| + var opts = extensions.firstWhere((x) => x[0] == "permessage-deflate"); |
| + var noContextTakeover = opts.contains("client_no_context_takeover"); |
| + |
| + int getWindowBits(String type) { |
| + var o = opts.firstWhere((x) => |
| + x.startsWith("${type}_max_window_bits="), orElse: () => null); |
| + |
| + if (o == null) { |
| + return 15; |
| + } |
| + |
| + try { |
| + o = o.substring("client_max_window_bits=".length); |
|
Søren Gjesse
2015/06/25 15:41:27
client -> ${type} here as well?
Use
var paramete
|
| + o = int.parse(o); |
| + } catch (e) { |
| + return 15; |
| + } |
| + |
| + return o; |
| + } |
| + |
| + deflate = new _WebSocketPerMessageDeflate( |
| + clientMaxWindowBits: getWindowBits("client"), |
| + serverMaxWindowBits: getWindowBits("server"), |
| + noContextTakeover: noContextTakeover); |
| + } |
| + |
| return response.detachSocket() |
| - .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); |
| + .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol, |
| + compression, false, deflate)); |
| }); |
| } |
| _WebSocketImpl._fromSocket(this._socket, this.protocol, |
| - [this._serverSide = false]) { |
| + CompressionOptions compression, [this._serverSide = false, |
| + _WebSocketPerMessageDeflate deflate]) { |
| _consumer = new _WebSocketConsumer(this, _socket); |
| _sink = new _StreamSinkImpl(_consumer); |
| _readyState = WebSocket.OPEN; |
| + _deflate = deflate; |
| - var transformer = new _WebSocketProtocolTransformer(_serverSide); |
| + var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate); |
| _subscription = _socket.transform(transformer).listen( |
| (data) { |
| if (data is _WebSocketPing) { |