Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1435)

Unified Diff: mojo/public/dart/mojo/lib/src/event_stream.dart

Issue 1414483010: Dart: Use a RawReceivePort to receive events for Mojo handles. (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Format Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/dart/mojo/lib/src/event_stream.dart
diff --git a/mojo/public/dart/mojo/lib/src/event_stream.dart b/mojo/public/dart/mojo/lib/src/event_stream.dart
index 0ff091ee571c34f31e256879970b417ce9fa797d..5b816fa05603f4279fce54cd3e93cd626e9996a9 100644
--- a/mojo/public/dart/mojo/lib/src/event_stream.dart
+++ b/mojo/public/dart/mojo/lib/src/event_stream.dart
@@ -4,33 +4,29 @@
part of core;
-class MojoEventStream extends Stream<List<int>> {
+class MojoEventSubscription {
Cutch 2015/11/11 19:08:16 Consider 'MojoHandleEventSubscription'
zra 2015/11/11 20:43:26 Since Handles are the only source of events, and s
// The underlying Mojo handle.
MojoHandle _handle;
- // Providing our own stream controller allows us to take custom actions when
- // listeners pause/resume/etc. their StreamSubscription.
- StreamController _controller;
-
// The send port that we give to the handle watcher to notify us of handle
// events.
SendPort _sendPort;
// The receive port on which we listen and receive events from the handle
// watcher.
- ReceivePort _receivePort;
+ RawReceivePort _receivePort;
// The signals on this handle that we're interested in.
MojoHandleSignals _signals;
// Whether listen has been called.
Cutch 2015/11/11 19:08:15 s/listen/subscribe
zra 2015/11/11 20:43:26 Done.
- bool _isListening;
+ bool _isSubscribed;
- MojoEventStream(MojoHandle handle,
+ MojoEventSubscription(MojoHandle handle,
[MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE])
: _handle = handle,
_signals = signals,
- _isListening = false {
+ _isSubscribed = false {
MojoResult result = MojoHandle.registerFinalizer(this);
if (!result.isOk) {
throw new MojoInternalError(
@@ -40,35 +36,26 @@ class MojoEventStream extends Stream<List<int>> {
Future close({bool immediate: false}) => _close(immediate: immediate);
- StreamSubscription<List<int>> listen(void onData(List event),
- {Function onError, void onDone(), bool cancelOnError}) {
- if (_isListening) {
- throw new MojoApiError("Listen has already been called: $_handle.");
+ void subscribe(void handler(List<int> event)) {
+ if (_isSubscribed) {
+ throw new MojoApiError("subscribe() has already been called: $this.");
}
- _receivePort = new ReceivePort();
+ _receivePort = new RawReceivePort(handler);
_sendPort = _receivePort.sendPort;
- _controller = new StreamController(
- sync: true,
- onPause: _onPauseStateChange,
- onResume: _onPauseStateChange);
- _controller.addStream(_receivePort).whenComplete(_controller.close);
if (_signals != MojoHandleSignals.NONE) {
- var res = new MojoResult(
- MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value));
- if (!res.isOk) {
+ int res = MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value);
+ if (res != MojoResult.kOk) {
throw new MojoInternalError("MojoHandleWatcher add failed: $res");
}
}
- _isListening = true;
- return _controller.stream.listen(onData,
- onError: onError, onDone: onDone, cancelOnError: cancelOnError);
+ _isSubscribed = true;
}
bool enableSignals(MojoHandleSignals signals) {
_signals = signals;
- if (_isListening) {
+ if (_isSubscribed) {
return MojoHandleWatcher.add(_handle.h, _sendPort, signals.value) ==
MojoResult.kOk;
}
@@ -82,7 +69,7 @@ class MojoEventStream extends Stream<List<int>> {
Future _close({bool immediate: false, bool local: false}) {
if (_handle != null) {
- if (_isListening && !local) {
+ if (_isSubscribed && !local) {
return _handleWatcherClose(immediate: immediate).then((result) {
// If the handle watcher is gone, then close the handle ourselves.
if (!result.isOk) {
@@ -117,59 +104,44 @@ class MojoEventStream extends Stream<List<int>> {
}
}
- void _onPauseStateChange() {
- if (_controller.isPaused) {
- var res = new MojoResult(MojoHandleWatcher.remove(_handle.h));
- if (!res.isOk) {
- throw new MojoInternalError("MojoHandleWatcher add failed: $res");
- }
- } else {
- var res = new MojoResult(
- MojoHandleWatcher.add(_handle.h, _sendPort, _signals.value));
- if (!res.isOk) {
- throw new MojoInternalError("MojoHandleWatcher add failed: $res");
- }
- }
- }
-
bool get readyRead => _handle.readyRead;
bool get readyWrite => _handle.readyWrite;
+ MojoHandleSignals get signals => _signals;
String toString() => "$_handle";
}
-typedef void ErrorHandler();
+typedef void ErrorHandler(Object e);
-class MojoEventStreamListener {
- StreamSubscription subscription;
+class MojoEventHandler {
Cutch 2015/11/11 19:08:16 Consider 'MojoHandleEventHandler'
zra 2015/11/11 20:43:26 Like above, my feeling is that this is too verbose
ErrorHandler onError;
MojoMessagePipeEndpoint _endpoint;
- MojoEventStream _eventStream;
+ MojoEventSubscription _eventSubscription;
bool _isOpen = false;
bool _isInHandler = false;
bool _isPeerClosed = false;
- MojoEventStreamListener.fromEndpoint(MojoMessagePipeEndpoint endpoint)
+ MojoEventHandler.fromEndpoint(MojoMessagePipeEndpoint endpoint)
: _endpoint = endpoint,
- _eventStream = new MojoEventStream(endpoint.handle) {
- listen();
+ _eventSubscription = new MojoEventSubscription(endpoint.handle) {
+ handleEvents();
}
- MojoEventStreamListener.fromHandle(MojoHandle handle) {
+ MojoEventHandler.fromHandle(MojoHandle handle) {
_endpoint = new MojoMessagePipeEndpoint(handle);
- _eventStream = new MojoEventStream(handle);
- listen();
+ _eventSubscription = new MojoEventSubscription(handle);
+ handleEvents();
}
- MojoEventStreamListener.unbound();
+ MojoEventHandler.unbound();
void bind(MojoMessagePipeEndpoint endpoint) {
if (isBound) {
throw new MojoApiError("MojoEventStreamListener is already bound.");
}
_endpoint = endpoint;
- _eventStream = new MojoEventStream(endpoint.handle);
+ _eventSubscription = new MojoEventSubscription(endpoint.handle);
_isOpen = false;
_isInHandler = false;
_isPeerClosed = false;
@@ -180,67 +152,74 @@ class MojoEventStreamListener {
throw new MojoApiError("MojoEventStreamListener is already bound.");
}
_endpoint = new MojoMessagePipeEndpoint(handle);
- _eventStream = new MojoEventStream(handle);
+ _eventSubscription = new MojoEventSubscription(handle);
_isOpen = false;
_isInHandler = false;
_isPeerClosed = false;
}
- StreamSubscription<List<int>> listen() {
+ void handleEvents() {
Cutch 2015/11/11 19:08:16 handleEvents -> beginHandlingEvents()
zra 2015/11/11 20:43:26 Done.
if (!isBound) {
throw new MojoApiError("MojoEventStreamListener is unbound.");
}
- if (subscription != null) {
- throw new MojoApiError("Listen has already been called.");
- }
_isOpen = true;
- subscription = _eventStream.listen((List<int> event) {
- if (!_isOpen) {
- // The actual close of the underlying stream happens asynchronously
- // after the call to close. However, we start to ignore incoming events
- // immediately.
- return;
- }
- var signalsWatched = new MojoHandleSignals(event[0]);
- var signalsReceived = new MojoHandleSignals(event[1]);
- _isInHandler = true;
- if (signalsReceived.isReadable) {
- assert(_eventStream.readyRead);
- handleRead();
- }
- if (signalsReceived.isWritable) {
- assert(_eventStream.readyWrite);
- handleWrite();
- }
- _isPeerClosed = signalsReceived.isPeerClosed ||
- !_eventStream.enableSignals(signalsWatched);
- _isInHandler = false;
- if (_isPeerClosed) {
- close().then((_) {
+ _eventSubscription.subscribe((List<int> event) {
+ try {
+ _handleEvent(event);
+ } catch (e) {
+ close(immediate: true).then((_) {
if (onError != null) {
- onError();
+ onError(e);
}
});
}
- }, onDone: close);
- return subscription;
+ });
}
Future close({bool immediate: false}) {
var result;
_isOpen = false;
_endpoint = null;
- subscription = null;
- if (_eventStream != null) {
- result = _eventStream
+ if (_eventSubscription != null) {
+ result = _eventSubscription
._close(immediate: immediate, local: _isPeerClosed)
.then((_) {
- _eventStream = null;
+ _eventSubscription = null;
});
}
return result != null ? result : new Future.value(null);
}
+ void _handleEvent(List<int> event) {
+ if (!_isOpen) {
+ // The actual close of the underlying stream happens asynchronously
+ // after the call to close. However, we start to ignore incoming events
+ // immediately.
+ return;
+ }
+ var signalsWatched = new MojoHandleSignals(event[0]);
+ var signalsReceived = new MojoHandleSignals(event[1]);
+ _isInHandler = true;
+ if (signalsReceived.isReadable) {
+ assert(_eventSubscription.readyRead);
+ handleRead();
+ }
+ if (signalsReceived.isWritable) {
+ assert(_eventSubscription.readyWrite);
+ handleWrite();
+ }
+ _isPeerClosed = signalsReceived.isPeerClosed ||
+ !_eventSubscription.enableSignals(signalsWatched);
+ _isInHandler = false;
+ if (_isPeerClosed) {
+ close().then((_) {
+ if (onError != null) {
+ onError(null);
+ }
+ });
+ }
+ }
+
void handleRead() {}
void handleWrite() {}
@@ -250,6 +229,6 @@ class MojoEventStreamListener {
bool get isBound => _endpoint != null;
bool get isPeerClosed => _isPeerClosed;
- String toString() => "MojoEventStreamListener("
+ String toString() => "MojoEventHandler("
"isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)";
}

Powered by Google App Engine
This is Rietveld 408576698