| Index: sdk/lib/io/websocket_impl.dart
|
| diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart
|
| index df3101737a7fd16da574a67a6bd64fff6e3888d6..cc68544e536233a5e43d63a699e01d4d58ce5b4e 100644
|
| --- a/sdk/lib/io/websocket_impl.dart
|
| +++ b/sdk/lib/io/websocket_impl.dart
|
| @@ -5,10 +5,6 @@
|
| part of dart.io;
|
|
|
| const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
|
| -const String _clientNoContextTakeover = "client_no_context_takeover";
|
| -const String _serverNoContextTakeover = "server_no_context_takeover";
|
| -const String _clientMaxWindowBits = "client_max_window_bits";
|
| -const String _serverMaxWindowBits = "server_max_window_bits";
|
|
|
| // Matches _WebSocketOpcode.
|
| class _WebSocketMessageType {
|
| @@ -17,6 +13,7 @@ class _WebSocketMessageType {
|
| static const int BINARY = 2;
|
| }
|
|
|
| +
|
| class _WebSocketOpcode {
|
| static const int CONTINUATION = 0;
|
| static const int TEXT = 1;
|
| @@ -41,8 +38,8 @@ class _WebSocketOpcode {
|
| * which is supplied through the [:handleData:]. As the protocol is processed,
|
| * it'll output frame data as either a List<int> or String.
|
| *
|
| - * Important information about usage: Be sure you use cancelOnError, so the
|
| - * socket will be closed when the processor encounter an error. Not using it
|
| + * Important infomation about usage: Be sure you use cancelOnError, so the
|
| + * socket will be closed when the processer encounter an error. Not using it
|
| * will lead to undefined behaviour.
|
| */
|
| // TODO(ajohnsen): make this transformer reusable?
|
| @@ -54,15 +51,9 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
| static const int PAYLOAD = 4;
|
| static const int CLOSED = 5;
|
| static const int FAILURE = 6;
|
| - static const int FIN = 0x80;
|
| - static const int RSV1 = 0x40;
|
| - static const int RSV2 = 0x20;
|
| - static const int RSV3 = 0x10;
|
| - static const int OPCODE = 0xF;
|
|
|
| int _state = START;
|
| bool _fin = false;
|
| - bool _compressed = false;
|
| int _opcode = -1;
|
| int _len = -1;
|
| bool _masked = false;
|
| @@ -80,17 +71,18 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
| final List _maskingBytes = new List(4);
|
| final BytesBuilder _payload = new BytesBuilder(copy: false);
|
|
|
| - _WebSocketPerMessageDeflate _deflate;
|
| - _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
|
| + _WebSocketProtocolTransformer([this._serverSide = false]);
|
|
|
| Stream bind(Stream stream) {
|
| - return new Stream.eventTransformed(stream, (EventSink eventSink) {
|
| - if (_eventSink != null) {
|
| - throw new StateError("WebSocket transformer already used.");
|
| - }
|
| - _eventSink = eventSink;
|
| - return this;
|
| - });
|
| + return new Stream.eventTransformed(
|
| + stream,
|
| + (EventSink eventSink) {
|
| + if (_eventSink != null) {
|
| + throw new StateError("WebSocket transformer already used.");
|
| + }
|
| + _eventSink = eventSink;
|
| + return this;
|
| + });
|
| }
|
|
|
| void addError(Object error, [StackTrace stackTrace]) =>
|
| @@ -102,8 +94,9 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
| * Process data received from the underlying communication channel.
|
| */
|
| void add(Uint8List buffer) {
|
| + int count = buffer.length;
|
| int index = 0;
|
| - int lastIndex = buffer.length;
|
| + int lastIndex = count;
|
| if (_state == CLOSED) {
|
| throw new WebSocketException("Data on closed connection");
|
| }
|
| @@ -114,20 +107,12 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
| int byte = buffer[index];
|
| if (_state <= LEN_REST) {
|
| if (_state == START) {
|
| - _fin = (byte & FIN) != 0;
|
| -
|
| - if((byte & (RSV2 | RSV3)) != 0) {
|
| - // The RSV2, RSV3 bits must both be zero.
|
| + _fin = (byte & 0x80) != 0;
|
| + if ((byte & 0x70) != 0) {
|
| + // The RSV1, RSV2 bits RSV3 must be all zero.
|
| throw new WebSocketException("Protocol error");
|
| }
|
| -
|
| - if ((byte & RSV1) != 0) {
|
| - _compressed = true;
|
| - } else {
|
| - _compressed = false;
|
| - }
|
| - _opcode = (byte & OPCODE);
|
| -
|
| + _opcode = (byte & 0xF);
|
| if (_opcode <= _WebSocketOpcode.BINARY) {
|
| if (_opcode == _WebSocketOpcode.CONTINUATION) {
|
| if (_currentMessageType == _WebSocketMessageType.NONE) {
|
| @@ -135,14 +120,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
| }
|
| } else {
|
| assert(_opcode == _WebSocketOpcode.TEXT ||
|
| - _opcode == _WebSocketOpcode.BINARY);
|
| + _opcode == _WebSocketOpcode.BINARY);
|
| if (_currentMessageType != _WebSocketMessageType.NONE) {
|
| throw new WebSocketException("Protocol error");
|
| }
|
| _currentMessageType = _opcode;
|
| }
|
| } else if (_opcode >= _WebSocketOpcode.CLOSE &&
|
| - _opcode <= _WebSocketOpcode.PONG) {
|
| + _opcode <= _WebSocketOpcode.PONG) {
|
| // Control frames cannot be fragmented.
|
| if (!_fin) throw new WebSocketException("Protocol error");
|
| } else {
|
| @@ -191,14 +176,15 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
| _unmask(index, payloadLength, buffer);
|
| }
|
| // Control frame and data frame share _payloads.
|
| - _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
|
| + _payload.add(
|
| + new Uint8List.view(buffer.buffer, index, payloadLength));
|
| index += payloadLength;
|
| if (_isControlFrame()) {
|
| if (_remainingPayloadBytes == 0) _controlFrameEnd();
|
| } else {
|
| if (_currentMessageType != _WebSocketMessageType.TEXT &&
|
| _currentMessageType != _WebSocketMessageType.BINARY) {
|
| - throw new WebSocketException("Protocol error");
|
| + throw new WebSocketException("Protocol error");
|
| }
|
| if (_remainingPayloadBytes == 0) _messageFrameEnd();
|
| }
|
| @@ -233,8 +219,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
| mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
|
| }
|
| Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
|
| - Int32x4List blockBuffer =
|
| - new Int32x4List.view(buffer.buffer, index, blockCount);
|
| + Int32x4List blockBuffer = new Int32x4List.view(
|
| + buffer.buffer, index, blockCount);
|
| for (int i = 0; i < blockBuffer.length; i++) {
|
| blockBuffer[i] ^= blockMask;
|
| }
|
| @@ -298,17 +284,12 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
|
|
| void _messageFrameEnd() {
|
| if (_fin) {
|
| - var bytes = _payload.takeBytes();
|
| - if (_deflate != null && _compressed) {
|
| - bytes = _deflate.processIncomingMessage(bytes);
|
| - }
|
| -
|
| switch (_currentMessageType) {
|
| case _WebSocketMessageType.TEXT:
|
| - _eventSink.add(UTF8.decode(bytes));
|
| + _eventSink.add(UTF8.decode(_payload.takeBytes()));
|
| break;
|
| case _WebSocketMessageType.BINARY:
|
| - _eventSink.add(bytes);
|
| + _eventSink.add(_payload.takeBytes());
|
| break;
|
| }
|
| _currentMessageType = _WebSocketMessageType.NONE;
|
| @@ -350,8 +331,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
|
|
| bool _isControlFrame() {
|
| return _opcode == _WebSocketOpcode.CLOSE ||
|
| - _opcode == _WebSocketOpcode.PING ||
|
| - _opcode == _WebSocketOpcode.PONG;
|
| + _opcode == _WebSocketOpcode.PING ||
|
| + _opcode == _WebSocketOpcode.PONG;
|
| }
|
|
|
| void _prepareForNextFrame() {
|
| @@ -366,29 +347,31 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
| }
|
| }
|
|
|
| +
|
| class _WebSocketPing {
|
| final List<int> payload;
|
| _WebSocketPing([this.payload = null]);
|
| }
|
|
|
| +
|
| class _WebSocketPong {
|
| final List<int> payload;
|
| _WebSocketPong([this.payload = null]);
|
| }
|
|
|
| +
|
| class _WebSocketTransformerImpl implements WebSocketTransformer {
|
| final StreamController<WebSocket> _controller =
|
| new StreamController<WebSocket>(sync: true);
|
| final Function _protocolSelector;
|
| - final CompressionOptions _compression;
|
|
|
| - _WebSocketTransformerImpl(this._protocolSelector, this._compression);
|
| + _WebSocketTransformerImpl(this._protocolSelector);
|
|
|
| Stream<WebSocket> bind(Stream<HttpRequest> stream) {
|
| stream.listen((request) {
|
| - _upgrade(request, _protocolSelector, _compression)
|
| - .then((WebSocket webSocket) => _controller.add(webSocket))
|
| - .catchError(_controller.addError);
|
| + _upgrade(request, _protocolSelector)
|
| + .then((WebSocket webSocket) => _controller.add(webSocket))
|
| + .catchError(_controller.addError);
|
| }, onDone: () {
|
| _controller.close();
|
| });
|
| @@ -396,14 +379,13 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
|
| return _controller.stream;
|
| }
|
|
|
| - static Future<WebSocket> _upgrade(
|
| - HttpRequest request, _protocolSelector, CompressionOptions compression) {
|
| + static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) {
|
| var response = request.response;
|
| if (!_isUpgradeRequest(request)) {
|
| // Send error response.
|
| response
|
| - ..statusCode = HttpStatus.BAD_REQUEST
|
| - ..close();
|
| + ..statusCode = HttpStatus.BAD_REQUEST
|
| + ..close();
|
| return new Future.error(
|
| new WebSocketException("Invalid WebSocket upgrade request"));
|
| }
|
| @@ -411,9 +393,9 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
|
| Future upgrade(String protocol) {
|
| // Send the upgrade response.
|
| response
|
| - ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
|
| - ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
|
| - ..headers.add(HttpHeaders.UPGRADE, "websocket");
|
| + ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
|
| + ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
|
| + ..headers.add(HttpHeaders.UPGRADE, "websocket");
|
| String key = request.headers.value("Sec-WebSocket-Key");
|
| _SHA1 sha1 = new _SHA1();
|
| sha1.add("$key$_webSocketGUID".codeUnits);
|
| @@ -422,13 +404,10 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
|
| if (protocol != null) {
|
| response.headers.add("Sec-WebSocket-Protocol", protocol);
|
| }
|
| -
|
| - var deflate = _negotiateCompression(request, response, compression);
|
| -
|
| response.headers.contentLength = 0;
|
| - return response.detachSocket().then((socket) =>
|
| - new _WebSocketImpl._fromSocket(
|
| - socket, protocol, compression, true, deflate));
|
| + return response.detachSocket()
|
| + .then((socket) => new _WebSocketImpl._fromSocket(
|
| + socket, protocol, true));
|
| }
|
|
|
| var protocols = request.headers['Sec-WebSocket-Protocol'];
|
| @@ -437,53 +416,26 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
|
| // consisting of multiple protocols. To unify all of them, first join
|
| // the lists with ', ' and then tokenize.
|
| protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
|
| - return new Future(() => _protocolSelector(protocols)).then((protocol) {
|
| - if (protocols.indexOf(protocol) < 0) {
|
| - throw new WebSocketException(
|
| - "Selected protocol is not in the list of available protocols");
|
| - }
|
| - return protocol;
|
| - }).catchError((error) {
|
| - response
|
| - ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
|
| - ..close();
|
| - throw error;
|
| - }).then(upgrade);
|
| + return new Future(() => _protocolSelector(protocols))
|
| + .then((protocol) {
|
| + if (protocols.indexOf(protocol) < 0) {
|
| + throw new WebSocketException(
|
| + "Selected protocol is not in the list of available protocols");
|
| + }
|
| + return protocol;
|
| + })
|
| + .catchError((error) {
|
| + response
|
| + ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
|
| + ..close();
|
| + throw error;
|
| + })
|
| + .then(upgrade);
|
| } else {
|
| return upgrade(null);
|
| }
|
| }
|
|
|
| - static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request,
|
| - HttpResponse response, CompressionOptions compression) {
|
| - var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
|
| -
|
| - if (extensionHeader == null) {
|
| - extensionHeader = "";
|
| - }
|
| -
|
| - var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
|
| - if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) {
|
| - var info = compression._createHeader(hv);
|
| -
|
| - response.headers.add("Sec-WebSocket-Extensions", info[0]);
|
| - var serverNoContextTakeover =
|
| - hv.parameters.containsKey(_serverNoContextTakeover);
|
| - var clientNoContextTakeover =
|
| - hv.parameters.containsKey(_clientNoContextTakeover);
|
| - var deflate = new _WebSocketPerMessageDeflate(
|
| - serverNoContextTakeover: serverNoContextTakeover,
|
| - clientNoContextTakeover: clientNoContextTakeover,
|
| - serverMaxWindowBits: info[1],
|
| - clientMaxWindowBits: info[1],
|
| - serverSide: true);
|
| -
|
| - return deflate;
|
| - }
|
| -
|
| - return null;
|
| - }
|
| -
|
| static bool _isUpgradeRequest(HttpRequest request) {
|
| if (request.method != "GET") {
|
| return false;
|
| @@ -512,127 +464,24 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
|
| }
|
| }
|
|
|
| -class _WebSocketPerMessageDeflate {
|
| - bool serverNoContextTakeover;
|
| - bool clientNoContextTakeover;
|
| - int clientMaxWindowBits;
|
| - int serverMaxWindowBits;
|
| - bool serverSide;
|
| -
|
| - _Filter decoder;
|
| - _Filter encoder;
|
| -
|
| - _WebSocketPerMessageDeflate(
|
| - {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
|
| - this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
|
| - this.serverNoContextTakeover: false,
|
| - this.clientNoContextTakeover: false,
|
| - this.serverSide: false});
|
| -
|
| - void _ensureDecoder() {
|
| - if (decoder == null) {
|
| - decoder = _Filter._newZLibInflateFilter(
|
| - serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true);
|
| - }
|
| - }
|
| -
|
| - void _ensureEncoder() {
|
| - if (encoder == null) {
|
| - encoder = _Filter._newZLibDeflateFilter(
|
| - false,
|
| - ZLibOption.DEFAULT_LEVEL,
|
| - serverSide ? serverMaxWindowBits : clientMaxWindowBits,
|
| - ZLibOption.DEFAULT_MEM_LEVEL,
|
| - ZLibOption.STRATEGY_DEFAULT,
|
| - null,
|
| - true);
|
| - }
|
| - }
|
| -
|
| - List<int> processIncomingMessage(List<int> msg) {
|
| - _ensureDecoder();
|
| -
|
| - var data = [];
|
| - data.addAll(msg);
|
| - data.addAll(const [0x00, 0x00, 0xff, 0xff]);
|
| -
|
| - decoder.process(data, 0, data.length);
|
| - var reuse =
|
| - !(serverSide ? clientNoContextTakeover : serverNoContextTakeover);
|
| - var result = [];
|
| - var out;
|
| -
|
| - while ((out = decoder.processed(flush: reuse)) != null) {
|
| - result.addAll(out);
|
| - }
|
| -
|
| - decoder.processed(flush: reuse);
|
| -
|
| - if (!reuse) {
|
| - decoder.end();
|
| - decoder = null;
|
| - }
|
| - return result;
|
| - }
|
| -
|
| - List<int> processOutgoingMessage(List<int> msg) {
|
| - _ensureEncoder();
|
| - var reuse =
|
| - !(serverSide ? serverNoContextTakeover : clientNoContextTakeover);
|
| - var result = [];
|
| - Uint8List buffer;
|
| - var out;
|
| -
|
| - if (msg is! Uint8List) {
|
| - for (var i = 0; i < msg.length; i++) {
|
| - if (msg[i] < 0 || 255 < msg[i]) {
|
| - throw new ArgumentError("List element is not a byte value "
|
| - "(value ${msg[i]} at index $i)");
|
| - }
|
| - }
|
| - buffer = new Uint8List.fromList(msg);
|
| - } else {
|
| - buffer = msg;
|
| - }
|
| -
|
| - encoder.process(buffer, 0, buffer.length);
|
| -
|
| - while ((out = encoder.processed(flush: reuse)) != null) {
|
| - result.addAll(out);
|
| - }
|
| -
|
| - if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) {
|
| - encoder.end();
|
| - encoder = null;
|
| - }
|
| -
|
| - if (result.length > 4) {
|
| - result = result.sublist(0, result.length - 4);
|
| - }
|
| -
|
| - return result;
|
| - }
|
| -}
|
|
|
| // TODO(ajohnsen): Make this transformer reusable.
|
| class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
|
| final _WebSocketImpl webSocket;
|
| EventSink _eventSink;
|
|
|
| - _WebSocketPerMessageDeflate _deflateHelper;
|
| -
|
| - _WebSocketOutgoingTransformer(this.webSocket) {
|
| - _deflateHelper = webSocket._deflate;
|
| - }
|
| + _WebSocketOutgoingTransformer(this.webSocket);
|
|
|
| Stream bind(Stream stream) {
|
| - return new Stream.eventTransformed(stream, (EventSink eventSink) {
|
| - if (_eventSink != null) {
|
| - throw new StateError("WebSocket transformer already used");
|
| - }
|
| - _eventSink = eventSink;
|
| - return this;
|
| - });
|
| + return new Stream.eventTransformed(
|
| + stream,
|
| + (EventSink eventSink) {
|
| + if (_eventSink != null) {
|
| + throw new StateError("WebSocket transformer already used");
|
| + }
|
| + _eventSink = eventSink;
|
| + return this;
|
| + });
|
| }
|
|
|
| void add(message) {
|
| @@ -651,16 +500,12 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
|
| opcode = _WebSocketOpcode.TEXT;
|
| data = UTF8.encode(message);
|
| } else {
|
| - if (message is! List<int>) {
|
| + if (message is !List<int>) {
|
| throw new ArgumentError(message);
|
| }
|
| opcode = _WebSocketOpcode.BINARY;
|
| data = message;
|
| }
|
| -
|
| - if (_deflateHelper != null) {
|
| - data = _deflateHelper.processOutgoingMessage(data);
|
| - }
|
| } else {
|
| opcode = _WebSocketOpcode.TEXT;
|
| }
|
| @@ -686,19 +531,11 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
|
| _eventSink.close();
|
| }
|
|
|
| - void addFrame(int opcode, List<int> data) => createFrame(
|
| - opcode,
|
| - data,
|
| - webSocket._serverSide,
|
| - _deflateHelper != null &&
|
| - (opcode == _WebSocketOpcode.TEXT ||
|
| - opcode == _WebSocketOpcode.BINARY)).forEach((e) {
|
| - _eventSink.add(e);
|
| - });
|
| + void addFrame(int opcode, List<int> data) =>
|
| + createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
|
|
|
| - static Iterable createFrame(
|
| - int opcode, List<int> data, bool serverSide, bool compressed) {
|
| - bool mask = !serverSide; // Masking not implemented for server.
|
| + static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
|
| + bool mask = !serverSide; // Masking not implemented for server.
|
| int dataLength = data == null ? 0 : data.length;
|
| // Determine the header size.
|
| int headerSize = (mask) ? 6 : 2;
|
| @@ -709,15 +546,11 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
|
| }
|
| Uint8List header = new Uint8List(headerSize);
|
| int index = 0;
|
| -
|
| // Set FIN and opcode.
|
| - var hoc = _WebSocketProtocolTransformer.FIN
|
| - | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
|
| - | (opcode & _WebSocketProtocolTransformer.OPCODE);
|
| -
|
| - header[index++] = hoc;
|
| + header[index++] = 0x80 | opcode;
|
| // Determine size and position of length field.
|
| int lengthBytes = 1;
|
| + int firstLengthByte = 1;
|
| if (dataLength > 65535) {
|
| header[index++] = 127;
|
| lengthBytes = 8;
|
| @@ -747,7 +580,8 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
|
| list = new Uint8List(data.length);
|
| for (int i = 0; i < data.length; i++) {
|
| if (data[i] < 0 || 255 < data[i]) {
|
| - throw new ArgumentError("List element is not a byte value "
|
| + throw new ArgumentError(
|
| + "List element is not a byte value "
|
| "(value ${data[i]} at index $i)");
|
| }
|
| list[i] = data[i];
|
| @@ -763,8 +597,8 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
|
| mask = (mask << 8) | maskBytes[i];
|
| }
|
| Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
|
| - Int32x4List blockBuffer =
|
| - new Int32x4List.view(list.buffer, 0, blockCount);
|
| + Int32x4List blockBuffer = new Int32x4List.view(
|
| + list.buffer, 0, blockCount);
|
| for (int i = 0; i < blockBuffer.length; i++) {
|
| blockBuffer[i] ^= blockMask;
|
| }
|
| @@ -785,6 +619,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
|
| }
|
| }
|
|
|
| +
|
| class _WebSocketConsumer implements StreamConsumer {
|
| final _WebSocketImpl webSocket;
|
| final Socket socket;
|
| @@ -829,28 +664,28 @@ class _WebSocketConsumer implements StreamConsumer {
|
|
|
| _ensureController() {
|
| if (_controller != null) return;
|
| - _controller = new StreamController(
|
| - sync: true,
|
| - onPause: _onPause,
|
| - onResume: _onResume,
|
| - onCancel: _onListen);
|
| - var stream = _controller.stream
|
| - .transform(new _WebSocketOutgoingTransformer(webSocket));
|
| - socket.addStream(stream).then((_) {
|
| - _done();
|
| - _closeCompleter.complete(webSocket);
|
| - }, onError: (error, StackTrace stackTrace) {
|
| - _closed = true;
|
| - _cancel();
|
| - if (error is ArgumentError) {
|
| - if (!_done(error, stackTrace)) {
|
| - _closeCompleter.completeError(error, stackTrace);
|
| - }
|
| - } else {
|
| - _done();
|
| - _closeCompleter.complete(webSocket);
|
| - }
|
| - });
|
| + _controller = new StreamController(sync: true,
|
| + onPause: _onPause,
|
| + onResume: _onResume,
|
| + onCancel: _onListen);
|
| + var stream = _controller.stream.transform(
|
| + new _WebSocketOutgoingTransformer(webSocket));
|
| + socket.addStream(stream)
|
| + .then((_) {
|
| + _done();
|
| + _closeCompleter.complete(webSocket);
|
| + }, onError: (error, StackTrace stackTrace) {
|
| + _closed = true;
|
| + _cancel();
|
| + if (error is ArgumentError) {
|
| + if (!_done(error, stackTrace)) {
|
| + _closeCompleter.completeError(error, stackTrace);
|
| + }
|
| + } else {
|
| + _done();
|
| + _closeCompleter.complete(webSocket);
|
| + }
|
| + });
|
| }
|
|
|
| bool _done([error, StackTrace stackTrace]) {
|
| @@ -871,9 +706,13 @@ class _WebSocketConsumer implements StreamConsumer {
|
| }
|
| _ensureController();
|
| _completer = new Completer();
|
| - _subscription = stream.listen((data) {
|
| - _controller.add(data);
|
| - }, onDone: _done, onError: _done, cancelOnError: true);
|
| + _subscription = stream.listen(
|
| + (data) {
|
| + _controller.add(data);
|
| + },
|
| + onDone: _done,
|
| + onError: _done,
|
| + cancelOnError: true);
|
| if (_issuedPause) {
|
| _subscription.pause();
|
| _issuedPause = false;
|
| @@ -903,11 +742,10 @@ class _WebSocketConsumer implements StreamConsumer {
|
| }
|
| }
|
|
|
| +
|
| class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
|
| // Use default Map so we keep order.
|
| static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>();
|
| - static const int DEFAULT_WINDOW_BITS = 15;
|
| - static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
|
|
|
| final String protocol;
|
|
|
| @@ -928,13 +766,11 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
|
| int _outCloseCode;
|
| String _outCloseReason;
|
| Timer _closeTimer;
|
| - _WebSocketPerMessageDeflate _deflate;
|
|
|
| static final HttpClient _httpClient = new HttpClient();
|
|
|
| static Future<WebSocket> connect(
|
| - String url, Iterable<String> protocols, Map<String, dynamic> headers,
|
| - {CompressionOptions compression: CompressionOptions.DEFAULT}) {
|
| + String url, Iterable<String> protocols, Map<String, dynamic> headers) {
|
| Uri uri = Uri.parse(url);
|
| if (uri.scheme != "ws" && uri.scheme != "wss") {
|
| throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
|
| @@ -948,182 +784,144 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
|
| }
|
| String nonce = _CryptoUtils.bytesToBase64(nonceData);
|
|
|
| - uri = new Uri(
|
| - scheme: uri.scheme == "wss" ? "https" : "http",
|
| - userInfo: uri.userInfo,
|
| - host: uri.host,
|
| - port: uri.port,
|
| - path: uri.path,
|
| - query: uri.query,
|
| - fragment: uri.fragment);
|
| - return _httpClient.openUrl("GET", uri).then((request) {
|
| - if (uri.userInfo != null && !uri.userInfo.isEmpty) {
|
| - // If the URL contains user information use that for basic
|
| - // authorization.
|
| - String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
|
| - request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
|
| - }
|
| - if (headers != null) {
|
| - headers.forEach((field, value) => request.headers.add(field, value));
|
| - }
|
| - // Setup the initial handshake.
|
| - request.headers
|
| - ..set(HttpHeaders.CONNECTION, "Upgrade")
|
| - ..set(HttpHeaders.UPGRADE, "websocket")
|
| - ..set("Sec-WebSocket-Key", nonce)
|
| - ..set("Cache-Control", "no-cache")
|
| - ..set("Sec-WebSocket-Version", "13");
|
| - if (protocols != null) {
|
| - request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
|
| - }
|
| -
|
| - if (compression.enabled) {
|
| + uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http",
|
| + userInfo: uri.userInfo,
|
| + host: uri.host,
|
| + port: uri.port,
|
| + path: uri.path,
|
| + query: uri.query,
|
| + fragment: uri.fragment);
|
| + return _httpClient.openUrl("GET", uri)
|
| + .then((request) {
|
| + if (uri.userInfo != null && !uri.userInfo.isEmpty) {
|
| + // If the URL contains user information use that for basic
|
| + // authorization.
|
| + String auth =
|
| + _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
|
| + request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
|
| + }
|
| + if (headers != null) {
|
| + headers.forEach((field, value) => request.headers.add(field, value));
|
| + }
|
| + // Setup the initial handshake.
|
| request.headers
|
| - .add("Sec-WebSocket-Extensions", compression._createHeader());
|
| - }
|
| -
|
| - return request.close();
|
| - }).then((response) {
|
| - void error(String message) {
|
| - // Flush data.
|
| - response.detachSocket().then((socket) {
|
| - socket.destroy();
|
| - });
|
| - throw new WebSocketException(message);
|
| - }
|
| - if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
|
| - response.headers[HttpHeaders.CONNECTION] == null ||
|
| - !response.headers[HttpHeaders.CONNECTION]
|
| - .any((value) => value.toLowerCase() == "upgrade") ||
|
| - response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
|
| - "websocket") {
|
| - error("Connection to '$uri' was not upgraded to websocket");
|
| - }
|
| - String accept = response.headers.value("Sec-WebSocket-Accept");
|
| - if (accept == null) {
|
| - error("Response did not contain a 'Sec-WebSocket-Accept' header");
|
| - }
|
| - _SHA1 sha1 = new _SHA1();
|
| - sha1.add("$nonce$_webSocketGUID".codeUnits);
|
| - List<int> expectedAccept = sha1.close();
|
| - List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
|
| - if (expectedAccept.length != receivedAccept.length) {
|
| - error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
|
| - }
|
| - for (int i = 0; i < expectedAccept.length; i++) {
|
| - if (expectedAccept[i] != receivedAccept[i]) {
|
| - error("Bad response 'Sec-WebSocket-Accept' header");
|
| + ..set(HttpHeaders.CONNECTION, "Upgrade")
|
| + ..set(HttpHeaders.UPGRADE, "websocket")
|
| + ..set("Sec-WebSocket-Key", nonce)
|
| + ..set("Cache-Control", "no-cache")
|
| + ..set("Sec-WebSocket-Version", "13");
|
| + if (protocols != null) {
|
| + request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
|
| }
|
| - }
|
| - var protocol = response.headers.value('Sec-WebSocket-Protocol');
|
| -
|
| - _WebSocketPerMessageDeflate deflate =
|
| - negotiateClientCompression(response, compression);
|
| -
|
| - return response.detachSocket().then((socket) =>
|
| - new _WebSocketImpl._fromSocket(
|
| - socket, protocol, compression, false, deflate));
|
| - });
|
| - }
|
| -
|
| - static _WebSocketPerMessageDeflate negotiateClientCompression(
|
| - HttpClientResponse response, CompressionOptions compression) {
|
| - String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
|
| -
|
| - if (extensionHeader == null) {
|
| - extensionHeader = "";
|
| - }
|
| -
|
| - var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
|
| -
|
| - if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) {
|
| - var serverNoContextTakeover =
|
| - hv.parameters.containsKey(_serverNoContextTakeover);
|
| - var clientNoContextTakeover =
|
| - hv.parameters.containsKey(_clientNoContextTakeover);
|
| -
|
| - int getWindowBits(String type) {
|
| - var o = hv.parameters[type];
|
| - if (o == null) {
|
| - return DEFAULT_WINDOW_BITS;
|
| + return request.close();
|
| + })
|
| + .then((response) {
|
| + void error(String message) {
|
| + // Flush data.
|
| + response.detachSocket().then((socket) {
|
| + socket.destroy();
|
| + });
|
| + throw new WebSocketException(message);
|
| }
|
| -
|
| - o = int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS);
|
| - return o;
|
| - }
|
| -
|
| - return new _WebSocketPerMessageDeflate(
|
| - clientMaxWindowBits: getWindowBits(_clientMaxWindowBits),
|
| - serverMaxWindowBits: getWindowBits(_serverMaxWindowBits),
|
| - clientNoContextTakeover: clientNoContextTakeover,
|
| - serverNoContextTakeover: serverNoContextTakeover);
|
| - }
|
| -
|
| - return null;
|
| + if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
|
| + response.headers[HttpHeaders.CONNECTION] == null ||
|
| + !response.headers[HttpHeaders.CONNECTION].any(
|
| + (value) => value.toLowerCase() == "upgrade") ||
|
| + response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
|
| + "websocket") {
|
| + error("Connection to '$uri' was not upgraded to websocket");
|
| + }
|
| + String accept = response.headers.value("Sec-WebSocket-Accept");
|
| + if (accept == null) {
|
| + error("Response did not contain a 'Sec-WebSocket-Accept' header");
|
| + }
|
| + _SHA1 sha1 = new _SHA1();
|
| + sha1.add("$nonce$_webSocketGUID".codeUnits);
|
| + List<int> expectedAccept = sha1.close();
|
| + List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
|
| + if (expectedAccept.length != receivedAccept.length) {
|
| + error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
|
| + }
|
| + for (int i = 0; i < expectedAccept.length; i++) {
|
| + if (expectedAccept[i] != receivedAccept[i]) {
|
| + error("Bad response 'Sec-WebSocket-Accept' header");
|
| + }
|
| + }
|
| + var protocol = response.headers.value('Sec-WebSocket-Protocol');
|
| + return response.detachSocket()
|
| + .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
|
| + });
|
| }
|
|
|
| - _WebSocketImpl._fromSocket(
|
| - this._socket, this.protocol, CompressionOptions compression,
|
| - [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) {
|
| + _WebSocketImpl._fromSocket(this._socket, this.protocol,
|
| + [this._serverSide = false]) {
|
| _consumer = new _WebSocketConsumer(this, _socket);
|
| _sink = new _StreamSinkImpl(_consumer);
|
| _readyState = WebSocket.OPEN;
|
| - _deflate = deflate;
|
| -
|
| - var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
|
| - _subscription = _socket.transform(transformer).listen((data) {
|
| - if (data is _WebSocketPing) {
|
| - if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
|
| - } else if (data is _WebSocketPong) {
|
| - // Simply set pingInterval, as it'll cancel any timers.
|
| - pingInterval = _pingInterval;
|
| - } else {
|
| - _controller.add(data);
|
| - }
|
| - }, onError: (error, stackTrace) {
|
| - if (_closeTimer != null) _closeTimer.cancel();
|
| - if (error is FormatException) {
|
| - _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
|
| - } else {
|
| - _close(WebSocketStatus.PROTOCOL_ERROR);
|
| - }
|
| - // An error happened, set the close code set above.
|
| - _closeCode = _outCloseCode;
|
| - _closeReason = _outCloseReason;
|
| - _controller.close();
|
| - }, onDone: () {
|
| - if (_closeTimer != null) _closeTimer.cancel();
|
| - if (_readyState == WebSocket.OPEN) {
|
| - _readyState = WebSocket.CLOSING;
|
| - if (!_isReservedStatusCode(transformer.closeCode)) {
|
| - _close(transformer.closeCode, transformer.closeReason);
|
| - } else {
|
| - _close();
|
| - }
|
| - _readyState = WebSocket.CLOSED;
|
| - }
|
| - // Protocol close, use close code from transformer.
|
| - _closeCode = transformer.closeCode;
|
| - _closeReason = transformer.closeReason;
|
| - _controller.close();
|
| - }, cancelOnError: true);
|
| +
|
| + var transformer = new _WebSocketProtocolTransformer(_serverSide);
|
| + _subscription = _socket.transform(transformer).listen(
|
| + (data) {
|
| + if (data is _WebSocketPing) {
|
| + if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
|
| + } else if (data is _WebSocketPong) {
|
| + // Simply set pingInterval, as it'll cancel any timers.
|
| + pingInterval = _pingInterval;
|
| + } else {
|
| + _controller.add(data);
|
| + }
|
| + },
|
| + onError: (error) {
|
| + if (_closeTimer != null) _closeTimer.cancel();
|
| + if (error is FormatException) {
|
| + _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
|
| + } else {
|
| + _close(WebSocketStatus.PROTOCOL_ERROR);
|
| + }
|
| + // An error happened, set the close code set above.
|
| + _closeCode = _outCloseCode;
|
| + _closeReason = _outCloseReason;
|
| + _controller.close();
|
| + },
|
| + onDone: () {
|
| + if (_closeTimer != null) _closeTimer.cancel();
|
| + if (_readyState == WebSocket.OPEN) {
|
| + _readyState = WebSocket.CLOSING;
|
| + if (!_isReservedStatusCode(transformer.closeCode)) {
|
| + _close(transformer.closeCode, transformer.closeReason);
|
| + } else {
|
| + _close();
|
| + }
|
| + _readyState = WebSocket.CLOSED;
|
| + }
|
| + // Protocol close, use close code from transformer.
|
| + _closeCode = transformer.closeCode;
|
| + _closeReason = transformer.closeReason;
|
| + _controller.close();
|
| + },
|
| + cancelOnError: true);
|
| _subscription.pause();
|
| - _controller = new StreamController(
|
| - sync: true, onListen: _subscription.resume, onCancel: () {
|
| - _subscription.cancel();
|
| - _subscription = null;
|
| - }, onPause: _subscription.pause, onResume: _subscription.resume);
|
| + _controller = new StreamController(sync: true,
|
| + onListen: _subscription.resume,
|
| + onCancel: () {
|
| + _subscription.cancel();
|
| + _subscription = null;
|
| + },
|
| + onPause: _subscription.pause,
|
| + onResume: _subscription.resume);
|
|
|
| _webSockets[_serviceId] = this;
|
| - try {
|
| - _socket._owner = this;
|
| - } catch (_) {}
|
| + try { _socket._owner = this; } catch (_) {}
|
| }
|
|
|
| StreamSubscription listen(void onData(message),
|
| - {Function onError, void onDone(), bool cancelOnError}) {
|
| + {Function onError,
|
| + void onDone(),
|
| + bool cancelOnError}) {
|
| return _controller.stream.listen(onData,
|
| - onError: onError, onDone: onDone, cancelOnError: cancelOnError);
|
| + onError: onError,
|
| + onDone: onDone,
|
| + cancelOnError: cancelOnError);
|
| }
|
|
|
| Duration get pingInterval => _pingInterval;
|
| @@ -1229,12 +1027,13 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
|
|
|
| static bool _isReservedStatusCode(int code) {
|
| return code != null &&
|
| - (code < WebSocketStatus.NORMAL_CLOSURE ||
|
| + (code < WebSocketStatus.NORMAL_CLOSURE ||
|
| code == WebSocketStatus.RESERVED_1004 ||
|
| code == WebSocketStatus.NO_STATUS_RECEIVED ||
|
| code == WebSocketStatus.ABNORMAL_CLOSURE ||
|
| (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
|
| - code < WebSocketStatus.RESERVED_1015) ||
|
| - (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
|
| + code < WebSocketStatus.RESERVED_1015) ||
|
| + (code >= WebSocketStatus.RESERVED_1015 &&
|
| + code < 3000));
|
| }
|
| }
|
|
|