| Index: sdk/lib/io/websocket_impl.dart
|
| diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart
|
| index b13709caa51b84465eabc57361cb077171f787e0..e2bc326271ba9fb338eb71ca11b222b1a2266484 100644
|
| --- a/sdk/lib/io/websocket_impl.dart
|
| +++ b/sdk/lib/io/websocket_impl.dart
|
| @@ -572,9 +572,7 @@ class _WebSocketConsumer implements StreamConsumer {
|
| StreamController _controller;
|
| StreamSubscription _subscription;
|
| bool _issuedPause = false;
|
| - // Only report error if the last message was a user-provided message and not a
|
| - // ping or pong message.
|
| - bool _reportError = false;
|
| + bool _closed = false;
|
| Completer _closeCompleter = new Completer();
|
| Completer _completer;
|
|
|
| @@ -602,6 +600,14 @@ class _WebSocketConsumer implements StreamConsumer {
|
| }
|
| }
|
|
|
| + void _cancel() {
|
| + if (_subscription != null) {
|
| + var subscription = _subscription;
|
| + _subscription = null;
|
| + subscription.cancel();
|
| + }
|
| + }
|
| +
|
| _ensureController() {
|
| if (_controller != null) return;
|
| _controller = new StreamController(sync: true,
|
| @@ -612,19 +618,20 @@ class _WebSocketConsumer implements StreamConsumer {
|
| new _WebSocketOutgoingTransformer(webSocket));
|
| socket.addStream(stream)
|
| .then((_) {
|
| - _done();
|
| - _closeCompleter.complete(webSocket);
|
| - },
|
| - onError: (error) {
|
| - if (_reportError) {
|
| - if (!_done(error)) {
|
| - _closeCompleter.completeError(error);
|
| - }
|
| - } else {
|
| - _done();
|
| - _closeCompleter.complete(webSocket);
|
| - }
|
| - });
|
| + _done();
|
| + _closeCompleter.complete(webSocket);
|
| + }, onError: (error) {
|
| + _closed = true;
|
| + _cancel();
|
| + if (error is ArgumentError) {
|
| + if (!_done(error)) {
|
| + _closeCompleter.completeError(error);
|
| + }
|
| + } else {
|
| + _done();
|
| + _closeCompleter.complete(webSocket);
|
| + }
|
| + });
|
| }
|
|
|
| bool _done([error]) {
|
| @@ -639,11 +646,14 @@ class _WebSocketConsumer implements StreamConsumer {
|
| }
|
|
|
| Future addStream(var stream) {
|
| + if (_closed) {
|
| + stream.listen(null).cancel();
|
| + return new Future.value(webSocket);
|
| + }
|
| _ensureController();
|
| _completer = new Completer();
|
| _subscription = stream.listen(
|
| (data) {
|
| - _reportError = true;
|
| _controller.add(data);
|
| },
|
| onDone: () {
|
| @@ -663,17 +673,23 @@ class _WebSocketConsumer implements StreamConsumer {
|
| Future close() {
|
| _ensureController();
|
| Future closeSocket() {
|
| - return socket.close().then((_) => webSocket);
|
| + return socket.close().catchError((_) {}).then((_) => webSocket);
|
| }
|
| _controller.close();
|
| return _closeCompleter.future.then((_) => closeSocket());
|
| }
|
|
|
| void add(data) {
|
| + if (_closed) return;
|
| _ensureController();
|
| - _reportError = false;
|
| _controller.add(data);
|
| }
|
| +
|
| + void closeSocket() {
|
| + _closed = true;
|
| + _cancel();
|
| + close();
|
| + }
|
| }
|
|
|
|
|
| @@ -786,21 +802,20 @@ class _WebSocketImpl extends Stream implements WebSocket {
|
| }
|
| },
|
| onError: (error) {
|
| - if (error is ArgumentError) {
|
| - close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
|
| + if (error is FormatException) {
|
| + _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
|
| } else {
|
| - close(WebSocketStatus.PROTOCOL_ERROR);
|
| + _close(WebSocketStatus.PROTOCOL_ERROR);
|
| }
|
| - _controller.addError(error);
|
| _controller.close();
|
| },
|
| onDone: () {
|
| if (_readyState == WebSocket.OPEN) {
|
| _readyState = WebSocket.CLOSING;
|
| if (!_isReservedStatusCode(transformer.closeCode)) {
|
| - close(transformer.closeCode);
|
| + _close(transformer.closeCode);
|
| } else {
|
| - close();
|
| + _close();
|
| }
|
| _readyState = WebSocket.CLOSED;
|
| }
|
| @@ -840,7 +855,7 @@ class _WebSocketImpl extends Stream implements WebSocket {
|
| _consumer.add(new _WebSocketPing());
|
| _pingTimer = new Timer(_pingInterval, () {
|
| // No pong received.
|
| - close(WebSocketStatus.GOING_AWAY);
|
| + _close(WebSocketStatus.GOING_AWAY);
|
| });
|
| });
|
| }
|
| @@ -858,16 +873,24 @@ class _WebSocketImpl extends Stream implements WebSocket {
|
| Future get done => _sink.done;
|
|
|
| Future close([int code, String reason]) {
|
| - if (!_writeClosed) {
|
| - if (_isReservedStatusCode(code)) {
|
| - throw new WebSocketException("Reserved status code $code");
|
| - }
|
| + if (_isReservedStatusCode(code)) {
|
| + throw new WebSocketException("Reserved status code $code");
|
| + }
|
| + if (_outCloseCode == null) {
|
| + _outCloseCode = code;
|
| + _outCloseReason = reason;
|
| + }
|
| + return _sink.close();
|
| + }
|
| +
|
| + void _close([int code, String reason]) {
|
| + if (_writeClosed) return;
|
| + if (_outCloseCode == null) {
|
| _outCloseCode = code;
|
| _outCloseReason = reason;
|
| - _writeClosed = true;
|
| }
|
| - if (!(_sink as _StreamSinkImpl)._isBound) _sink.close();
|
| - return _sink.done;
|
| + _writeClosed = true;
|
| + _consumer.closeSocket();
|
| }
|
|
|
| static bool _isReservedStatusCode(int code) {
|
|
|