Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(349)

Unified Diff: sdk/lib/io/websocket_impl.dart

Issue 1208473005: WebSocket Compression (Closed) Base URL: https://github.com/dart-lang/sdk.git
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« sdk/lib/io/websocket.dart ('K') | « sdk/lib/io/websocket.dart ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/websocket_impl.dart
diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart
index 265760690923f502512435c8e6d8df3d1fada22a..183500c6510808ffda8144d58c3fe99a8cd6a72a 100644
--- a/sdk/lib/io/websocket_impl.dart
+++ b/sdk/lib/io/websocket_impl.dart
@@ -38,8 +38,8 @@ class _WebSocketOpcode {
* which is supplied through the [:handleData:]. As the protocol is processed,
* it'll output frame data as either a List<int> or String.
*
- * Important infomation about usage: Be sure you use cancelOnError, so the
- * socket will be closed when the processer encounter an error. Not using it
+ * Important information about usage: Be sure you use cancelOnError, so the
+ * socket will be closed when the processor encounter an error. Not using it
* will lead to undefined behaviour.
*/
// TODO(ajohnsen): make this transformer reusable?
@@ -54,6 +54,7 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
int _state = START;
bool _fin = false;
+ bool _compressed = false;
int _opcode = -1;
int _len = -1;
bool _masked = false;
@@ -71,7 +72,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
final List _maskingBytes = new List(4);
final BytesBuilder _payload = new BytesBuilder(copy: false);
- _WebSocketProtocolTransformer([this._serverSide = false]);
+ _WebSocketPerMessageDeflate _deflate;
+ _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
Stream bind(Stream stream) {
return new Stream.eventTransformed(
@@ -108,11 +110,18 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
if (_state <= LEN_REST) {
if (_state == START) {
_fin = (byte & 0x80) != 0;
Søren Gjesse 2015/06/25 15:41:27 Please add constants for FIN, RSV1, RSV2 and RSV3
- if ((byte & 0x70) != 0) {
- // The RSV1, RSV2 bits RSV3 must be all zero.
+
+ if ((byte & 0x40) != 0) {
+ _compressed = true;
+ }
+
+ if ((byte & 0x20) != 0 || (byte & 0x10) != 0) {
+ // The RSV2 and RSV3 bits must be all zero.
throw new WebSocketException("Protocol error");
}
+
_opcode = (byte & 0xF);
Søren Gjesse 2015/06/25 15:41:27 Please add a constant for the opcode mask.
+
if (_opcode <= _WebSocketOpcode.BINARY) {
if (_opcode == _WebSocketOpcode.CONTINUATION) {
if (_currentMessageType == _WebSocketMessageType.NONE) {
@@ -284,12 +293,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
void _messageFrameEnd() {
if (_fin) {
+ var bytes = _payload.takeBytes();
+ if (_deflate != null && _compressed) {
+ bytes = _deflate.processIncomingMessage(bytes);
+ }
+
switch (_currentMessageType) {
case _WebSocketMessageType.TEXT:
- _eventSink.add(UTF8.decode(_payload.takeBytes()));
+ _eventSink.add(UTF8.decode(bytes));
break;
case _WebSocketMessageType.BINARY:
- _eventSink.add(_payload.takeBytes());
+ _eventSink.add(bytes);
break;
}
_currentMessageType = _WebSocketMessageType.NONE;
@@ -364,12 +378,13 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
final StreamController<WebSocket> _controller =
new StreamController<WebSocket>(sync: true);
final Function _protocolSelector;
+ final CompressionOptions _compression;
- _WebSocketTransformerImpl(this._protocolSelector);
+ _WebSocketTransformerImpl(this._protocolSelector, this._compression);
Stream<WebSocket> bind(Stream<HttpRequest> stream) {
stream.listen((request) {
- _upgrade(request, _protocolSelector)
+ _upgrade(request, _protocolSelector, _compression)
.then((WebSocket webSocket) => _controller.add(webSocket))
.catchError(_controller.addError);
}, onDone: () {
@@ -379,7 +394,8 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
return _controller.stream;
}
- static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) {
+ static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector,
+ CompressionOptions compression) {
var response = request.response;
if (!_isUpgradeRequest(request)) {
// Send error response.
@@ -404,10 +420,24 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
if (protocol != null) {
response.headers.add("Sec-WebSocket-Protocol", protocol);
}
+
Søren Gjesse 2015/06/25 15:41:27 Extract server side negotiation into a separate me
+ var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
+
+ if (extensionHeader == null) {
+ extensionHeader = "";
+ }
+
+ Iterable<List<String>> extensions = extensionHeader.split(",").map((it) => it.split("; "));
+
+ if (compression.enabled && extensions.any((x) => x[0] == "permessage-deflate")) {
+ var opts = extensions.firstWhere((x) => x[0] == "permessage-deflate");
+ response.headers.add("Sec-WebSocket-Extensions", compression._createHeader(opts));
+ }
+
response.headers.contentLength = 0;
return response.detachSocket()
.then((socket) => new _WebSocketImpl._fromSocket(
Søren Gjesse 2015/06/25 15:41:27 Compression not supported for server (no additiona
- socket, protocol, true));
+ socket, protocol, compression, true));
}
var protocols = request.headers['Sec-WebSocket-Protocol'];
@@ -464,12 +494,64 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
}
}
+class _WebSocketPerMessageDeflate {
+ bool noContextTakeover;
+ int clientMaxWindowBits;
+ int serverMaxWindowBits;
+ bool serverSide;
+
+ ZLibDecoder decoder;
+ ZLibEncoder encoder;
+
+ _WebSocketPerMessageDeflate({this.clientMaxWindowBits,
Søren Gjesse 2015/06/25 15:41:27 No need to use optional named argument for interna
+ this.serverMaxWindowBits, this.noContextTakeover,
+ this.serverSide: false}) {
+ if (clientMaxWindowBits == null) {
Søren Gjesse 2015/06/25 15:41:27 Make the caller always pass a non null value - it
+ clientMaxWindowBits = 15;
Søren Gjesse 2015/06/25 15:41:27 Create a constant for this (maybe in CompressionOp
+ }
+
+ if (serverMaxWindowBits == null) {
+ serverMaxWindowBits = 15;
+ }
+ }
+
+ void _ensureDecoder() {
+ if (noContextTakeover || decoder == null) {
+ decoder = new ZLibDecoder(windowBits: serverSide ?
+ clientMaxWindowBits : serverMaxWindowBits);
+ }
+ }
+
+ void _ensureEncoder() {
+ if (noContextTakeover || encoder == null) {
+ encoder = new ZLibEncoder(windowBits: serverSide ?
+ serverMaxWindowBits : clientMaxWindowBits);
+ }
+ }
+
+ List<int> processIncomingMessage(List<int> msg) {
+ _ensureDecoder();
+ var builder = new BytesBuilder();
+ builder.add(msg);
+ builder.add(const [0x00, 0x00, 0xff, 0xff]);
Søren Gjesse 2015/06/25 15:41:27 For the noContextTakeover case set decoder to null
+ return decoder.convert(builder.takeBytes());
+ }
+
+ List<int> processOutgoingMessage(List<int> msg) {
+ _ensureEncoder();
+ var c = encoder.convert(msg);
+ c = c.sublist(0, c.length - 4);
Søren Gjesse 2015/06/25 15:41:27 Ditto.
+ return c;
+ }
+}
// TODO(ajohnsen): Make this transformer reusable.
class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
final _WebSocketImpl webSocket;
EventSink _eventSink;
+ _WebSocketPerMessageDeflate _deflateHelper;
+
_WebSocketOutgoingTransformer(this.webSocket);
Stream bind(Stream stream) {
@@ -499,12 +581,24 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
if (message is String) {
opcode = _WebSocketOpcode.TEXT;
data = UTF8.encode(message);
+
+ if (_deflateHelper != null) {
+ data = _deflateHelper.processOutgoingMessage(data);
+ }
} else {
if (message is !List<int>) {
throw new ArgumentError(message);
}
opcode = _WebSocketOpcode.BINARY;
data = message;
+
+ if (_deflateHelper != null) {
+ data = _deflateHelper.processOutgoingMessage(data);
+ }
+ }
+
+ if (_deflateHelper != null) {
+ data = _deflateHelper.processOutgoingMessage(data);
}
} else {
opcode = _WebSocketOpcode.TEXT;
@@ -532,9 +626,12 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
}
void addFrame(int opcode, List<int> data) =>
- createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
+ createFrame(opcode, data, webSocket._serverSide, webSocket._deflate != null)
+ .forEach((e) {
+ _eventSink.add(e);
+ });
- static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
+ static Iterable createFrame(int opcode, List<int> data, bool serverSide, bool compressed) {
bool mask = !serverSide; // Masking not implemented for server.
int dataLength = data == null ? 0 : data.length;
// Determine the header size.
@@ -547,10 +644,19 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
Uint8List header = new Uint8List(headerSize);
int index = 0;
// Set FIN and opcode.
- header[index++] = 0x80 | opcode;
+ var hoc = 0;
+
+ hoc |= 0x80;
+
+ if (compressed) {
+ hoc |= 0x40;
+ }
+
+ hoc |= opcode & 0xF;
+
+ header[index++] = hoc;
// Determine size and position of length field.
int lengthBytes = 1;
- int firstLengthByte = 1;
if (dataLength > 65535) {
header[index++] = 127;
lengthBytes = 8;
@@ -766,11 +872,13 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
int _outCloseCode;
String _outCloseReason;
Timer _closeTimer;
+ _WebSocketPerMessageDeflate _deflate;
static final HttpClient _httpClient = new HttpClient();
static Future<WebSocket> connect(
- String url, Iterable<String> protocols, Map<String, dynamic> headers) {
+ String url, Iterable<String> protocols, Map<String, dynamic> headers,
+ {CompressionOptions compression: CompressionOptions.DEFAULT}) {
Uri uri = Uri.parse(url);
if (uri.scheme != "ws" && uri.scheme != "wss") {
throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
@@ -809,10 +917,14 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
..set(HttpHeaders.UPGRADE, "websocket")
..set("Sec-WebSocket-Key", nonce)
..set("Cache-Control", "no-cache")
- ..set("Sec-WebSocket-Version", "13");
+ ..set("Sec-WebSocket-Version", "13")
+ ..set("Sec-WebSocket-Extensions", "permessage-deflate");
Søren Gjesse 2015/06/25 15:41:27 Isn't this already handled by adding Sec-WebSocket
if (protocols != null) {
request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
}
+
+ request.headers.add("Sec-WebSocket-Extensions", compression._createHeader());
+
return request.close();
})
.then((response) {
@@ -848,18 +960,62 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
}
}
var protocol = response.headers.value('Sec-WebSocket-Protocol');
+
Søren Gjesse 2015/06/25 15:41:27 Please extract this into a separate method called
+ String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
+
+ if (extensionHeader == null) {
+ extensionHeader = "";
+ }
+
+ Iterable<List<String>> extensions = extensionHeader
+ .split(", ")
+ .map((it) => it.split("; "));
+
+ _WebSocketPerMessageDeflate deflate;
+
+ if (compression.enabled && extensions.any((x) => x[0] == "permessage-deflate")) {
+ var opts = extensions.firstWhere((x) => x[0] == "permessage-deflate");
+ var noContextTakeover = opts.contains("client_no_context_takeover");
+
+ int getWindowBits(String type) {
+ var o = opts.firstWhere((x) =>
+ x.startsWith("${type}_max_window_bits="), orElse: () => null);
+
+ if (o == null) {
+ return 15;
+ }
+
+ try {
+ o = o.substring("client_max_window_bits=".length);
Søren Gjesse 2015/06/25 15:41:27 client -> ${type} here as well? Use var paramete
+ o = int.parse(o);
+ } catch (e) {
+ return 15;
+ }
+
+ return o;
+ }
+
+ deflate = new _WebSocketPerMessageDeflate(
+ clientMaxWindowBits: getWindowBits("client"),
+ serverMaxWindowBits: getWindowBits("server"),
+ noContextTakeover: noContextTakeover);
+ }
+
return response.detachSocket()
- .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
+ .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol,
+ compression, false, deflate));
});
}
_WebSocketImpl._fromSocket(this._socket, this.protocol,
- [this._serverSide = false]) {
+ CompressionOptions compression, [this._serverSide = false,
+ _WebSocketPerMessageDeflate deflate]) {
_consumer = new _WebSocketConsumer(this, _socket);
_sink = new _StreamSinkImpl(_consumer);
_readyState = WebSocket.OPEN;
+ _deflate = deflate;
- var transformer = new _WebSocketProtocolTransformer(_serverSide);
+ var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
_subscription = _socket.transform(transformer).listen(
(data) {
if (data is _WebSocketPing) {
« sdk/lib/io/websocket.dart ('K') | « sdk/lib/io/websocket.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698