Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(521)

Unified Diff: sdk/lib/io/websocket_impl.dart

Issue 1390353005: Web Socket compression - take two (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Fix observatory issues Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/io/websocket.dart ('k') | tests/standalone/io/web_socket_compression_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/websocket_impl.dart
diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart
index cc68544e536233a5e43d63a699e01d4d58ce5b4e..bcb800d84aa80e8f4ca0d812f641b231012f9336 100644
--- a/sdk/lib/io/websocket_impl.dart
+++ b/sdk/lib/io/websocket_impl.dart
@@ -5,6 +5,10 @@
part of dart.io;
const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+const String _clientNoContextTakeover = "client_no_context_takeover";
+const String _serverNoContextTakeover = "server_no_context_takeover";
+const String _clientMaxWindowBits = "client_max_window_bits";
+const String _serverMaxWindowBits = "server_max_window_bits";
// Matches _WebSocketOpcode.
class _WebSocketMessageType {
@@ -13,7 +17,6 @@ class _WebSocketMessageType {
static const int BINARY = 2;
}
-
class _WebSocketOpcode {
static const int CONTINUATION = 0;
static const int TEXT = 1;
@@ -38,8 +41,8 @@ class _WebSocketOpcode {
* which is supplied through the [:handleData:]. As the protocol is processed,
* it'll output frame data as either a List<int> or String.
*
- * Important infomation about usage: Be sure you use cancelOnError, so the
- * socket will be closed when the processer encounter an error. Not using it
+ * Important information about usage: Be sure you use cancelOnError, so the
+ * socket will be closed when the processor encounter an error. Not using it
* will lead to undefined behaviour.
*/
// TODO(ajohnsen): make this transformer reusable?
@@ -51,9 +54,15 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
static const int PAYLOAD = 4;
static const int CLOSED = 5;
static const int FAILURE = 6;
+ static const int FIN = 0x80;
+ static const int RSV1 = 0x40;
+ static const int RSV2 = 0x20;
+ static const int RSV3 = 0x10;
+ static const int OPCODE = 0xF;
int _state = START;
bool _fin = false;
+ bool _compressed = false;
int _opcode = -1;
int _len = -1;
bool _masked = false;
@@ -71,18 +80,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
final List _maskingBytes = new List(4);
final BytesBuilder _payload = new BytesBuilder(copy: false);
- _WebSocketProtocolTransformer([this._serverSide = false]);
+ _WebSocketPerMessageDeflate _deflate;
+ _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
Stream bind(Stream stream) {
- return new Stream.eventTransformed(
- stream,
- (EventSink eventSink) {
- if (_eventSink != null) {
- throw new StateError("WebSocket transformer already used.");
- }
- _eventSink = eventSink;
- return this;
- });
+ return new Stream.eventTransformed(stream, (EventSink eventSink) {
+ if (_eventSink != null) {
+ throw new StateError("WebSocket transformer already used.");
+ }
+ _eventSink = eventSink;
+ return this;
+ });
}
void addError(Object error, [StackTrace stackTrace]) =>
@@ -94,9 +102,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
* Process data received from the underlying communication channel.
*/
void add(Uint8List buffer) {
- int count = buffer.length;
int index = 0;
- int lastIndex = count;
+ int lastIndex = buffer.length;
if (_state == CLOSED) {
throw new WebSocketException("Data on closed connection");
}
@@ -107,12 +114,20 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
int byte = buffer[index];
if (_state <= LEN_REST) {
if (_state == START) {
- _fin = (byte & 0x80) != 0;
- if ((byte & 0x70) != 0) {
- // The RSV1, RSV2 bits RSV3 must be all zero.
+ _fin = (byte & FIN) != 0;
+
+ if((byte & (RSV2 | RSV3)) != 0) {
+ // The RSV2, RSV3 bits must both be zero.
throw new WebSocketException("Protocol error");
}
- _opcode = (byte & 0xF);
+
+ if ((byte & RSV1) != 0) {
+ _compressed = true;
+ } else {
+ _compressed = false;
+ }
+ _opcode = (byte & OPCODE);
+
if (_opcode <= _WebSocketOpcode.BINARY) {
if (_opcode == _WebSocketOpcode.CONTINUATION) {
if (_currentMessageType == _WebSocketMessageType.NONE) {
@@ -120,14 +135,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
}
} else {
assert(_opcode == _WebSocketOpcode.TEXT ||
- _opcode == _WebSocketOpcode.BINARY);
+ _opcode == _WebSocketOpcode.BINARY);
if (_currentMessageType != _WebSocketMessageType.NONE) {
throw new WebSocketException("Protocol error");
}
_currentMessageType = _opcode;
}
} else if (_opcode >= _WebSocketOpcode.CLOSE &&
- _opcode <= _WebSocketOpcode.PONG) {
+ _opcode <= _WebSocketOpcode.PONG) {
// Control frames cannot be fragmented.
if (!_fin) throw new WebSocketException("Protocol error");
} else {
@@ -176,15 +191,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
_unmask(index, payloadLength, buffer);
}
// Control frame and data frame share _payloads.
- _payload.add(
- new Uint8List.view(buffer.buffer, index, payloadLength));
+ _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
index += payloadLength;
if (_isControlFrame()) {
if (_remainingPayloadBytes == 0) _controlFrameEnd();
} else {
if (_currentMessageType != _WebSocketMessageType.TEXT &&
_currentMessageType != _WebSocketMessageType.BINARY) {
- throw new WebSocketException("Protocol error");
+ throw new WebSocketException("Protocol error");
}
if (_remainingPayloadBytes == 0) _messageFrameEnd();
}
@@ -219,8 +233,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
}
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
- Int32x4List blockBuffer = new Int32x4List.view(
- buffer.buffer, index, blockCount);
+ Int32x4List blockBuffer =
+ new Int32x4List.view(buffer.buffer, index, blockCount);
for (int i = 0; i < blockBuffer.length; i++) {
blockBuffer[i] ^= blockMask;
}
@@ -284,12 +298,17 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
void _messageFrameEnd() {
if (_fin) {
+ var bytes = _payload.takeBytes();
+ if (_deflate != null && _compressed) {
+ bytes = _deflate.processIncomingMessage(bytes);
+ }
+
switch (_currentMessageType) {
case _WebSocketMessageType.TEXT:
- _eventSink.add(UTF8.decode(_payload.takeBytes()));
+ _eventSink.add(UTF8.decode(bytes));
break;
case _WebSocketMessageType.BINARY:
- _eventSink.add(_payload.takeBytes());
+ _eventSink.add(bytes);
break;
}
_currentMessageType = _WebSocketMessageType.NONE;
@@ -331,8 +350,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
bool _isControlFrame() {
return _opcode == _WebSocketOpcode.CLOSE ||
- _opcode == _WebSocketOpcode.PING ||
- _opcode == _WebSocketOpcode.PONG;
+ _opcode == _WebSocketOpcode.PING ||
+ _opcode == _WebSocketOpcode.PONG;
}
void _prepareForNextFrame() {
@@ -347,31 +366,29 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
}
}
-
class _WebSocketPing {
final List<int> payload;
_WebSocketPing([this.payload = null]);
}
-
class _WebSocketPong {
final List<int> payload;
_WebSocketPong([this.payload = null]);
}
-
class _WebSocketTransformerImpl implements WebSocketTransformer {
final StreamController<WebSocket> _controller =
new StreamController<WebSocket>(sync: true);
final Function _protocolSelector;
+ final CompressionOptions _compression;
- _WebSocketTransformerImpl(this._protocolSelector);
+ _WebSocketTransformerImpl(this._protocolSelector, this._compression);
Stream<WebSocket> bind(Stream<HttpRequest> stream) {
stream.listen((request) {
- _upgrade(request, _protocolSelector)
- .then((WebSocket webSocket) => _controller.add(webSocket))
- .catchError(_controller.addError);
+ _upgrade(request, _protocolSelector, _compression)
+ .then((WebSocket webSocket) => _controller.add(webSocket))
+ .catchError(_controller.addError);
}, onDone: () {
_controller.close();
});
@@ -379,13 +396,14 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
return _controller.stream;
}
- static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) {
+ static Future<WebSocket> _upgrade(
+ HttpRequest request, _protocolSelector, CompressionOptions compression) {
var response = request.response;
if (!_isUpgradeRequest(request)) {
// Send error response.
response
- ..statusCode = HttpStatus.BAD_REQUEST
- ..close();
+ ..statusCode = HttpStatus.BAD_REQUEST
+ ..close();
return new Future.error(
new WebSocketException("Invalid WebSocket upgrade request"));
}
@@ -393,9 +411,9 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
Future upgrade(String protocol) {
// Send the upgrade response.
response
- ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
- ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
- ..headers.add(HttpHeaders.UPGRADE, "websocket");
+ ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
+ ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
+ ..headers.add(HttpHeaders.UPGRADE, "websocket");
String key = request.headers.value("Sec-WebSocket-Key");
_SHA1 sha1 = new _SHA1();
sha1.add("$key$_webSocketGUID".codeUnits);
@@ -404,10 +422,13 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
if (protocol != null) {
response.headers.add("Sec-WebSocket-Protocol", protocol);
}
+
+ var deflate = _negotiateCompression(request, response, compression);
+
response.headers.contentLength = 0;
- return response.detachSocket()
- .then((socket) => new _WebSocketImpl._fromSocket(
- socket, protocol, true));
+ return response.detachSocket().then((socket) =>
+ new _WebSocketImpl._fromSocket(
+ socket, protocol, compression, true, deflate));
}
var protocols = request.headers['Sec-WebSocket-Protocol'];
@@ -416,26 +437,53 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
// consisting of multiple protocols. To unify all of them, first join
// the lists with ', ' and then tokenize.
protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
- return new Future(() => _protocolSelector(protocols))
- .then((protocol) {
- if (protocols.indexOf(protocol) < 0) {
- throw new WebSocketException(
- "Selected protocol is not in the list of available protocols");
- }
- return protocol;
- })
- .catchError((error) {
- response
- ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
- ..close();
- throw error;
- })
- .then(upgrade);
+ return new Future(() => _protocolSelector(protocols)).then((protocol) {
+ if (protocols.indexOf(protocol) < 0) {
+ throw new WebSocketException(
+ "Selected protocol is not in the list of available protocols");
+ }
+ return protocol;
+ }).catchError((error) {
+ response
+ ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
+ ..close();
+ throw error;
+ }).then(upgrade);
} else {
return upgrade(null);
}
}
+ static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request,
+ HttpResponse response, CompressionOptions compression) {
+ var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
+
+ if (extensionHeader == null) {
+ extensionHeader = "";
+ }
+
+ var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
+ if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) {
+ var info = compression._createHeader(hv);
+
+ response.headers.add("Sec-WebSocket-Extensions", info[0]);
+ var serverNoContextTakeover =
+ hv.parameters.containsKey(_serverNoContextTakeover);
+ var clientNoContextTakeover =
+ hv.parameters.containsKey(_clientNoContextTakeover);
+ var deflate = new _WebSocketPerMessageDeflate(
+ serverNoContextTakeover: serverNoContextTakeover,
+ clientNoContextTakeover: clientNoContextTakeover,
+ serverMaxWindowBits: info[1],
+ clientMaxWindowBits: info[1],
+ serverSide: true);
+
+ return deflate;
+ }
+
+ return null;
+ }
+
static bool _isUpgradeRequest(HttpRequest request) {
if (request.method != "GET") {
return false;
@@ -464,24 +512,127 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
}
}
+class _WebSocketPerMessageDeflate {
+ bool serverNoContextTakeover;
+ bool clientNoContextTakeover;
+ int clientMaxWindowBits;
+ int serverMaxWindowBits;
+ bool serverSide;
+
+ _Filter decoder;
+ _Filter encoder;
+
+ _WebSocketPerMessageDeflate(
+ {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
+ this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
+ this.serverNoContextTakeover: false,
+ this.clientNoContextTakeover: false,
+ this.serverSide: false});
+
+ void _ensureDecoder() {
+ if (decoder == null) {
+ decoder = _Filter._newZLibInflateFilter(
+ serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true);
+ }
+ }
+
+ void _ensureEncoder() {
+ if (encoder == null) {
+ encoder = _Filter._newZLibDeflateFilter(
+ false,
+ ZLibOption.DEFAULT_LEVEL,
+ serverSide ? serverMaxWindowBits : clientMaxWindowBits,
+ ZLibOption.DEFAULT_MEM_LEVEL,
+ ZLibOption.STRATEGY_DEFAULT,
+ null,
+ true);
+ }
+ }
+
+ Uint8List processIncomingMessage(List<int> msg) {
+ _ensureDecoder();
+
+ var data = [];
+ data.addAll(msg);
+ data.addAll(const [0x00, 0x00, 0xff, 0xff]);
+
+ decoder.process(data, 0, data.length);
+ var reuse =
+ !(serverSide ? clientNoContextTakeover : serverNoContextTakeover);
+ var result = [];
+ var out;
+
+ while ((out = decoder.processed(flush: reuse)) != null) {
+ result.addAll(out);
+ }
+
+ decoder.processed(flush: reuse);
+
+ if (!reuse) {
+ decoder.end();
+ decoder = null;
+ }
+ return new Uint8List.fromList(result);
+ }
+
+ List<int> processOutgoingMessage(List<int> msg) {
+ _ensureEncoder();
+ var reuse =
+ !(serverSide ? serverNoContextTakeover : clientNoContextTakeover);
+ var result = [];
+ Uint8List buffer;
+ var out;
+
+ if (msg is! Uint8List) {
+ for (var i = 0; i < msg.length; i++) {
+ if (msg[i] < 0 || 255 < msg[i]) {
+ throw new ArgumentError("List element is not a byte value "
+ "(value ${msg[i]} at index $i)");
+ }
+ }
+ buffer = new Uint8List.fromList(msg);
+ } else {
+ buffer = msg;
+ }
+
+ encoder.process(buffer, 0, buffer.length);
+
+ while ((out = encoder.processed(flush: reuse)) != null) {
+ result.addAll(out);
+ }
+
+ if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) {
+ encoder.end();
+ encoder = null;
+ }
+
+ if (result.length > 4) {
+ result = result.sublist(0, result.length - 4);
+ }
+
+ return result;
+ }
+}
// TODO(ajohnsen): Make this transformer reusable.
class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
final _WebSocketImpl webSocket;
EventSink _eventSink;
- _WebSocketOutgoingTransformer(this.webSocket);
+ _WebSocketPerMessageDeflate _deflateHelper;
+
+ _WebSocketOutgoingTransformer(this.webSocket) {
+ _deflateHelper = webSocket._deflate;
+ }
Stream bind(Stream stream) {
- return new Stream.eventTransformed(
- stream,
- (EventSink eventSink) {
- if (_eventSink != null) {
- throw new StateError("WebSocket transformer already used");
- }
- _eventSink = eventSink;
- return this;
- });
+ return new Stream.eventTransformed(stream, (EventSink eventSink) {
+ if (_eventSink != null) {
+ throw new StateError("WebSocket transformer already used");
+ }
+ _eventSink = eventSink;
+ return this;
+ });
}
void add(message) {
@@ -500,12 +651,16 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
opcode = _WebSocketOpcode.TEXT;
data = UTF8.encode(message);
} else {
- if (message is !List<int>) {
+ if (message is! List<int>) {
throw new ArgumentError(message);
}
opcode = _WebSocketOpcode.BINARY;
data = message;
}
+
+ if (_deflateHelper != null) {
+ data = _deflateHelper.processOutgoingMessage(data);
+ }
} else {
opcode = _WebSocketOpcode.TEXT;
}
@@ -531,11 +686,19 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
_eventSink.close();
}
- void addFrame(int opcode, List<int> data) =>
- createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
+ void addFrame(int opcode, List<int> data) => createFrame(
+ opcode,
+ data,
+ webSocket._serverSide,
+ _deflateHelper != null &&
+ (opcode == _WebSocketOpcode.TEXT ||
+ opcode == _WebSocketOpcode.BINARY)).forEach((e) {
+ _eventSink.add(e);
+ });
- static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
- bool mask = !serverSide; // Masking not implemented for server.
+ static Iterable createFrame(
+ int opcode, List<int> data, bool serverSide, bool compressed) {
+ bool mask = !serverSide; // Masking not implemented for server.
int dataLength = data == null ? 0 : data.length;
// Determine the header size.
int headerSize = (mask) ? 6 : 2;
@@ -546,11 +709,15 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
}
Uint8List header = new Uint8List(headerSize);
int index = 0;
+
// Set FIN and opcode.
- header[index++] = 0x80 | opcode;
+ var hoc = _WebSocketProtocolTransformer.FIN
+ | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
+ | (opcode & _WebSocketProtocolTransformer.OPCODE);
+
+ header[index++] = hoc;
// Determine size and position of length field.
int lengthBytes = 1;
- int firstLengthByte = 1;
if (dataLength > 65535) {
header[index++] = 127;
lengthBytes = 8;
@@ -580,8 +747,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
list = new Uint8List(data.length);
for (int i = 0; i < data.length; i++) {
if (data[i] < 0 || 255 < data[i]) {
- throw new ArgumentError(
- "List element is not a byte value "
+ throw new ArgumentError("List element is not a byte value "
"(value ${data[i]} at index $i)");
}
list[i] = data[i];
@@ -597,8 +763,8 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
mask = (mask << 8) | maskBytes[i];
}
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
- Int32x4List blockBuffer = new Int32x4List.view(
- list.buffer, 0, blockCount);
+ Int32x4List blockBuffer =
+ new Int32x4List.view(list.buffer, 0, blockCount);
for (int i = 0; i < blockBuffer.length; i++) {
blockBuffer[i] ^= blockMask;
}
@@ -619,7 +785,6 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
}
}
-
class _WebSocketConsumer implements StreamConsumer {
final _WebSocketImpl webSocket;
final Socket socket;
@@ -664,28 +829,28 @@ class _WebSocketConsumer implements StreamConsumer {
_ensureController() {
if (_controller != null) return;
- _controller = new StreamController(sync: true,
- onPause: _onPause,
- onResume: _onResume,
- onCancel: _onListen);
- var stream = _controller.stream.transform(
- new _WebSocketOutgoingTransformer(webSocket));
- socket.addStream(stream)
- .then((_) {
- _done();
- _closeCompleter.complete(webSocket);
- }, onError: (error, StackTrace stackTrace) {
- _closed = true;
- _cancel();
- if (error is ArgumentError) {
- if (!_done(error, stackTrace)) {
- _closeCompleter.completeError(error, stackTrace);
- }
- } else {
- _done();
- _closeCompleter.complete(webSocket);
- }
- });
+ _controller = new StreamController(
+ sync: true,
+ onPause: _onPause,
+ onResume: _onResume,
+ onCancel: _onListen);
+ var stream = _controller.stream
+ .transform(new _WebSocketOutgoingTransformer(webSocket));
+ socket.addStream(stream).then((_) {
+ _done();
+ _closeCompleter.complete(webSocket);
+ }, onError: (error, StackTrace stackTrace) {
+ _closed = true;
+ _cancel();
+ if (error is ArgumentError) {
+ if (!_done(error, stackTrace)) {
+ _closeCompleter.completeError(error, stackTrace);
+ }
+ } else {
+ _done();
+ _closeCompleter.complete(webSocket);
+ }
+ });
}
bool _done([error, StackTrace stackTrace]) {
@@ -706,13 +871,9 @@ class _WebSocketConsumer implements StreamConsumer {
}
_ensureController();
_completer = new Completer();
- _subscription = stream.listen(
- (data) {
- _controller.add(data);
- },
- onDone: _done,
- onError: _done,
- cancelOnError: true);
+ _subscription = stream.listen((data) {
+ _controller.add(data);
+ }, onDone: _done, onError: _done, cancelOnError: true);
if (_issuedPause) {
_subscription.pause();
_issuedPause = false;
@@ -742,10 +903,11 @@ class _WebSocketConsumer implements StreamConsumer {
}
}
-
class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
// Use default Map so we keep order.
static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>();
+ static const int DEFAULT_WINDOW_BITS = 15;
+ static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
final String protocol;
@@ -766,11 +928,13 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
int _outCloseCode;
String _outCloseReason;
Timer _closeTimer;
+ _WebSocketPerMessageDeflate _deflate;
static final HttpClient _httpClient = new HttpClient();
static Future<WebSocket> connect(
- String url, Iterable<String> protocols, Map<String, dynamic> headers) {
+ String url, Iterable<String> protocols, Map<String, dynamic> headers,
+ {CompressionOptions compression: CompressionOptions.DEFAULT}) {
Uri uri = Uri.parse(url);
if (uri.scheme != "ws" && uri.scheme != "wss") {
throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
@@ -784,144 +948,182 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
}
String nonce = _CryptoUtils.bytesToBase64(nonceData);
- uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http",
- userInfo: uri.userInfo,
- host: uri.host,
- port: uri.port,
- path: uri.path,
- query: uri.query,
- fragment: uri.fragment);
- return _httpClient.openUrl("GET", uri)
- .then((request) {
- if (uri.userInfo != null && !uri.userInfo.isEmpty) {
- // If the URL contains user information use that for basic
- // authorization.
- String auth =
- _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
- request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
- }
- if (headers != null) {
- headers.forEach((field, value) => request.headers.add(field, value));
- }
- // Setup the initial handshake.
+ uri = new Uri(
+ scheme: uri.scheme == "wss" ? "https" : "http",
+ userInfo: uri.userInfo,
+ host: uri.host,
+ port: uri.port,
+ path: uri.path,
+ query: uri.query,
+ fragment: uri.fragment);
+ return _httpClient.openUrl("GET", uri).then((request) {
+ if (uri.userInfo != null && !uri.userInfo.isEmpty) {
+ // If the URL contains user information use that for basic
+ // authorization.
+ String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
+ request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
+ }
+ if (headers != null) {
+ headers.forEach((field, value) => request.headers.add(field, value));
+ }
+ // Setup the initial handshake.
+ request.headers
+ ..set(HttpHeaders.CONNECTION, "Upgrade")
+ ..set(HttpHeaders.UPGRADE, "websocket")
+ ..set("Sec-WebSocket-Key", nonce)
+ ..set("Cache-Control", "no-cache")
+ ..set("Sec-WebSocket-Version", "13");
+ if (protocols != null) {
+ request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
+ }
+
+ if (compression.enabled) {
request.headers
- ..set(HttpHeaders.CONNECTION, "Upgrade")
- ..set(HttpHeaders.UPGRADE, "websocket")
- ..set("Sec-WebSocket-Key", nonce)
- ..set("Cache-Control", "no-cache")
- ..set("Sec-WebSocket-Version", "13");
- if (protocols != null) {
- request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
- }
- return request.close();
- })
- .then((response) {
- void error(String message) {
- // Flush data.
- response.detachSocket().then((socket) {
- socket.destroy();
- });
- throw new WebSocketException(message);
- }
- if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
- response.headers[HttpHeaders.CONNECTION] == null ||
- !response.headers[HttpHeaders.CONNECTION].any(
- (value) => value.toLowerCase() == "upgrade") ||
- response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
- "websocket") {
- error("Connection to '$uri' was not upgraded to websocket");
- }
- String accept = response.headers.value("Sec-WebSocket-Accept");
- if (accept == null) {
- error("Response did not contain a 'Sec-WebSocket-Accept' header");
- }
- _SHA1 sha1 = new _SHA1();
- sha1.add("$nonce$_webSocketGUID".codeUnits);
- List<int> expectedAccept = sha1.close();
- List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
- if (expectedAccept.length != receivedAccept.length) {
- error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
+ .add("Sec-WebSocket-Extensions", compression._createHeader());
+ }
+
+ return request.close();
+ }).then((response) {
+ void error(String message) {
+ // Flush data.
+ response.detachSocket().then((socket) {
+ socket.destroy();
+ });
+ throw new WebSocketException(message);
+ }
+ if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
+ response.headers[HttpHeaders.CONNECTION] == null ||
+ !response.headers[HttpHeaders.CONNECTION]
+ .any((value) => value.toLowerCase() == "upgrade") ||
+ response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
+ "websocket") {
+ error("Connection to '$uri' was not upgraded to websocket");
+ }
+ String accept = response.headers.value("Sec-WebSocket-Accept");
+ if (accept == null) {
+ error("Response did not contain a 'Sec-WebSocket-Accept' header");
+ }
+ _SHA1 sha1 = new _SHA1();
+ sha1.add("$nonce$_webSocketGUID".codeUnits);
+ List<int> expectedAccept = sha1.close();
+ List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
+ if (expectedAccept.length != receivedAccept.length) {
+ error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
+ }
+ for (int i = 0; i < expectedAccept.length; i++) {
+ if (expectedAccept[i] != receivedAccept[i]) {
+ error("Bad response 'Sec-WebSocket-Accept' header");
}
- for (int i = 0; i < expectedAccept.length; i++) {
- if (expectedAccept[i] != receivedAccept[i]) {
- error("Bad response 'Sec-WebSocket-Accept' header");
- }
+ }
+ var protocol = response.headers.value('Sec-WebSocket-Protocol');
+
+ _WebSocketPerMessageDeflate deflate =
+ negotiateClientCompression(response, compression);
+
+ return response.detachSocket().then((socket) =>
+ new _WebSocketImpl._fromSocket(
+ socket, protocol, compression, false, deflate));
+ });
+ }
+
+ static _WebSocketPerMessageDeflate negotiateClientCompression(
+ HttpClientResponse response, CompressionOptions compression) {
+ String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
+
+ if (extensionHeader == null) {
+ extensionHeader = "";
+ }
+
+ var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
+
+ if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) {
+ var serverNoContextTakeover =
+ hv.parameters.containsKey(_serverNoContextTakeover);
+ var clientNoContextTakeover =
+ hv.parameters.containsKey(_clientNoContextTakeover);
+
+ int getWindowBits(String type) {
+ var o = hv.parameters[type];
+ if (o == null) {
+ return DEFAULT_WINDOW_BITS;
}
- var protocol = response.headers.value('Sec-WebSocket-Protocol');
- return response.detachSocket()
- .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
- });
+
+ o = int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS);
+ return o;
+ }
+
+ return new _WebSocketPerMessageDeflate(
+ clientMaxWindowBits: getWindowBits(_clientMaxWindowBits),
+ serverMaxWindowBits: getWindowBits(_serverMaxWindowBits),
+ clientNoContextTakeover: clientNoContextTakeover,
+ serverNoContextTakeover: serverNoContextTakeover);
+ }
+
+ return null;
}
- _WebSocketImpl._fromSocket(this._socket, this.protocol,
- [this._serverSide = false]) {
+ _WebSocketImpl._fromSocket(
+ this._socket, this.protocol, CompressionOptions compression,
+ [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) {
_consumer = new _WebSocketConsumer(this, _socket);
_sink = new _StreamSinkImpl(_consumer);
_readyState = WebSocket.OPEN;
-
- var transformer = new _WebSocketProtocolTransformer(_serverSide);
- _subscription = _socket.transform(transformer).listen(
- (data) {
- if (data is _WebSocketPing) {
- if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
- } else if (data is _WebSocketPong) {
- // Simply set pingInterval, as it'll cancel any timers.
- pingInterval = _pingInterval;
- } else {
- _controller.add(data);
- }
- },
- onError: (error) {
- if (_closeTimer != null) _closeTimer.cancel();
- if (error is FormatException) {
- _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
- } else {
- _close(WebSocketStatus.PROTOCOL_ERROR);
- }
- // An error happened, set the close code set above.
- _closeCode = _outCloseCode;
- _closeReason = _outCloseReason;
- _controller.close();
- },
- onDone: () {
- if (_closeTimer != null) _closeTimer.cancel();
- if (_readyState == WebSocket.OPEN) {
- _readyState = WebSocket.CLOSING;
- if (!_isReservedStatusCode(transformer.closeCode)) {
- _close(transformer.closeCode, transformer.closeReason);
- } else {
- _close();
- }
- _readyState = WebSocket.CLOSED;
- }
- // Protocol close, use close code from transformer.
- _closeCode = transformer.closeCode;
- _closeReason = transformer.closeReason;
- _controller.close();
- },
- cancelOnError: true);
+ _deflate = deflate;
+
+ var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
+ _subscription = _socket.transform(transformer).listen((data) {
+ if (data is _WebSocketPing) {
+ if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
+ } else if (data is _WebSocketPong) {
+ // Simply set pingInterval, as it'll cancel any timers.
+ pingInterval = _pingInterval;
+ } else {
+ _controller.add(data);
+ }
+ }, onError: (error, stackTrace) {
+ if (_closeTimer != null) _closeTimer.cancel();
+ if (error is FormatException) {
+ _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
+ } else {
+ _close(WebSocketStatus.PROTOCOL_ERROR);
+ }
+ // An error happened, set the close code set above.
+ _closeCode = _outCloseCode;
+ _closeReason = _outCloseReason;
+ _controller.close();
+ }, onDone: () {
+ if (_closeTimer != null) _closeTimer.cancel();
+ if (_readyState == WebSocket.OPEN) {
+ _readyState = WebSocket.CLOSING;
+ if (!_isReservedStatusCode(transformer.closeCode)) {
+ _close(transformer.closeCode, transformer.closeReason);
+ } else {
+ _close();
+ }
+ _readyState = WebSocket.CLOSED;
+ }
+ // Protocol close, use close code from transformer.
+ _closeCode = transformer.closeCode;
+ _closeReason = transformer.closeReason;
+ _controller.close();
+ }, cancelOnError: true);
_subscription.pause();
- _controller = new StreamController(sync: true,
- onListen: _subscription.resume,
- onCancel: () {
- _subscription.cancel();
- _subscription = null;
- },
- onPause: _subscription.pause,
- onResume: _subscription.resume);
+ _controller = new StreamController(
+ sync: true, onListen: _subscription.resume, onCancel: () {
+ _subscription.cancel();
+ _subscription = null;
+ }, onPause: _subscription.pause, onResume: _subscription.resume);
_webSockets[_serviceId] = this;
- try { _socket._owner = this; } catch (_) {}
+ try {
+ _socket._owner = this;
+ } catch (_) {}
}
StreamSubscription listen(void onData(message),
- {Function onError,
- void onDone(),
- bool cancelOnError}) {
+ {Function onError, void onDone(), bool cancelOnError}) {
return _controller.stream.listen(onData,
- onError: onError,
- onDone: onDone,
- cancelOnError: cancelOnError);
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
Duration get pingInterval => _pingInterval;
@@ -1027,13 +1229,12 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
static bool _isReservedStatusCode(int code) {
return code != null &&
- (code < WebSocketStatus.NORMAL_CLOSURE ||
+ (code < WebSocketStatus.NORMAL_CLOSURE ||
code == WebSocketStatus.RESERVED_1004 ||
code == WebSocketStatus.NO_STATUS_RECEIVED ||
code == WebSocketStatus.ABNORMAL_CLOSURE ||
(code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
- code < WebSocketStatus.RESERVED_1015) ||
- (code >= WebSocketStatus.RESERVED_1015 &&
- code < 3000));
+ code < WebSocketStatus.RESERVED_1015) ||
+ (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
}
}
« no previous file with comments | « sdk/lib/io/websocket.dart ('k') | tests/standalone/io/web_socket_compression_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698