Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Unified Diff: mojo/public/dart/mojo/lib/src/event_stream.dart

Issue 1414483010: Dart: Use a RawReceivePort to receive events for Mojo handles. (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Merge Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/dart/mojo/lib/src/event_stream.dart
diff --git a/mojo/public/dart/mojo/lib/src/event_stream.dart b/mojo/public/dart/mojo/lib/src/event_stream.dart
index 0ff091ee571c34f31e256879970b417ce9fa797d..33eedf42995e3c1cbd4f465f7d8428a3c9e5eca5 100644
--- a/mojo/public/dart/mojo/lib/src/event_stream.dart
+++ b/mojo/public/dart/mojo/lib/src/event_stream.dart
@@ -4,21 +4,17 @@
part of core;
-class MojoEventStream extends Stream<List<int>> {
+class MojoEventHandler {
Cutch 2015/11/11 17:47:17 This class doesn't handle events. It wraps a subsc
zra 2015/11/11 18:44:33 Renamed MojoEventSubscription
// The underlying Mojo handle.
MojoHandle _handle;
- // Providing our own stream controller allows us to take custom actions when
- // listeners pause/resume/etc. their StreamSubscription.
- StreamController _controller;
-
// The send port that we give to the handle watcher to notify us of handle
// events.
SendPort _sendPort;
// The receive port on which we listen and receive events from the handle
// watcher.
- ReceivePort _receivePort;
+ RawReceivePort _receivePort;
// The signals on this handle that we're interested in.
MojoHandleSignals _signals;
@@ -26,7 +22,7 @@ class MojoEventStream extends Stream<List<int>> {
// Whether listen has been called.
bool _isListening;
- MojoEventStream(MojoHandle handle,
+ MojoEventHandler(MojoHandle handle,
[MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE])
: _handle = handle,
_signals = signals,
@@ -40,30 +36,21 @@ class MojoEventStream extends Stream<List<int>> {
Future close({bool immediate: false}) => _close(immediate: immediate);
- StreamSubscription<List<int>> listen(void onData(List event),
- {Function onError, void onDone(), bool cancelOnError}) {
+ void handleEvents(void router(List<int> event)) {
Cutch 2015/11/11 17:47:17 I find this name confusing because handleEvents do
zra 2015/11/11 18:44:33 Renamed subscribe
if (_isListening) {
throw new MojoApiError("Listen has already been called: $_handle.");
}
- _receivePort = new ReceivePort();
+ _receivePort = new RawReceivePort(router);
_sendPort = _receivePort.sendPort;
- _controller = new StreamController(
- sync: true,
- onPause: _onPauseStateChange,
- onResume: _onPauseStateChange);
- _controller.addStream(_receivePort).whenComplete(_controller.close);
if (_signals != MojoHandleSignals.NONE) {
- var res = new MojoResult(
- MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value));
- if (!res.isOk) {
+ int res = MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value);
+ if (res != MojoResult.kOk) {
throw new MojoInternalError("MojoHandleWatcher add failed: $res");
}
}
_isListening = true;
- return _controller.stream.listen(onData,
- onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
bool enableSignals(MojoHandleSignals signals) {
@@ -117,59 +104,44 @@ class MojoEventStream extends Stream<List<int>> {
}
}
- void _onPauseStateChange() {
- if (_controller.isPaused) {
- var res = new MojoResult(MojoHandleWatcher.remove(_handle.h));
- if (!res.isOk) {
- throw new MojoInternalError("MojoHandleWatcher add failed: $res");
- }
- } else {
- var res = new MojoResult(
- MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value));
- if (!res.isOk) {
- throw new MojoInternalError("MojoHandleWatcher add failed: $res");
- }
- }
- }
-
bool get readyRead => _handle.readyRead;
bool get readyWrite => _handle.readyWrite;
+ MojoHandleSignals get signals => _signals;
String toString() => "$_handle";
}
-typedef void ErrorHandler();
+typedef void ErrorHandler(Object e);
-class MojoEventStreamListener {
- StreamSubscription subscription;
+class MojoEventController {
ErrorHandler onError;
MojoMessagePipeEndpoint _endpoint;
- MojoEventStream _eventStream;
+ MojoEventHandler _eventHandler;
bool _isOpen = false;
bool _isInHandler = false;
bool _isPeerClosed = false;
- MojoEventStreamListener.fromEndpoint(MojoMessagePipeEndpoint endpoint)
+ MojoEventController.fromEndpoint(MojoMessagePipeEndpoint endpoint)
: _endpoint = endpoint,
- _eventStream = new MojoEventStream(endpoint.handle) {
- listen();
+ _eventHandler = new MojoEventHandler(endpoint.handle) {
+ handleEvents();
}
- MojoEventStreamListener.fromHandle(MojoHandle handle) {
+ MojoEventController.fromHandle(MojoHandle handle) {
_endpoint = new MojoMessagePipeEndpoint(handle);
- _eventStream = new MojoEventStream(handle);
- listen();
+ _eventHandler = new MojoEventHandler(handle);
+ handleEvents();
}
- MojoEventStreamListener.unbound();
+ MojoEventController.unbound();
void bind(MojoMessagePipeEndpoint endpoint) {
if (isBound) {
throw new MojoApiError("MojoEventStreamListener is already bound.");
}
_endpoint = endpoint;
- _eventStream = new MojoEventStream(endpoint.handle);
+ _eventHandler = new MojoEventHandler(endpoint.handle);
_isOpen = false;
_isInHandler = false;
_isPeerClosed = false;
@@ -180,67 +152,74 @@ class MojoEventStreamListener {
throw new MojoApiError("MojoEventStreamListener is already bound.");
}
_endpoint = new MojoMessagePipeEndpoint(handle);
- _eventStream = new MojoEventStream(handle);
+ _eventHandler = new MojoEventHandler(handle);
_isOpen = false;
_isInHandler = false;
_isPeerClosed = false;
}
- StreamSubscription<List<int>> listen() {
+ void handleEvents() {
if (!isBound) {
throw new MojoApiError("MojoEventStreamListener is unbound.");
}
- if (subscription != null) {
- throw new MojoApiError("Listen has already been called.");
- }
_isOpen = true;
- subscription = _eventStream.listen((List<int> event) {
- if (!_isOpen) {
- // The actual close of the underlying stream happens asynchronously
- // after the call to close. However, we start to ignore incoming events
- // immediately.
- return;
- }
- var signalsWatched = new MojoHandleSignals(event[0]);
- var signalsReceived = new MojoHandleSignals(event[1]);
- _isInHandler = true;
- if (signalsReceived.isReadable) {
- assert(_eventStream.readyRead);
- handleRead();
- }
- if (signalsReceived.isWritable) {
- assert(_eventStream.readyWrite);
- handleWrite();
- }
- _isPeerClosed = signalsReceived.isPeerClosed ||
- !_eventStream.enableSignals(signalsWatched);
- _isInHandler = false;
- if (_isPeerClosed) {
- close().then((_) {
+ _eventHandler.handleEvents((List<int> event) {
+ try {
+ _handleEvent(event);
+ } catch (e) {
+ close(immediate: true).then((_) {
if (onError != null) {
- onError();
+ onError(e);
}
});
}
- }, onDone: close);
- return subscription;
+ });
}
Future close({bool immediate: false}) {
var result;
_isOpen = false;
_endpoint = null;
- subscription = null;
- if (_eventStream != null) {
- result = _eventStream
+ if (_eventHandler != null) {
+ result = _eventHandler
._close(immediate: immediate, local: _isPeerClosed)
.then((_) {
- _eventStream = null;
+ _eventHandler = null;
});
}
return result != null ? result : new Future.value(null);
}
+ void _handleEvent(List<int> event) {
+ if (!_isOpen) {
+ // The actual close of the underlying stream happens asynchronously
+ // after the call to close. However, we start to ignore incoming events
+ // immediately.
+ return;
+ }
+ var signalsWatched = new MojoHandleSignals(event[0]);
+ var signalsReceived = new MojoHandleSignals(event[1]);
+ _isInHandler = true;
+ if (signalsReceived.isReadable) {
+ assert(_eventHandler.readyRead);
+ handleRead();
+ }
+ if (signalsReceived.isWritable) {
+ assert(_eventHandler.readyWrite);
+ handleWrite();
+ }
+ _isPeerClosed = signalsReceived.isPeerClosed ||
+ !_eventHandler.enableSignals(signalsWatched);
+ _isInHandler = false;
+ if (_isPeerClosed) {
+ close().then((_) {
+ if (onError != null) {
+ onError(null);
+ }
+ });
+ }
+ }
+
void handleRead() {}
Cutch 2015/11/11 17:47:17 Maybe this class should be called 'MojoEventHandle
zra 2015/11/11 18:44:33 Done.
void handleWrite() {}
@@ -250,6 +229,6 @@ class MojoEventStreamListener {
bool get isBound => _endpoint != null;
bool get isPeerClosed => _isPeerClosed;
- String toString() => "MojoEventStreamListener("
+ String toString() => "MojoEventController("
"isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)";
}

Powered by Google App Engine
This is Rietveld 408576698