 Chromium Code Reviews
 Chromium Code Reviews| Index: sdk/lib/io/websocket_impl.dart | 
| diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart | 
| index cc68544e536233a5e43d63a699e01d4d58ce5b4e..eb18b7c7b4bba56f903ced5a2d0597990ee5fbc7 100644 | 
| --- a/sdk/lib/io/websocket_impl.dart | 
| +++ b/sdk/lib/io/websocket_impl.dart | 
| @@ -13,7 +13,6 @@ class _WebSocketMessageType { | 
| static const int BINARY = 2; | 
| } | 
| - | 
| class _WebSocketOpcode { | 
| static const int CONTINUATION = 0; | 
| static const int TEXT = 1; | 
| @@ -38,8 +37,8 @@ class _WebSocketOpcode { | 
| * which is supplied through the [:handleData:]. As the protocol is processed, | 
| * it'll output frame data as either a List<int> or String. | 
| * | 
| - * Important infomation about usage: Be sure you use cancelOnError, so the | 
| - * socket will be closed when the processer encounter an error. Not using it | 
| + * Important information about usage: Be sure you use cancelOnError, so the | 
| + * socket will be closed when the processor encounter an error. Not using it | 
| * will lead to undefined behaviour. | 
| */ | 
| // TODO(ajohnsen): make this transformer reusable? | 
| @@ -51,9 +50,15 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 
| static const int PAYLOAD = 4; | 
| static const int CLOSED = 5; | 
| static const int FAILURE = 6; | 
| + static const int FIN = 0x80; | 
| + static const int RSV1 = 0x40; | 
| + static const int RSV2 = 0x20; | 
| + static const int RSV3 = 0x10; | 
| + static const int OPCODE = 0xF; | 
| int _state = START; | 
| bool _fin = false; | 
| + bool _compressed = false; | 
| int _opcode = -1; | 
| int _len = -1; | 
| bool _masked = false; | 
| @@ -71,18 +76,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 
| final List _maskingBytes = new List(4); | 
| final BytesBuilder _payload = new BytesBuilder(copy: false); | 
| - _WebSocketProtocolTransformer([this._serverSide = false]); | 
| + _WebSocketPerMessageDeflate _deflate; | 
| + _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); | 
| Stream bind(Stream stream) { | 
| - return new Stream.eventTransformed( | 
| - stream, | 
| - (EventSink eventSink) { | 
| - if (_eventSink != null) { | 
| - throw new StateError("WebSocket transformer already used."); | 
| - } | 
| - _eventSink = eventSink; | 
| - return this; | 
| - }); | 
| + return new Stream.eventTransformed(stream, (EventSink eventSink) { | 
| + if (_eventSink != null) { | 
| + throw new StateError("WebSocket transformer already used."); | 
| + } | 
| + _eventSink = eventSink; | 
| + return this; | 
| + }); | 
| } | 
| void addError(Object error, [StackTrace stackTrace]) => | 
| @@ -108,11 +112,20 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 
| if (_state <= LEN_REST) { | 
| if (_state == START) { | 
| _fin = (byte & 0x80) != 0; | 
| - if ((byte & 0x70) != 0) { | 
| + var rsv = (byte & 0x70) >> 4; | 
| 
Søren Gjesse
2015/08/24 08:21:50
I don't think this temp is required - see below.
 | 
| + | 
| + if (rsv != 0 && rsv != 4) { | 
| 
Søren Gjesse
2015/08/24 08:21:50
Please use
  if ((byte & (RSV1 | RSV2)) != 0)
to
 | 
| // The RSV1, RSV2 bits RSV3 must be all zero. | 
| 
Søren Gjesse
2015/08/24 08:21:50
Please remove RSV3 from this comment.
 | 
| throw new WebSocketException("Protocol error"); | 
| } | 
| - _opcode = (byte & 0xF); | 
| + | 
| + if (rsv == 4) { | 
| 
Søren Gjesse
2015/08/24 08:21:50
Please use
  if ((byte & RSV3) != 0) {
to test a
 | 
| + _compressed = true; | 
| + } else { | 
| + _compressed = false; | 
| + } | 
| + _opcode = (byte & 0x0F); | 
| 
Søren Gjesse
2015/08/24 08:21:50
Please use the new constant OPCODE here.
 | 
| + | 
| if (_opcode <= _WebSocketOpcode.BINARY) { | 
| if (_opcode == _WebSocketOpcode.CONTINUATION) { | 
| if (_currentMessageType == _WebSocketMessageType.NONE) { | 
| @@ -120,14 +133,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 
| } | 
| } else { | 
| assert(_opcode == _WebSocketOpcode.TEXT || | 
| - _opcode == _WebSocketOpcode.BINARY); | 
| + _opcode == _WebSocketOpcode.BINARY); | 
| if (_currentMessageType != _WebSocketMessageType.NONE) { | 
| throw new WebSocketException("Protocol error"); | 
| } | 
| _currentMessageType = _opcode; | 
| } | 
| } else if (_opcode >= _WebSocketOpcode.CLOSE && | 
| - _opcode <= _WebSocketOpcode.PONG) { | 
| + _opcode <= _WebSocketOpcode.PONG) { | 
| // Control frames cannot be fragmented. | 
| if (!_fin) throw new WebSocketException("Protocol error"); | 
| } else { | 
| @@ -176,15 +189,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 
| _unmask(index, payloadLength, buffer); | 
| } | 
| // Control frame and data frame share _payloads. | 
| - _payload.add( | 
| - new Uint8List.view(buffer.buffer, index, payloadLength)); | 
| + _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); | 
| index += payloadLength; | 
| if (_isControlFrame()) { | 
| if (_remainingPayloadBytes == 0) _controlFrameEnd(); | 
| } else { | 
| if (_currentMessageType != _WebSocketMessageType.TEXT && | 
| _currentMessageType != _WebSocketMessageType.BINARY) { | 
| - throw new WebSocketException("Protocol error"); | 
| + throw new WebSocketException("Protocol error"); | 
| } | 
| if (_remainingPayloadBytes == 0) _messageFrameEnd(); | 
| } | 
| @@ -219,8 +231,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 
| mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; | 
| } | 
| Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | 
| - Int32x4List blockBuffer = new Int32x4List.view( | 
| - buffer.buffer, index, blockCount); | 
| + Int32x4List blockBuffer = | 
| + new Int32x4List.view(buffer.buffer, index, blockCount); | 
| for (int i = 0; i < blockBuffer.length; i++) { | 
| blockBuffer[i] ^= blockMask; | 
| } | 
| @@ -284,12 +296,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 
| void _messageFrameEnd() { | 
| if (_fin) { | 
| + var bytes = _payload.takeBytes(); | 
| + if (_deflate != null && _compressed) { | 
| + bytes = _deflate.processIncomingMessage(bytes); | 
| + } | 
| + | 
| switch (_currentMessageType) { | 
| case _WebSocketMessageType.TEXT: | 
| - _eventSink.add(UTF8.decode(_payload.takeBytes())); | 
| + _eventSink.add(UTF8.decode(bytes)); | 
| break; | 
| case _WebSocketMessageType.BINARY: | 
| - _eventSink.add(_payload.takeBytes()); | 
| + _eventSink.add(bytes); | 
| break; | 
| } | 
| _currentMessageType = _WebSocketMessageType.NONE; | 
| @@ -331,8 +348,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 
| bool _isControlFrame() { | 
| return _opcode == _WebSocketOpcode.CLOSE || | 
| - _opcode == _WebSocketOpcode.PING || | 
| - _opcode == _WebSocketOpcode.PONG; | 
| + _opcode == _WebSocketOpcode.PING || | 
| + _opcode == _WebSocketOpcode.PONG; | 
| } | 
| void _prepareForNextFrame() { | 
| @@ -347,31 +364,29 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 
| } | 
| } | 
| - | 
| class _WebSocketPing { | 
| final List<int> payload; | 
| _WebSocketPing([this.payload = null]); | 
| } | 
| - | 
| class _WebSocketPong { | 
| final List<int> payload; | 
| _WebSocketPong([this.payload = null]); | 
| } | 
| - | 
| class _WebSocketTransformerImpl implements WebSocketTransformer { | 
| final StreamController<WebSocket> _controller = | 
| new StreamController<WebSocket>(sync: true); | 
| final Function _protocolSelector; | 
| + final CompressionOptions _compression; | 
| - _WebSocketTransformerImpl(this._protocolSelector); | 
| + _WebSocketTransformerImpl(this._protocolSelector, this._compression); | 
| Stream<WebSocket> bind(Stream<HttpRequest> stream) { | 
| stream.listen((request) { | 
| - _upgrade(request, _protocolSelector) | 
| - .then((WebSocket webSocket) => _controller.add(webSocket)) | 
| - .catchError(_controller.addError); | 
| + _upgrade(request, _protocolSelector, _compression) | 
| + .then((WebSocket webSocket) => _controller.add(webSocket)) | 
| + .catchError(_controller.addError); | 
| }, onDone: () { | 
| _controller.close(); | 
| }); | 
| @@ -379,13 +394,14 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { | 
| return _controller.stream; | 
| } | 
| - static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { | 
| + static Future<WebSocket> _upgrade( | 
| + HttpRequest request, _protocolSelector, CompressionOptions compression) { | 
| var response = request.response; | 
| if (!_isUpgradeRequest(request)) { | 
| // Send error response. | 
| response | 
| - ..statusCode = HttpStatus.BAD_REQUEST | 
| - ..close(); | 
| + ..statusCode = HttpStatus.BAD_REQUEST | 
| + ..close(); | 
| return new Future.error( | 
| new WebSocketException("Invalid WebSocket upgrade request")); | 
| } | 
| @@ -393,9 +409,9 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { | 
| Future upgrade(String protocol) { | 
| // Send the upgrade response. | 
| response | 
| - ..statusCode = HttpStatus.SWITCHING_PROTOCOLS | 
| - ..headers.add(HttpHeaders.CONNECTION, "Upgrade") | 
| - ..headers.add(HttpHeaders.UPGRADE, "websocket"); | 
| + ..statusCode = HttpStatus.SWITCHING_PROTOCOLS | 
| + ..headers.add(HttpHeaders.CONNECTION, "Upgrade") | 
| + ..headers.add(HttpHeaders.UPGRADE, "websocket"); | 
| String key = request.headers.value("Sec-WebSocket-Key"); | 
| _SHA1 sha1 = new _SHA1(); | 
| sha1.add("$key$_webSocketGUID".codeUnits); | 
| @@ -404,10 +420,13 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { | 
| if (protocol != null) { | 
| response.headers.add("Sec-WebSocket-Protocol", protocol); | 
| } | 
| + | 
| + var deflate = _negotiateCompression(request, response, compression); | 
| + | 
| response.headers.contentLength = 0; | 
| - return response.detachSocket() | 
| - .then((socket) => new _WebSocketImpl._fromSocket( | 
| - socket, protocol, true)); | 
| + return response.detachSocket().then((socket) => | 
| + new _WebSocketImpl._fromSocket( | 
| + socket, protocol, compression, true, deflate)); | 
| } | 
| var protocols = request.headers['Sec-WebSocket-Protocol']; | 
| @@ -416,26 +435,58 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { | 
| // consisting of multiple protocols. To unify all of them, first join | 
| // the lists with ', ' and then tokenize. | 
| protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); | 
| - return new Future(() => _protocolSelector(protocols)) | 
| - .then((protocol) { | 
| - if (protocols.indexOf(protocol) < 0) { | 
| - throw new WebSocketException( | 
| - "Selected protocol is not in the list of available protocols"); | 
| - } | 
| - return protocol; | 
| - }) | 
| - .catchError((error) { | 
| - response | 
| - ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR | 
| - ..close(); | 
| - throw error; | 
| - }) | 
| - .then(upgrade); | 
| + return new Future(() => _protocolSelector(protocols)).then((protocol) { | 
| + if (protocols.indexOf(protocol) < 0) { | 
| + throw new WebSocketException( | 
| + "Selected protocol is not in the list of available protocols"); | 
| + } | 
| + return protocol; | 
| + }).catchError((error) { | 
| + response | 
| + ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR | 
| + ..close(); | 
| + throw error; | 
| + }).then(upgrade); | 
| } else { | 
| return upgrade(null); | 
| } | 
| } | 
| + static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request, | 
| + HttpResponse response, CompressionOptions compression) { | 
| + var extensionHeader = request.headers.value("Sec-WebSocket-Extensions"); | 
| + | 
| + if (extensionHeader == null) { | 
| + extensionHeader = ""; | 
| + } | 
| + | 
| + Iterable<List<String>> extensions = | 
| + extensionHeader.split(",").map((it) => it.split("; ")); | 
| 
Søren Gjesse
2015/08/24 08:21:50
Both the splitting using "," and "; " seems fragil
 | 
| + | 
| + var perMessageDeflate = extensions.firstWhere( | 
| + (x) => x[0] == _WebSocketImpl.PER_MESSAGE_DEFLATE, | 
| + orElse: () => null); | 
| + if (compression.enabled && perMessageDeflate != null) { | 
| + var info = compression._createHeader(perMessageDeflate); | 
| + | 
| + response.headers.add("Sec-WebSocket-Extensions", info[0]); | 
| + var serverNoContextTakeover = | 
| + perMessageDeflate.contains("server_no_context_takeover"); | 
| + var clientNoContextTakeover = | 
| + perMessageDeflate.contains("client_no_context_takeover"); | 
| + var deflate = new _WebSocketPerMessageDeflate( | 
| + serverNoContextTakeover: serverNoContextTakeover, | 
| + clientNoContextTakeover: clientNoContextTakeover, | 
| + serverMaxWindowBits: info[1], | 
| + clientMaxWindowBits: info[1], | 
| + serverSide: true); | 
| + | 
| + return deflate; | 
| + } | 
| + | 
| + return null; | 
| + } | 
| + | 
| static bool _isUpgradeRequest(HttpRequest request) { | 
| if (request.method != "GET") { | 
| return false; | 
| @@ -464,24 +515,122 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { | 
| } | 
| } | 
| +class _WebSocketPerMessageDeflate { | 
| + bool serverNoContextTakeover; | 
| + bool clientNoContextTakeover; | 
| + int clientMaxWindowBits; | 
| + int serverMaxWindowBits; | 
| + bool serverSide; | 
| + | 
| + _Filter decoder; | 
| + _Filter encoder; | 
| + | 
| + _WebSocketPerMessageDeflate( | 
| + {this.clientMaxWindowBits, | 
| 
Søren Gjesse
2015/08/24 08:21:50
nit: Please indent like this (one space more for t
 | 
| + this.serverMaxWindowBits, | 
| + this.serverNoContextTakeover: false, | 
| + this.clientNoContextTakeover: false, | 
| + this.serverSide: false}) { | 
| + if (clientMaxWindowBits == null) { | 
| + clientMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS; | 
| + } | 
| + | 
| + if (serverMaxWindowBits == null) { | 
| + serverMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS; | 
| + } | 
| + } | 
| + | 
| + void _ensureDecoder() { | 
| + if (decoder == null) { | 
| + decoder = _Filter._newZLibInflateFilter( | 
| + serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true); | 
| + } | 
| + } | 
| + | 
| + void _ensureEncoder() { | 
| + if (encoder == null) { | 
| + encoder = _Filter._newZLibDeflateFilter( | 
| + false, | 
| + ZLibOption.DEFAULT_LEVEL, | 
| + serverSide ? serverMaxWindowBits : clientMaxWindowBits, | 
| + ZLibOption.DEFAULT_MEM_LEVEL, | 
| + ZLibOption.STRATEGY_DEFAULT, | 
| + null, | 
| + true); | 
| + } | 
| + } | 
| + | 
| + List<int> processIncomingMessage(List<int> msg) { | 
| + _ensureDecoder(); | 
| + | 
| + var data = []; | 
| + data.addAll(msg); | 
| + data.addAll(const [0x00, 0x00, 0xff, 0xff]); | 
| + | 
| + decoder.process(data, 0, data.length); | 
| + var reuse = | 
| + !(serverSide ? clientNoContextTakeover : serverNoContextTakeover); | 
| + var result = []; | 
| + var out; | 
| + | 
| + while ((out = decoder.processed(flush: reuse)) != null) { | 
| + result.addAll(out); | 
| + } | 
| + | 
| + decoder.processed(flush: reuse); | 
| + | 
| + if (!reuse) { | 
| + decoder.end(); | 
| + decoder = null; | 
| + } | 
| + return result; | 
| + } | 
| + | 
| + List<int> processOutgoingMessage(List<int> msg) { | 
| + _ensureEncoder(); | 
| + var reuse = | 
| + !(serverSide ? serverNoContextTakeover : clientNoContextTakeover); | 
| + var result = []; | 
| + var out; | 
| + | 
| + encoder.process(msg, 0, msg.length); | 
| + | 
| + while ((out = encoder.processed(flush: reuse)) != null) { | 
| + result.addAll(out); | 
| + } | 
| + | 
| + if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) { | 
| + encoder.end(); | 
| + encoder = null; | 
| + } | 
| + | 
| + if (result.length > 4) { | 
| + result = result.sublist(0, result.length - 4); | 
| + } | 
| + | 
| + return result; | 
| + } | 
| +} | 
| // TODO(ajohnsen): Make this transformer reusable. | 
| class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 
| final _WebSocketImpl webSocket; | 
| EventSink _eventSink; | 
| - _WebSocketOutgoingTransformer(this.webSocket); | 
| + _WebSocketPerMessageDeflate _deflateHelper; | 
| + | 
| + _WebSocketOutgoingTransformer(this.webSocket) { | 
| + _deflateHelper = webSocket._deflate; | 
| + } | 
| Stream bind(Stream stream) { | 
| - return new Stream.eventTransformed( | 
| - stream, | 
| - (EventSink eventSink) { | 
| - if (_eventSink != null) { | 
| - throw new StateError("WebSocket transformer already used"); | 
| - } | 
| - _eventSink = eventSink; | 
| - return this; | 
| - }); | 
| + return new Stream.eventTransformed(stream, (EventSink eventSink) { | 
| + if (_eventSink != null) { | 
| + throw new StateError("WebSocket transformer already used"); | 
| + } | 
| + _eventSink = eventSink; | 
| + return this; | 
| + }); | 
| } | 
| void add(message) { | 
| @@ -500,12 +649,16 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 
| opcode = _WebSocketOpcode.TEXT; | 
| data = UTF8.encode(message); | 
| } else { | 
| - if (message is !List<int>) { | 
| + if (message is! List<int>) { | 
| throw new ArgumentError(message); | 
| } | 
| opcode = _WebSocketOpcode.BINARY; | 
| data = message; | 
| } | 
| + | 
| + if (_deflateHelper != null) { | 
| + data = _deflateHelper.processOutgoingMessage(data); | 
| + } | 
| } else { | 
| opcode = _WebSocketOpcode.TEXT; | 
| } | 
| @@ -531,11 +684,19 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 
| _eventSink.close(); | 
| } | 
| - void addFrame(int opcode, List<int> data) => | 
| - createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); | 
| + void addFrame(int opcode, List<int> data) => createFrame( | 
| + opcode, | 
| + data, | 
| + webSocket._serverSide, | 
| + _deflateHelper != null && | 
| + (opcode == _WebSocketOpcode.TEXT || | 
| + opcode == _WebSocketOpcode.BINARY)).forEach((e) { | 
| + _eventSink.add(e); | 
| + }); | 
| - static Iterable createFrame(int opcode, List<int> data, bool serverSide) { | 
| - bool mask = !serverSide; // Masking not implemented for server. | 
| + static Iterable createFrame( | 
| + int opcode, List<int> data, bool serverSide, bool compressed) { | 
| + bool mask = !serverSide; // Masking not implemented for server. | 
| int dataLength = data == null ? 0 : data.length; | 
| // Determine the header size. | 
| int headerSize = (mask) ? 6 : 2; | 
| @@ -546,11 +707,18 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 
| } | 
| Uint8List header = new Uint8List(headerSize); | 
| int index = 0; | 
| + | 
| + var rsv = 0; | 
| + if (compressed) { | 
| + rsv = 4; | 
| + } | 
| + | 
| // Set FIN and opcode. | 
| - header[index++] = 0x80 | opcode; | 
| + var hoc = (1 << 7) | (rsv % 8) << 4 | (opcode % 128); | 
| + | 
| + header[index++] = hoc; | 
| 
Søren Gjesse
2015/08/24 08:21:50
Just set this byte like this (no need to the rsv a
 | 
| // Determine size and position of length field. | 
| int lengthBytes = 1; | 
| - int firstLengthByte = 1; | 
| if (dataLength > 65535) { | 
| header[index++] = 127; | 
| lengthBytes = 8; | 
| @@ -580,8 +748,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 
| list = new Uint8List(data.length); | 
| for (int i = 0; i < data.length; i++) { | 
| if (data[i] < 0 || 255 < data[i]) { | 
| - throw new ArgumentError( | 
| - "List element is not a byte value " | 
| + throw new ArgumentError("List element is not a byte value " | 
| "(value ${data[i]} at index $i)"); | 
| } | 
| list[i] = data[i]; | 
| @@ -597,8 +764,8 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 
| mask = (mask << 8) | maskBytes[i]; | 
| } | 
| Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | 
| - Int32x4List blockBuffer = new Int32x4List.view( | 
| - list.buffer, 0, blockCount); | 
| + Int32x4List blockBuffer = | 
| + new Int32x4List.view(list.buffer, 0, blockCount); | 
| for (int i = 0; i < blockBuffer.length; i++) { | 
| blockBuffer[i] ^= blockMask; | 
| } | 
| @@ -619,7 +786,6 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 
| } | 
| } | 
| - | 
| class _WebSocketConsumer implements StreamConsumer { | 
| final _WebSocketImpl webSocket; | 
| final Socket socket; | 
| @@ -664,28 +830,28 @@ class _WebSocketConsumer implements StreamConsumer { | 
| _ensureController() { | 
| if (_controller != null) return; | 
| - _controller = new StreamController(sync: true, | 
| - onPause: _onPause, | 
| - onResume: _onResume, | 
| - onCancel: _onListen); | 
| - var stream = _controller.stream.transform( | 
| - new _WebSocketOutgoingTransformer(webSocket)); | 
| - socket.addStream(stream) | 
| - .then((_) { | 
| - _done(); | 
| - _closeCompleter.complete(webSocket); | 
| - }, onError: (error, StackTrace stackTrace) { | 
| - _closed = true; | 
| - _cancel(); | 
| - if (error is ArgumentError) { | 
| - if (!_done(error, stackTrace)) { | 
| - _closeCompleter.completeError(error, stackTrace); | 
| - } | 
| - } else { | 
| - _done(); | 
| - _closeCompleter.complete(webSocket); | 
| - } | 
| - }); | 
| + _controller = new StreamController( | 
| + sync: true, | 
| + onPause: _onPause, | 
| + onResume: _onResume, | 
| + onCancel: _onListen); | 
| + var stream = _controller.stream | 
| + .transform(new _WebSocketOutgoingTransformer(webSocket)); | 
| + socket.addStream(stream).then((_) { | 
| + _done(); | 
| + _closeCompleter.complete(webSocket); | 
| + }, onError: (error, StackTrace stackTrace) { | 
| + _closed = true; | 
| + _cancel(); | 
| + if (error is ArgumentError) { | 
| + if (!_done(error, stackTrace)) { | 
| + _closeCompleter.completeError(error, stackTrace); | 
| + } | 
| + } else { | 
| + _done(); | 
| + _closeCompleter.complete(webSocket); | 
| + } | 
| + }); | 
| } | 
| bool _done([error, StackTrace stackTrace]) { | 
| @@ -706,13 +872,9 @@ class _WebSocketConsumer implements StreamConsumer { | 
| } | 
| _ensureController(); | 
| _completer = new Completer(); | 
| - _subscription = stream.listen( | 
| - (data) { | 
| - _controller.add(data); | 
| - }, | 
| - onDone: _done, | 
| - onError: _done, | 
| - cancelOnError: true); | 
| + _subscription = stream.listen((data) { | 
| + _controller.add(data); | 
| + }, onDone: _done, onError: _done, cancelOnError: true); | 
| if (_issuedPause) { | 
| _subscription.pause(); | 
| _issuedPause = false; | 
| @@ -742,10 +904,11 @@ class _WebSocketConsumer implements StreamConsumer { | 
| } | 
| } | 
| - | 
| class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { | 
| // Use default Map so we keep order. | 
| static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); | 
| + static const int DEFAULT_WINDOW_BITS = 15; | 
| + static const String PER_MESSAGE_DEFLATE = "permessage-deflate"; | 
| final String protocol; | 
| @@ -766,11 +929,13 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { | 
| int _outCloseCode; | 
| String _outCloseReason; | 
| Timer _closeTimer; | 
| + _WebSocketPerMessageDeflate _deflate; | 
| static final HttpClient _httpClient = new HttpClient(); | 
| static Future<WebSocket> connect( | 
| - String url, Iterable<String> protocols, Map<String, dynamic> headers) { | 
| + String url, Iterable<String> protocols, Map<String, dynamic> headers, | 
| + {CompressionOptions compression: CompressionOptions.DEFAULT}) { | 
| Uri uri = Uri.parse(url); | 
| if (uri.scheme != "ws" && uri.scheme != "wss") { | 
| throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); | 
| @@ -784,144 +949,189 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { | 
| } | 
| String nonce = _CryptoUtils.bytesToBase64(nonceData); | 
| - uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", | 
| - userInfo: uri.userInfo, | 
| - host: uri.host, | 
| - port: uri.port, | 
| - path: uri.path, | 
| - query: uri.query, | 
| - fragment: uri.fragment); | 
| - return _httpClient.openUrl("GET", uri) | 
| - .then((request) { | 
| - if (uri.userInfo != null && !uri.userInfo.isEmpty) { | 
| - // If the URL contains user information use that for basic | 
| - // authorization. | 
| - String auth = | 
| - _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); | 
| - request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); | 
| - } | 
| - if (headers != null) { | 
| - headers.forEach((field, value) => request.headers.add(field, value)); | 
| - } | 
| - // Setup the initial handshake. | 
| - request.headers | 
| - ..set(HttpHeaders.CONNECTION, "Upgrade") | 
| - ..set(HttpHeaders.UPGRADE, "websocket") | 
| - ..set("Sec-WebSocket-Key", nonce) | 
| - ..set("Cache-Control", "no-cache") | 
| - ..set("Sec-WebSocket-Version", "13"); | 
| - if (protocols != null) { | 
| - request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); | 
| - } | 
| - return request.close(); | 
| - }) | 
| - .then((response) { | 
| - void error(String message) { | 
| - // Flush data. | 
| - response.detachSocket().then((socket) { | 
| - socket.destroy(); | 
| - }); | 
| - throw new WebSocketException(message); | 
| - } | 
| - if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || | 
| - response.headers[HttpHeaders.CONNECTION] == null || | 
| - !response.headers[HttpHeaders.CONNECTION].any( | 
| - (value) => value.toLowerCase() == "upgrade") || | 
| - response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != | 
| - "websocket") { | 
| - error("Connection to '$uri' was not upgraded to websocket"); | 
| - } | 
| - String accept = response.headers.value("Sec-WebSocket-Accept"); | 
| - if (accept == null) { | 
| - error("Response did not contain a 'Sec-WebSocket-Accept' header"); | 
| + uri = new Uri( | 
| + scheme: uri.scheme == "wss" ? "https" : "http", | 
| + userInfo: uri.userInfo, | 
| + host: uri.host, | 
| + port: uri.port, | 
| + path: uri.path, | 
| + query: uri.query, | 
| + fragment: uri.fragment); | 
| + return _httpClient.openUrl("GET", uri).then((request) { | 
| + if (uri.userInfo != null && !uri.userInfo.isEmpty) { | 
| + // If the URL contains user information use that for basic | 
| + // authorization. | 
| + String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); | 
| + request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); | 
| + } | 
| + if (headers != null) { | 
| + headers.forEach((field, value) => request.headers.add(field, value)); | 
| + } | 
| + // Setup the initial handshake. | 
| + request.headers | 
| + ..set(HttpHeaders.CONNECTION, "Upgrade") | 
| + ..set(HttpHeaders.UPGRADE, "websocket") | 
| + ..set("Sec-WebSocket-Key", nonce) | 
| + ..set("Cache-Control", "no-cache") | 
| + ..set("Sec-WebSocket-Version", "13"); | 
| + if (protocols != null) { | 
| + request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); | 
| + } | 
| + | 
| + request.headers | 
| + .add("Sec-WebSocket-Extensions", compression._createHeader()); | 
| + | 
| + return request.close(); | 
| + }).then((response) { | 
| + void error(String message) { | 
| + // Flush data. | 
| + response.detachSocket().then((socket) { | 
| + socket.destroy(); | 
| + }); | 
| + throw new WebSocketException(message); | 
| + } | 
| + if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || | 
| + response.headers[HttpHeaders.CONNECTION] == null || | 
| + !response.headers[HttpHeaders.CONNECTION] | 
| + .any((value) => value.toLowerCase() == "upgrade") || | 
| + response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != | 
| + "websocket") { | 
| + error("Connection to '$uri' was not upgraded to websocket"); | 
| + } | 
| + String accept = response.headers.value("Sec-WebSocket-Accept"); | 
| + if (accept == null) { | 
| + error("Response did not contain a 'Sec-WebSocket-Accept' header"); | 
| + } | 
| + _SHA1 sha1 = new _SHA1(); | 
| + sha1.add("$nonce$_webSocketGUID".codeUnits); | 
| + List<int> expectedAccept = sha1.close(); | 
| + List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); | 
| + if (expectedAccept.length != receivedAccept.length) { | 
| + error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); | 
| + } | 
| + for (int i = 0; i < expectedAccept.length; i++) { | 
| + if (expectedAccept[i] != receivedAccept[i]) { | 
| + error("Bad response 'Sec-WebSocket-Accept' header"); | 
| } | 
| - _SHA1 sha1 = new _SHA1(); | 
| - sha1.add("$nonce$_webSocketGUID".codeUnits); | 
| - List<int> expectedAccept = sha1.close(); | 
| - List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); | 
| - if (expectedAccept.length != receivedAccept.length) { | 
| - error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); | 
| + } | 
| + var protocol = response.headers.value('Sec-WebSocket-Protocol'); | 
| + | 
| + _WebSocketPerMessageDeflate deflate = | 
| + negotiateClientCompression(response, compression); | 
| + | 
| + return response.detachSocket().then((socket) => | 
| + new _WebSocketImpl._fromSocket( | 
| + socket, protocol, compression, false, deflate)); | 
| + }); | 
| + } | 
| + | 
| + static _WebSocketPerMessageDeflate negotiateClientCompression( | 
| + HttpClientResponse response, CompressionOptions compression) { | 
| + String extensionHeader = response.headers.value('Sec-WebSocket-Extensions'); | 
| + | 
| + if (extensionHeader == null) { | 
| + extensionHeader = ""; | 
| + } | 
| + | 
| + Iterable<List<String>> extensions = | 
| + extensionHeader.split(", ").map((it) => it.split("; ")); | 
| 
Søren Gjesse
2015/08/24 08:21:50
Please see above (comment on _negoatiateCompressio
 | 
| + | 
| + if (compression.enabled && | 
| + extensions.any((x) => x[0] == PER_MESSAGE_DEFLATE)) { | 
| + var opts = extensions.firstWhere((x) => x[0] == PER_MESSAGE_DEFLATE); | 
| + var serverNoContextTakeover = opts.contains("server_no_context_takeover"); | 
| + var clientNoContextTakeover = opts.contains("client_no_context_takeover"); | 
| + | 
| + int getWindowBits(String type) { | 
| + var o = opts.firstWhere((x) => x.startsWith("${type}_max_window_bits="), | 
| + orElse: () => null); | 
| + | 
| + if (o == null) { | 
| + return DEFAULT_WINDOW_BITS; | 
| } | 
| - for (int i = 0; i < expectedAccept.length; i++) { | 
| - if (expectedAccept[i] != receivedAccept[i]) { | 
| - error("Bad response 'Sec-WebSocket-Accept' header"); | 
| - } | 
| + | 
| + try { | 
| + o = o.substring("${type}_max_window_bits=".length); | 
| + o = int.parse(o); | 
| + } catch (e) { | 
| + return DEFAULT_WINDOW_BITS; | 
| } | 
| - var protocol = response.headers.value('Sec-WebSocket-Protocol'); | 
| - return response.detachSocket() | 
| - .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); | 
| - }); | 
| + | 
| + return o; | 
| + } | 
| + | 
| + return new _WebSocketPerMessageDeflate( | 
| + clientMaxWindowBits: getWindowBits("client"), | 
| + serverMaxWindowBits: getWindowBits("server"), | 
| + clientNoContextTakeover: clientNoContextTakeover, | 
| + serverNoContextTakeover: serverNoContextTakeover); | 
| + } | 
| + | 
| + return null; | 
| } | 
| - _WebSocketImpl._fromSocket(this._socket, this.protocol, | 
| - [this._serverSide = false]) { | 
| + _WebSocketImpl._fromSocket( | 
| + this._socket, this.protocol, CompressionOptions compression, | 
| + [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) { | 
| _consumer = new _WebSocketConsumer(this, _socket); | 
| _sink = new _StreamSinkImpl(_consumer); | 
| _readyState = WebSocket.OPEN; | 
| - | 
| - var transformer = new _WebSocketProtocolTransformer(_serverSide); | 
| - _subscription = _socket.transform(transformer).listen( | 
| - (data) { | 
| - if (data is _WebSocketPing) { | 
| - if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | 
| - } else if (data is _WebSocketPong) { | 
| - // Simply set pingInterval, as it'll cancel any timers. | 
| - pingInterval = _pingInterval; | 
| - } else { | 
| - _controller.add(data); | 
| - } | 
| - }, | 
| - onError: (error) { | 
| - if (_closeTimer != null) _closeTimer.cancel(); | 
| - if (error is FormatException) { | 
| - _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | 
| - } else { | 
| - _close(WebSocketStatus.PROTOCOL_ERROR); | 
| - } | 
| - // An error happened, set the close code set above. | 
| - _closeCode = _outCloseCode; | 
| - _closeReason = _outCloseReason; | 
| - _controller.close(); | 
| - }, | 
| - onDone: () { | 
| - if (_closeTimer != null) _closeTimer.cancel(); | 
| - if (_readyState == WebSocket.OPEN) { | 
| - _readyState = WebSocket.CLOSING; | 
| - if (!_isReservedStatusCode(transformer.closeCode)) { | 
| - _close(transformer.closeCode, transformer.closeReason); | 
| - } else { | 
| - _close(); | 
| - } | 
| - _readyState = WebSocket.CLOSED; | 
| - } | 
| - // Protocol close, use close code from transformer. | 
| - _closeCode = transformer.closeCode; | 
| - _closeReason = transformer.closeReason; | 
| - _controller.close(); | 
| - }, | 
| - cancelOnError: true); | 
| + _deflate = deflate; | 
| + | 
| + var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate); | 
| + _subscription = _socket.transform(transformer).listen((data) { | 
| + if (data is _WebSocketPing) { | 
| + if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | 
| + } else if (data is _WebSocketPong) { | 
| + // Simply set pingInterval, as it'll cancel any timers. | 
| + pingInterval = _pingInterval; | 
| + } else { | 
| + _controller.add(data); | 
| + } | 
| + }, onError: (error, stackTrace) { | 
| + if (_closeTimer != null) _closeTimer.cancel(); | 
| + if (error is FormatException) { | 
| + _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | 
| + } else { | 
| + _close(WebSocketStatus.PROTOCOL_ERROR); | 
| + } | 
| + // An error happened, set the close code set above. | 
| + _closeCode = _outCloseCode; | 
| + _closeReason = _outCloseReason; | 
| + _controller.close(); | 
| + }, onDone: () { | 
| + if (_closeTimer != null) _closeTimer.cancel(); | 
| + if (_readyState == WebSocket.OPEN) { | 
| + _readyState = WebSocket.CLOSING; | 
| + if (!_isReservedStatusCode(transformer.closeCode)) { | 
| + _close(transformer.closeCode, transformer.closeReason); | 
| + } else { | 
| + _close(); | 
| + } | 
| + _readyState = WebSocket.CLOSED; | 
| + } | 
| + // Protocol close, use close code from transformer. | 
| + _closeCode = transformer.closeCode; | 
| + _closeReason = transformer.closeReason; | 
| + _controller.close(); | 
| + }, cancelOnError: true); | 
| _subscription.pause(); | 
| - _controller = new StreamController(sync: true, | 
| - onListen: _subscription.resume, | 
| - onCancel: () { | 
| - _subscription.cancel(); | 
| - _subscription = null; | 
| - }, | 
| - onPause: _subscription.pause, | 
| - onResume: _subscription.resume); | 
| + _controller = new StreamController( | 
| + sync: true, onListen: _subscription.resume, onCancel: () { | 
| + _subscription.cancel(); | 
| + _subscription = null; | 
| + }, onPause: _subscription.pause, onResume: _subscription.resume); | 
| _webSockets[_serviceId] = this; | 
| - try { _socket._owner = this; } catch (_) {} | 
| + try { | 
| + _socket._owner = this; | 
| + } catch (_) {} | 
| } | 
| StreamSubscription listen(void onData(message), | 
| - {Function onError, | 
| - void onDone(), | 
| - bool cancelOnError}) { | 
| + {Function onError, void onDone(), bool cancelOnError}) { | 
| return _controller.stream.listen(onData, | 
| - onError: onError, | 
| - onDone: onDone, | 
| - cancelOnError: cancelOnError); | 
| + onError: onError, onDone: onDone, cancelOnError: cancelOnError); | 
| } | 
| Duration get pingInterval => _pingInterval; | 
| @@ -1027,13 +1237,12 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { | 
| static bool _isReservedStatusCode(int code) { | 
| return code != null && | 
| - (code < WebSocketStatus.NORMAL_CLOSURE || | 
| + (code < WebSocketStatus.NORMAL_CLOSURE || | 
| code == WebSocketStatus.RESERVED_1004 || | 
| code == WebSocketStatus.NO_STATUS_RECEIVED || | 
| code == WebSocketStatus.ABNORMAL_CLOSURE || | 
| (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | 
| - code < WebSocketStatus.RESERVED_1015) || | 
| - (code >= WebSocketStatus.RESERVED_1015 && | 
| - code < 3000)); | 
| + code < WebSocketStatus.RESERVED_1015) || | 
| + (code >= WebSocketStatus.RESERVED_1015 && code < 3000)); | 
| } | 
| } |