| Index: mojo/public/dart/src/event_stream.dart
|
| diff --git a/mojo/public/dart/src/event_stream.dart b/mojo/public/dart/src/event_stream.dart
|
| index bbdb351ebbd51e6b8d2b19ed83b2e7c4c28d11c4..645334e835ac2460fc7389acd84814f99dc0252a 100644
|
| --- a/mojo/public/dart/src/event_stream.dart
|
| +++ b/mojo/public/dart/src/event_stream.dart
|
| @@ -37,18 +37,14 @@ class MojoEventStream extends Stream<List<int>> {
|
| }
|
| }
|
|
|
| - void close() {
|
| + Future close() {
|
| if (_handle != null) {
|
| if (_isListening) {
|
| - MojoHandleWatcher.close(_handle);
|
| + return _handleWatcherClose();
|
| } else {
|
| - _handle.close();
|
| + _localClose();
|
| + return new Future.value(null);
|
| }
|
| - _handle = null;
|
| - }
|
| - if (_receivePort != null) {
|
| - _receivePort.close();
|
| - _receivePort = null;
|
| }
|
| }
|
|
|
| @@ -94,6 +90,26 @@ class MojoEventStream extends Stream<List<int>> {
|
| void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE);
|
| void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE);
|
|
|
| + Future _handleWatcherClose() {
|
| + assert(_handle != null);
|
| + return MojoHandleWatcher.close(_handle, wait: true).then((_) {
|
| + if (_receivePort != null) {
|
| + _receivePort.close();
|
| + _receivePort = null;
|
| + }
|
| + });
|
| + }
|
| +
|
| + void _localClose() {
|
| + assert(_handle != null);
|
| + _handle.close();
|
| + _handle = null;
|
| + if (_receivePort != null) {
|
| + _receivePort.close();
|
| + _receivePort = null;
|
| + }
|
| + }
|
| +
|
| void _onSubscriptionStateChange() {
|
| if (!_controller.hasListener) {
|
| close();
|
| @@ -167,6 +183,12 @@ class MojoEventStreamListener {
|
| assert(isBound && (subscription == null));
|
| _isOpen = true;
|
| subscription = _eventStream.listen((List<int> event) {
|
| + if (!_isOpen) {
|
| + // The actual close of the underlying stream happens asynchronously
|
| + // after the call to close. However, we start to ignore incoming events
|
| + // immediately.
|
| + return;
|
| + }
|
| var signalsWatched = new MojoHandleSignals(event[0]);
|
| var signalsReceived = new MojoHandleSignals(event[1]);
|
| _isInHandler = true;
|
| @@ -183,25 +205,26 @@ class MojoEventStreamListener {
|
| }
|
| _isInHandler = false;
|
| if (signalsReceived.isPeerClosed) {
|
| - if (onError != null) onError();
|
| + if (onError != null) {
|
| + onError();
|
| + }
|
| close();
|
| - // The peer being closed obviates any other signal we might
|
| - // have received since we won't be able to read or write the handle.
|
| - // Thus, we just return before invoking other handlers.
|
| - return;
|
| }
|
| }, onDone: close);
|
| return subscription;
|
| }
|
|
|
| - void close() {
|
| + Future close() {
|
| + var result;
|
| _isOpen = false;
|
| _endpoint = null;
|
| subscription = null;
|
| if (_eventStream != null) {
|
| - _eventStream.close();
|
| - _eventStream = null;
|
| + result = _eventStream.close().then((_) {
|
| + _eventStream = null;
|
| + });
|
| }
|
| + return result != null ? result : new Future.value(null);
|
| }
|
|
|
| void handleRead() {}
|
|
|