Index: third_party/mojo/src/mojo/public/dart/src/event_stream.dart |
diff --git a/third_party/mojo/src/mojo/public/dart/src/event_stream.dart b/third_party/mojo/src/mojo/public/dart/src/event_stream.dart |
index 610285a358d3eed64f1e5f6c4d236a1b0bb203c6..8f022acc93ffbb4b1b18f6d580b64cf15fae348e 100644 |
--- a/third_party/mojo/src/mojo/public/dart/src/event_stream.dart |
+++ b/third_party/mojo/src/mojo/public/dart/src/event_stream.dart |
@@ -4,7 +4,7 @@ |
part of core; |
-class MojoEventStream extends Stream<int> { |
+class MojoEventStream extends Stream<List<int>> { |
// The underlying Mojo handle. |
MojoHandle _handle; |
@@ -26,11 +26,11 @@ class MojoEventStream extends Stream<int> { |
// Whether listen has been called. |
bool _isListening; |
- MojoEventStream(MojoHandle handle, |
- [MojoHandleSignals signals = MojoHandleSignals.READABLE]) : |
- _handle = handle, |
- _signals = signals, |
- _isListening = false { |
+ MojoEventStream(MojoHandle handle, [MojoHandleSignals signals = |
+ MojoHandleSignals.PEER_CLOSED_READABLE]) |
+ : _handle = handle, |
+ _signals = signals, |
+ _isListening = false { |
MojoResult result = MojoHandle.register(this); |
if (!result.isOk) { |
throw "Failed to register the MojoHandle: $result."; |
@@ -39,7 +39,11 @@ class MojoEventStream extends Stream<int> { |
void close() { |
if (_handle != null) { |
- MojoHandleWatcher.close(_handle); |
+ if (_isListening) { |
+ MojoHandleWatcher.close(_handle); |
+ } else { |
+ _handle.close(); |
+ } |
_handle = null; |
} |
if (_receivePort != null) { |
@@ -48,20 +52,20 @@ class MojoEventStream extends Stream<int> { |
} |
} |
- StreamSubscription<List<int>> listen( |
- void onData(List event), |
+ StreamSubscription<List<int>> listen(void onData(List event), |
{Function onError, void onDone(), bool cancelOnError}) { |
if (_isListening) { |
throw "Listen has already been called: $_handle."; |
} |
_receivePort = new ReceivePort(); |
_sendPort = _receivePort.sendPort; |
- _controller = new StreamController(sync: true, |
+ _controller = new StreamController( |
+ sync: true, |
onListen: _onSubscriptionStateChange, |
onCancel: _onSubscriptionStateChange, |
onPause: _onPauseStateChange, |
onResume: _onPauseStateChange); |
- _controller.addStream(_receivePort); |
+ _controller.addStream(_receivePort).whenComplete(_controller.close); |
if (_signals != MojoHandleSignals.NONE) { |
var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value); |
@@ -88,7 +92,8 @@ class MojoEventStream extends Stream<int> { |
} |
} |
- void enableReadEvents() => enableSignals(MojoHandleSignals.READABLE); |
+ void enableReadEvents() => |
+ enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE); |
void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); |
void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); |
@@ -119,19 +124,19 @@ class MojoEventStream extends Stream<int> { |
} |
abstract class Listener { |
- StreamSubscription<List<int>> listen(); |
+ StreamSubscription<List<int>> listen({Function onClosed}); |
} |
-class MojoEventStreamListener implements Listener { |
+class MojoEventStreamListener { |
MojoMessagePipeEndpoint _endpoint; |
MojoEventStream _eventStream; |
bool _isOpen = false; |
bool _isInHandler = false; |
- MojoEventStreamListener(MojoMessagePipeEndpoint endpoint) : |
- _endpoint = endpoint, |
- _eventStream = new MojoEventStream(endpoint.handle), |
- _isOpen = false; |
+ MojoEventStreamListener(MojoMessagePipeEndpoint endpoint) |
+ : _endpoint = endpoint, |
+ _eventStream = new MojoEventStream(endpoint.handle), |
+ _isOpen = false; |
MojoEventStreamListener.fromHandle(MojoHandle handle) { |
_endpoint = new MojoMessagePipeEndpoint(handle); |
@@ -139,10 +144,10 @@ class MojoEventStreamListener implements Listener { |
_isOpen = false; |
} |
- MojoEventStreamListener.unbound() : |
- _endpoint = null, |
- _eventStream = null, |
- _isOpen = false; |
+ MojoEventStreamListener.unbound() |
+ : _endpoint = null, |
+ _eventStream = null, |
+ _isOpen = false; |
void bind(MojoMessagePipeEndpoint endpoint) { |
assert(!isBound); |
@@ -158,13 +163,14 @@ class MojoEventStreamListener implements Listener { |
_isOpen = false; |
} |
- StreamSubscription<List<int>> listen() { |
+ StreamSubscription<List<int>> listen({Function onClosed}) { |
_isOpen = true; |
return _eventStream.listen((List<int> event) { |
var signalsWatched = new MojoHandleSignals(event[0]); |
var signalsReceived = new MojoHandleSignals(event[1]); |
if (signalsReceived.isPeerClosed) { |
- handlePeerClosed(); |
+ if (onClosed != null) onClosed(); |
+ close(); |
// The peer being closed obviates any other signal we might |
// have received since we won't be able to read or write the handle. |
// Thus, we just return before invoking other handlers. |
@@ -180,30 +186,23 @@ class MojoEventStreamListener implements Listener { |
handleWrite(); |
} |
if (_isOpen) { |
- _eventStream.enableSignals(enableSignals( |
- signalsWatched, signalsReceived)); |
+ _eventStream.enableSignals(signalsWatched); |
} |
_isInHandler = false; |
- }); |
+ }, onDone: close); |
} |
void close() { |
- if (_isOpen) { |
+ _isOpen = false; |
+ _endpoint = null; |
+ if (_eventStream != null) { |
_eventStream.close(); |
- _isOpen = false; |
_eventStream = null; |
- _endpoint = null; |
} |
} |
void handleRead() {} |
void handleWrite() {} |
- void handlePeerClosed() { |
- close(); |
- } |
- |
- MojoHandleSignals enableSignals(MojoHandleSignals watched, |
- MojoHandleSignals received) => watched; |
MojoMessagePipeEndpoint get endpoint => _endpoint; |
bool get isOpen => _isOpen; |