| Index: third_party/mojo/src/mojo/public/dart/src/event_stream.dart
|
| diff --git a/third_party/mojo/src/mojo/public/dart/src/event_stream.dart b/third_party/mojo/src/mojo/public/dart/src/event_stream.dart
|
| index 8f022acc93ffbb4b1b18f6d580b64cf15fae348e..645334e835ac2460fc7389acd84814f99dc0252a 100644
|
| --- a/third_party/mojo/src/mojo/public/dart/src/event_stream.dart
|
| +++ b/third_party/mojo/src/mojo/public/dart/src/event_stream.dart
|
| @@ -26,8 +26,8 @@ class MojoEventStream extends Stream<List<int>> {
|
| // Whether listen has been called.
|
| bool _isListening;
|
|
|
| - MojoEventStream(MojoHandle handle, [MojoHandleSignals signals =
|
| - MojoHandleSignals.PEER_CLOSED_READABLE])
|
| + MojoEventStream(MojoHandle handle,
|
| + [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE])
|
| : _handle = handle,
|
| _signals = signals,
|
| _isListening = false {
|
| @@ -37,18 +37,14 @@ class MojoEventStream extends Stream<List<int>> {
|
| }
|
| }
|
|
|
| - void close() {
|
| + Future close() {
|
| if (_handle != null) {
|
| if (_isListening) {
|
| - MojoHandleWatcher.close(_handle);
|
| + return _handleWatcherClose();
|
| } else {
|
| - _handle.close();
|
| + _localClose();
|
| + return new Future.value(null);
|
| }
|
| - _handle = null;
|
| - }
|
| - if (_receivePort != null) {
|
| - _receivePort.close();
|
| - _receivePort = null;
|
| }
|
| }
|
|
|
| @@ -75,11 +71,8 @@ class MojoEventStream extends Stream<List<int>> {
|
| }
|
|
|
| _isListening = true;
|
| - return _controller.stream.listen(
|
| - onData,
|
| - onError: onError,
|
| - onDone: onDone,
|
| - cancelOnError: cancelOnError);
|
| + return _controller.stream.listen(onData,
|
| + onError: onError, onDone: onDone, cancelOnError: cancelOnError);
|
| }
|
|
|
| void enableSignals(MojoHandleSignals signals) {
|
| @@ -97,6 +90,26 @@ class MojoEventStream extends Stream<List<int>> {
|
| void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE);
|
| void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE);
|
|
|
| + Future _handleWatcherClose() {
|
| + assert(_handle != null);
|
| + return MojoHandleWatcher.close(_handle, wait: true).then((_) {
|
| + if (_receivePort != null) {
|
| + _receivePort.close();
|
| + _receivePort = null;
|
| + }
|
| + });
|
| + }
|
| +
|
| + void _localClose() {
|
| + assert(_handle != null);
|
| + _handle.close();
|
| + _handle = null;
|
| + if (_receivePort != null) {
|
| + _receivePort.close();
|
| + _receivePort = null;
|
| + }
|
| + }
|
| +
|
| void _onSubscriptionStateChange() {
|
| if (!_controller.hasListener) {
|
| close();
|
| @@ -123,25 +136,28 @@ class MojoEventStream extends Stream<List<int>> {
|
| String toString() => "$_handle";
|
| }
|
|
|
| -abstract class Listener {
|
| - StreamSubscription<List<int>> listen({Function onClosed});
|
| -}
|
| +typedef void ErrorHandler();
|
|
|
| class MojoEventStreamListener {
|
| MojoMessagePipeEndpoint _endpoint;
|
| MojoEventStream _eventStream;
|
| bool _isOpen = false;
|
| bool _isInHandler = false;
|
| + StreamSubscription subscription;
|
| + ErrorHandler onError;
|
|
|
| - MojoEventStreamListener(MojoMessagePipeEndpoint endpoint)
|
| + MojoEventStreamListener.fromEndpoint(MojoMessagePipeEndpoint endpoint)
|
| : _endpoint = endpoint,
|
| _eventStream = new MojoEventStream(endpoint.handle),
|
| - _isOpen = false;
|
| + _isOpen = false {
|
| + listen();
|
| + }
|
|
|
| MojoEventStreamListener.fromHandle(MojoHandle handle) {
|
| _endpoint = new MojoMessagePipeEndpoint(handle);
|
| _eventStream = new MojoEventStream(handle);
|
| _isOpen = false;
|
| + listen();
|
| }
|
|
|
| MojoEventStreamListener.unbound()
|
| @@ -163,19 +179,18 @@ class MojoEventStreamListener {
|
| _isOpen = false;
|
| }
|
|
|
| - StreamSubscription<List<int>> listen({Function onClosed}) {
|
| + StreamSubscription<List<int>> listen() {
|
| + assert(isBound && (subscription == null));
|
| _isOpen = true;
|
| - return _eventStream.listen((List<int> event) {
|
| - var signalsWatched = new MojoHandleSignals(event[0]);
|
| - var signalsReceived = new MojoHandleSignals(event[1]);
|
| - if (signalsReceived.isPeerClosed) {
|
| - if (onClosed != null) onClosed();
|
| - close();
|
| - // The peer being closed obviates any other signal we might
|
| - // have received since we won't be able to read or write the handle.
|
| - // Thus, we just return before invoking other handlers.
|
| + subscription = _eventStream.listen((List<int> event) {
|
| + if (!_isOpen) {
|
| + // The actual close of the underlying stream happens asynchronously
|
| + // after the call to close. However, we start to ignore incoming events
|
| + // immediately.
|
| return;
|
| }
|
| + var signalsWatched = new MojoHandleSignals(event[0]);
|
| + var signalsReceived = new MojoHandleSignals(event[1]);
|
| _isInHandler = true;
|
| if (signalsReceived.isReadable) {
|
| assert(_eventStream.readyRead);
|
| @@ -189,16 +204,27 @@ class MojoEventStreamListener {
|
| _eventStream.enableSignals(signalsWatched);
|
| }
|
| _isInHandler = false;
|
| + if (signalsReceived.isPeerClosed) {
|
| + if (onError != null) {
|
| + onError();
|
| + }
|
| + close();
|
| + }
|
| }, onDone: close);
|
| + return subscription;
|
| }
|
|
|
| - void close() {
|
| + Future close() {
|
| + var result;
|
| _isOpen = false;
|
| _endpoint = null;
|
| + subscription = null;
|
| if (_eventStream != null) {
|
| - _eventStream.close();
|
| - _eventStream = null;
|
| + result = _eventStream.close().then((_) {
|
| + _eventStream = null;
|
| + });
|
| }
|
| + return result != null ? result : new Future.value(null);
|
| }
|
|
|
| void handleRead() {}
|
| @@ -208,4 +234,7 @@ class MojoEventStreamListener {
|
| bool get isOpen => _isOpen;
|
| bool get isInHandler => _isInHandler;
|
| bool get isBound => _endpoint != null;
|
| +
|
| + String toString() => "MojoEventStreamListener("
|
| + "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)";
|
| }
|
|
|