Index: mojo/public/dart/src/event_stream.dart |
diff --git a/mojo/public/dart/src/event_stream.dart b/mojo/public/dart/src/event_stream.dart |
index bbdb351ebbd51e6b8d2b19ed83b2e7c4c28d11c4..645334e835ac2460fc7389acd84814f99dc0252a 100644 |
--- a/mojo/public/dart/src/event_stream.dart |
+++ b/mojo/public/dart/src/event_stream.dart |
@@ -37,18 +37,14 @@ class MojoEventStream extends Stream<List<int>> { |
} |
} |
- void close() { |
+ Future close() { |
if (_handle != null) { |
if (_isListening) { |
- MojoHandleWatcher.close(_handle); |
+ return _handleWatcherClose(); |
} else { |
- _handle.close(); |
+ _localClose(); |
+ return new Future.value(null); |
} |
- _handle = null; |
- } |
- if (_receivePort != null) { |
- _receivePort.close(); |
- _receivePort = null; |
} |
} |
@@ -94,6 +90,26 @@ class MojoEventStream extends Stream<List<int>> { |
void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); |
void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); |
+ Future _handleWatcherClose() { |
+ assert(_handle != null); |
+ return MojoHandleWatcher.close(_handle, wait: true).then((_) { |
+ if (_receivePort != null) { |
+ _receivePort.close(); |
+ _receivePort = null; |
+ } |
+ }); |
+ } |
+ |
+ void _localClose() { |
+ assert(_handle != null); |
+ _handle.close(); |
+ _handle = null; |
+ if (_receivePort != null) { |
+ _receivePort.close(); |
+ _receivePort = null; |
+ } |
+ } |
+ |
void _onSubscriptionStateChange() { |
if (!_controller.hasListener) { |
close(); |
@@ -167,6 +183,12 @@ class MojoEventStreamListener { |
assert(isBound && (subscription == null)); |
_isOpen = true; |
subscription = _eventStream.listen((List<int> event) { |
+ if (!_isOpen) { |
+ // The actual close of the underlying stream happens asynchronously |
+ // after the call to close. However, we start to ignore incoming events |
+ // immediately. |
+ return; |
+ } |
var signalsWatched = new MojoHandleSignals(event[0]); |
var signalsReceived = new MojoHandleSignals(event[1]); |
_isInHandler = true; |
@@ -183,25 +205,26 @@ class MojoEventStreamListener { |
} |
_isInHandler = false; |
if (signalsReceived.isPeerClosed) { |
- if (onError != null) onError(); |
+ if (onError != null) { |
+ onError(); |
+ } |
close(); |
- // The peer being closed obviates any other signal we might |
- // have received since we won't be able to read or write the handle. |
- // Thus, we just return before invoking other handlers. |
- return; |
} |
}, onDone: close); |
return subscription; |
} |
- void close() { |
+ Future close() { |
+ var result; |
_isOpen = false; |
_endpoint = null; |
subscription = null; |
if (_eventStream != null) { |
- _eventStream.close(); |
- _eventStream = null; |
+ result = _eventStream.close().then((_) { |
+ _eventStream = null; |
+ }); |
} |
+ return result != null ? result : new Future.value(null); |
} |
void handleRead() {} |