| Index: third_party/mojo/src/mojo/public/dart/src/event_stream.dart
|
| diff --git a/third_party/mojo/src/mojo/public/dart/src/event_stream.dart b/third_party/mojo/src/mojo/public/dart/src/event_stream.dart
|
| index 610285a358d3eed64f1e5f6c4d236a1b0bb203c6..8f022acc93ffbb4b1b18f6d580b64cf15fae348e 100644
|
| --- a/third_party/mojo/src/mojo/public/dart/src/event_stream.dart
|
| +++ b/third_party/mojo/src/mojo/public/dart/src/event_stream.dart
|
| @@ -4,7 +4,7 @@
|
|
|
| part of core;
|
|
|
| -class MojoEventStream extends Stream<int> {
|
| +class MojoEventStream extends Stream<List<int>> {
|
| // The underlying Mojo handle.
|
| MojoHandle _handle;
|
|
|
| @@ -26,11 +26,11 @@ class MojoEventStream extends Stream<int> {
|
| // Whether listen has been called.
|
| bool _isListening;
|
|
|
| - MojoEventStream(MojoHandle handle,
|
| - [MojoHandleSignals signals = MojoHandleSignals.READABLE]) :
|
| - _handle = handle,
|
| - _signals = signals,
|
| - _isListening = false {
|
| + MojoEventStream(MojoHandle handle, [MojoHandleSignals signals =
|
| + MojoHandleSignals.PEER_CLOSED_READABLE])
|
| + : _handle = handle,
|
| + _signals = signals,
|
| + _isListening = false {
|
| MojoResult result = MojoHandle.register(this);
|
| if (!result.isOk) {
|
| throw "Failed to register the MojoHandle: $result.";
|
| @@ -39,7 +39,11 @@ class MojoEventStream extends Stream<int> {
|
|
|
| void close() {
|
| if (_handle != null) {
|
| - MojoHandleWatcher.close(_handle);
|
| + if (_isListening) {
|
| + MojoHandleWatcher.close(_handle);
|
| + } else {
|
| + _handle.close();
|
| + }
|
| _handle = null;
|
| }
|
| if (_receivePort != null) {
|
| @@ -48,20 +52,20 @@ class MojoEventStream extends Stream<int> {
|
| }
|
| }
|
|
|
| - StreamSubscription<List<int>> listen(
|
| - void onData(List event),
|
| + StreamSubscription<List<int>> listen(void onData(List event),
|
| {Function onError, void onDone(), bool cancelOnError}) {
|
| if (_isListening) {
|
| throw "Listen has already been called: $_handle.";
|
| }
|
| _receivePort = new ReceivePort();
|
| _sendPort = _receivePort.sendPort;
|
| - _controller = new StreamController(sync: true,
|
| + _controller = new StreamController(
|
| + sync: true,
|
| onListen: _onSubscriptionStateChange,
|
| onCancel: _onSubscriptionStateChange,
|
| onPause: _onPauseStateChange,
|
| onResume: _onPauseStateChange);
|
| - _controller.addStream(_receivePort);
|
| + _controller.addStream(_receivePort).whenComplete(_controller.close);
|
|
|
| if (_signals != MojoHandleSignals.NONE) {
|
| var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value);
|
| @@ -88,7 +92,8 @@ class MojoEventStream extends Stream<int> {
|
| }
|
| }
|
|
|
| - void enableReadEvents() => enableSignals(MojoHandleSignals.READABLE);
|
| + void enableReadEvents() =>
|
| + enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE);
|
| void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE);
|
| void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE);
|
|
|
| @@ -119,19 +124,19 @@ class MojoEventStream extends Stream<int> {
|
| }
|
|
|
| abstract class Listener {
|
| - StreamSubscription<List<int>> listen();
|
| + StreamSubscription<List<int>> listen({Function onClosed});
|
| }
|
|
|
| -class MojoEventStreamListener implements Listener {
|
| +class MojoEventStreamListener {
|
| MojoMessagePipeEndpoint _endpoint;
|
| MojoEventStream _eventStream;
|
| bool _isOpen = false;
|
| bool _isInHandler = false;
|
|
|
| - MojoEventStreamListener(MojoMessagePipeEndpoint endpoint) :
|
| - _endpoint = endpoint,
|
| - _eventStream = new MojoEventStream(endpoint.handle),
|
| - _isOpen = false;
|
| + MojoEventStreamListener(MojoMessagePipeEndpoint endpoint)
|
| + : _endpoint = endpoint,
|
| + _eventStream = new MojoEventStream(endpoint.handle),
|
| + _isOpen = false;
|
|
|
| MojoEventStreamListener.fromHandle(MojoHandle handle) {
|
| _endpoint = new MojoMessagePipeEndpoint(handle);
|
| @@ -139,10 +144,10 @@ class MojoEventStreamListener implements Listener {
|
| _isOpen = false;
|
| }
|
|
|
| - MojoEventStreamListener.unbound() :
|
| - _endpoint = null,
|
| - _eventStream = null,
|
| - _isOpen = false;
|
| + MojoEventStreamListener.unbound()
|
| + : _endpoint = null,
|
| + _eventStream = null,
|
| + _isOpen = false;
|
|
|
| void bind(MojoMessagePipeEndpoint endpoint) {
|
| assert(!isBound);
|
| @@ -158,13 +163,14 @@ class MojoEventStreamListener implements Listener {
|
| _isOpen = false;
|
| }
|
|
|
| - StreamSubscription<List<int>> listen() {
|
| + StreamSubscription<List<int>> listen({Function onClosed}) {
|
| _isOpen = true;
|
| return _eventStream.listen((List<int> event) {
|
| var signalsWatched = new MojoHandleSignals(event[0]);
|
| var signalsReceived = new MojoHandleSignals(event[1]);
|
| if (signalsReceived.isPeerClosed) {
|
| - handlePeerClosed();
|
| + if (onClosed != null) onClosed();
|
| + close();
|
| // The peer being closed obviates any other signal we might
|
| // have received since we won't be able to read or write the handle.
|
| // Thus, we just return before invoking other handlers.
|
| @@ -180,30 +186,23 @@ class MojoEventStreamListener implements Listener {
|
| handleWrite();
|
| }
|
| if (_isOpen) {
|
| - _eventStream.enableSignals(enableSignals(
|
| - signalsWatched, signalsReceived));
|
| + _eventStream.enableSignals(signalsWatched);
|
| }
|
| _isInHandler = false;
|
| - });
|
| + }, onDone: close);
|
| }
|
|
|
| void close() {
|
| - if (_isOpen) {
|
| + _isOpen = false;
|
| + _endpoint = null;
|
| + if (_eventStream != null) {
|
| _eventStream.close();
|
| - _isOpen = false;
|
| _eventStream = null;
|
| - _endpoint = null;
|
| }
|
| }
|
|
|
| void handleRead() {}
|
| void handleWrite() {}
|
| - void handlePeerClosed() {
|
| - close();
|
| - }
|
| -
|
| - MojoHandleSignals enableSignals(MojoHandleSignals watched,
|
| - MojoHandleSignals received) => watched;
|
|
|
| MojoMessagePipeEndpoint get endpoint => _endpoint;
|
| bool get isOpen => _isOpen;
|
|
|