Index: third_party/mojo/src/mojo/public/dart/src/event_stream.dart |
diff --git a/third_party/mojo/src/mojo/public/dart/src/event_stream.dart b/third_party/mojo/src/mojo/public/dart/src/event_stream.dart |
index 8f022acc93ffbb4b1b18f6d580b64cf15fae348e..645334e835ac2460fc7389acd84814f99dc0252a 100644 |
--- a/third_party/mojo/src/mojo/public/dart/src/event_stream.dart |
+++ b/third_party/mojo/src/mojo/public/dart/src/event_stream.dart |
@@ -26,8 +26,8 @@ class MojoEventStream extends Stream<List<int>> { |
// Whether listen has been called. |
bool _isListening; |
- MojoEventStream(MojoHandle handle, [MojoHandleSignals signals = |
- MojoHandleSignals.PEER_CLOSED_READABLE]) |
+ MojoEventStream(MojoHandle handle, |
+ [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) |
: _handle = handle, |
_signals = signals, |
_isListening = false { |
@@ -37,18 +37,14 @@ class MojoEventStream extends Stream<List<int>> { |
} |
} |
- void close() { |
+ Future close() { |
if (_handle != null) { |
if (_isListening) { |
- MojoHandleWatcher.close(_handle); |
+ return _handleWatcherClose(); |
} else { |
- _handle.close(); |
+ _localClose(); |
+ return new Future.value(null); |
} |
- _handle = null; |
- } |
- if (_receivePort != null) { |
- _receivePort.close(); |
- _receivePort = null; |
} |
} |
@@ -75,11 +71,8 @@ class MojoEventStream extends Stream<List<int>> { |
} |
_isListening = true; |
- return _controller.stream.listen( |
- onData, |
- onError: onError, |
- onDone: onDone, |
- cancelOnError: cancelOnError); |
+ return _controller.stream.listen(onData, |
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
} |
void enableSignals(MojoHandleSignals signals) { |
@@ -97,6 +90,26 @@ class MojoEventStream extends Stream<List<int>> { |
void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); |
void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); |
+ Future _handleWatcherClose() { |
+ assert(_handle != null); |
+ return MojoHandleWatcher.close(_handle, wait: true).then((_) { |
+ if (_receivePort != null) { |
+ _receivePort.close(); |
+ _receivePort = null; |
+ } |
+ }); |
+ } |
+ |
+ void _localClose() { |
+ assert(_handle != null); |
+ _handle.close(); |
+ _handle = null; |
+ if (_receivePort != null) { |
+ _receivePort.close(); |
+ _receivePort = null; |
+ } |
+ } |
+ |
void _onSubscriptionStateChange() { |
if (!_controller.hasListener) { |
close(); |
@@ -123,25 +136,28 @@ class MojoEventStream extends Stream<List<int>> { |
String toString() => "$_handle"; |
} |
-abstract class Listener { |
- StreamSubscription<List<int>> listen({Function onClosed}); |
-} |
+typedef void ErrorHandler(); |
class MojoEventStreamListener { |
MojoMessagePipeEndpoint _endpoint; |
MojoEventStream _eventStream; |
bool _isOpen = false; |
bool _isInHandler = false; |
+ StreamSubscription subscription; |
+ ErrorHandler onError; |
- MojoEventStreamListener(MojoMessagePipeEndpoint endpoint) |
+ MojoEventStreamListener.fromEndpoint(MojoMessagePipeEndpoint endpoint) |
: _endpoint = endpoint, |
_eventStream = new MojoEventStream(endpoint.handle), |
- _isOpen = false; |
+ _isOpen = false { |
+ listen(); |
+ } |
MojoEventStreamListener.fromHandle(MojoHandle handle) { |
_endpoint = new MojoMessagePipeEndpoint(handle); |
_eventStream = new MojoEventStream(handle); |
_isOpen = false; |
+ listen(); |
} |
MojoEventStreamListener.unbound() |
@@ -163,19 +179,18 @@ class MojoEventStreamListener { |
_isOpen = false; |
} |
- StreamSubscription<List<int>> listen({Function onClosed}) { |
+ StreamSubscription<List<int>> listen() { |
+ assert(isBound && (subscription == null)); |
_isOpen = true; |
- return _eventStream.listen((List<int> event) { |
- var signalsWatched = new MojoHandleSignals(event[0]); |
- var signalsReceived = new MojoHandleSignals(event[1]); |
- if (signalsReceived.isPeerClosed) { |
- if (onClosed != null) onClosed(); |
- close(); |
- // The peer being closed obviates any other signal we might |
- // have received since we won't be able to read or write the handle. |
- // Thus, we just return before invoking other handlers. |
+ subscription = _eventStream.listen((List<int> event) { |
+ if (!_isOpen) { |
+ // The actual close of the underlying stream happens asynchronously |
+ // after the call to close. However, we start to ignore incoming events |
+ // immediately. |
return; |
} |
+ var signalsWatched = new MojoHandleSignals(event[0]); |
+ var signalsReceived = new MojoHandleSignals(event[1]); |
_isInHandler = true; |
if (signalsReceived.isReadable) { |
assert(_eventStream.readyRead); |
@@ -189,16 +204,27 @@ class MojoEventStreamListener { |
_eventStream.enableSignals(signalsWatched); |
} |
_isInHandler = false; |
+ if (signalsReceived.isPeerClosed) { |
+ if (onError != null) { |
+ onError(); |
+ } |
+ close(); |
+ } |
}, onDone: close); |
+ return subscription; |
} |
- void close() { |
+ Future close() { |
+ var result; |
_isOpen = false; |
_endpoint = null; |
+ subscription = null; |
if (_eventStream != null) { |
- _eventStream.close(); |
- _eventStream = null; |
+ result = _eventStream.close().then((_) { |
+ _eventStream = null; |
+ }); |
} |
+ return result != null ? result : new Future.value(null); |
} |
void handleRead() {} |
@@ -208,4 +234,7 @@ class MojoEventStreamListener { |
bool get isOpen => _isOpen; |
bool get isInHandler => _isInHandler; |
bool get isBound => _endpoint != null; |
+ |
+ String toString() => "MojoEventStreamListener(" |
+ "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; |
} |