Index: sdk/lib/io/websocket_impl.dart |
diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart |
index b13709caa51b84465eabc57361cb077171f787e0..e2bc326271ba9fb338eb71ca11b222b1a2266484 100644 |
--- a/sdk/lib/io/websocket_impl.dart |
+++ b/sdk/lib/io/websocket_impl.dart |
@@ -572,9 +572,7 @@ class _WebSocketConsumer implements StreamConsumer { |
StreamController _controller; |
StreamSubscription _subscription; |
bool _issuedPause = false; |
- // Only report error if the last message was a user-provided message and not a |
- // ping or pong message. |
- bool _reportError = false; |
+ bool _closed = false; |
Completer _closeCompleter = new Completer(); |
Completer _completer; |
@@ -602,6 +600,14 @@ class _WebSocketConsumer implements StreamConsumer { |
} |
} |
+ void _cancel() { |
+ if (_subscription != null) { |
+ var subscription = _subscription; |
+ _subscription = null; |
+ subscription.cancel(); |
+ } |
+ } |
+ |
_ensureController() { |
if (_controller != null) return; |
_controller = new StreamController(sync: true, |
@@ -612,19 +618,20 @@ class _WebSocketConsumer implements StreamConsumer { |
new _WebSocketOutgoingTransformer(webSocket)); |
socket.addStream(stream) |
.then((_) { |
- _done(); |
- _closeCompleter.complete(webSocket); |
- }, |
- onError: (error) { |
- if (_reportError) { |
- if (!_done(error)) { |
- _closeCompleter.completeError(error); |
- } |
- } else { |
- _done(); |
- _closeCompleter.complete(webSocket); |
- } |
- }); |
+ _done(); |
+ _closeCompleter.complete(webSocket); |
+ }, onError: (error) { |
+ _closed = true; |
+ _cancel(); |
+ if (error is ArgumentError) { |
+ if (!_done(error)) { |
+ _closeCompleter.completeError(error); |
+ } |
+ } else { |
+ _done(); |
+ _closeCompleter.complete(webSocket); |
+ } |
+ }); |
} |
bool _done([error]) { |
@@ -639,11 +646,14 @@ class _WebSocketConsumer implements StreamConsumer { |
} |
Future addStream(var stream) { |
+ if (_closed) { |
+ stream.listen(null).cancel(); |
+ return new Future.value(webSocket); |
+ } |
_ensureController(); |
_completer = new Completer(); |
_subscription = stream.listen( |
(data) { |
- _reportError = true; |
_controller.add(data); |
}, |
onDone: () { |
@@ -663,17 +673,23 @@ class _WebSocketConsumer implements StreamConsumer { |
Future close() { |
_ensureController(); |
Future closeSocket() { |
- return socket.close().then((_) => webSocket); |
+ return socket.close().catchError((_) {}).then((_) => webSocket); |
} |
_controller.close(); |
return _closeCompleter.future.then((_) => closeSocket()); |
} |
void add(data) { |
+ if (_closed) return; |
_ensureController(); |
- _reportError = false; |
_controller.add(data); |
} |
+ |
+ void closeSocket() { |
+ _closed = true; |
+ _cancel(); |
+ close(); |
+ } |
} |
@@ -786,21 +802,20 @@ class _WebSocketImpl extends Stream implements WebSocket { |
} |
}, |
onError: (error) { |
- if (error is ArgumentError) { |
- close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
+ if (error is FormatException) { |
+ _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
} else { |
- close(WebSocketStatus.PROTOCOL_ERROR); |
+ _close(WebSocketStatus.PROTOCOL_ERROR); |
} |
- _controller.addError(error); |
_controller.close(); |
}, |
onDone: () { |
if (_readyState == WebSocket.OPEN) { |
_readyState = WebSocket.CLOSING; |
if (!_isReservedStatusCode(transformer.closeCode)) { |
- close(transformer.closeCode); |
+ _close(transformer.closeCode); |
} else { |
- close(); |
+ _close(); |
} |
_readyState = WebSocket.CLOSED; |
} |
@@ -840,7 +855,7 @@ class _WebSocketImpl extends Stream implements WebSocket { |
_consumer.add(new _WebSocketPing()); |
_pingTimer = new Timer(_pingInterval, () { |
// No pong received. |
- close(WebSocketStatus.GOING_AWAY); |
+ _close(WebSocketStatus.GOING_AWAY); |
}); |
}); |
} |
@@ -858,16 +873,24 @@ class _WebSocketImpl extends Stream implements WebSocket { |
Future get done => _sink.done; |
Future close([int code, String reason]) { |
- if (!_writeClosed) { |
- if (_isReservedStatusCode(code)) { |
- throw new WebSocketException("Reserved status code $code"); |
- } |
+ if (_isReservedStatusCode(code)) { |
+ throw new WebSocketException("Reserved status code $code"); |
+ } |
+ if (_outCloseCode == null) { |
+ _outCloseCode = code; |
+ _outCloseReason = reason; |
+ } |
+ return _sink.close(); |
+ } |
+ |
+ void _close([int code, String reason]) { |
+ if (_writeClosed) return; |
+ if (_outCloseCode == null) { |
_outCloseCode = code; |
_outCloseReason = reason; |
- _writeClosed = true; |
} |
- if (!(_sink as _StreamSinkImpl)._isBound) _sink.close(); |
- return _sink.done; |
+ _writeClosed = true; |
+ _consumer.closeSocket(); |
} |
static bool _isReservedStatusCode(int code) { |