Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(935)

Unified Diff: mojo/public/dart/src/interface.dart

Issue 800523004: Dart: Simplifies the handle watcher. Various cleanups and bugfixes. (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/dart/src/interface.dart
diff --git a/mojo/public/dart/src/interface.dart b/mojo/public/dart/src/interface.dart
index 8d8970a0d4b3a28ad2cea1da0b9ce7afc8ecf416..f6999074e5eaad156f4be5843200da7b010a2772 100644
--- a/mojo/public/dart/src/interface.dart
+++ b/mojo/public/dart/src/interface.dart
@@ -10,72 +10,109 @@ abstract class Interface {
List _sendQueue;
bool _isOpen;
- Message handleMessage(MessageReader reader);
+ Future<Message> handleMessage(MessageReader reader);
- Interface(this._endpoint) {
+ Interface(core.MojoMessagePipeEndpoint endpoint) :
+ _endpoint = endpoint,
+ _sendQueue = [],
+ _handle = new core.MojoHandle(endpoint.handle),
+ _isOpen = false;
+
+ Interface.fromHandle(int handle) {
+ _endpoint =
+ new core.MojoMessagePipeEndpoint(new core.RawMojoHandle(handle));
_sendQueue = [];
_handle = new core.MojoHandle(_endpoint.handle);
_isOpen = false;
}
+ void _doRead() {
+ assert(_handle.readyRead);
+
+ // Query how many bytes are available.
+ var result = _endpoint.query();
+ if (!result.status.isOk && !result.status.isResourceExhausted) {
+ // If something else happens, it means the handle wasn't really ready
+ // for reading, which indicates a bug in MojoHandle or the
+ // handle watcher.
+ throw "message pipe query failed: ${result.status} for $_handle";
+ }
+
+ // Read the data and view as a message.
+ var bytes = new ByteData(result.bytesRead);
+ var handles = new List<core.RawMojoHandle>(result.handlesRead);
+ result = _endpoint.read(bytes, result.bytesRead, handles);
+ if (!result.status.isOk && !result.status.isResourceExhausted) {
+ // If something else happens, it means the handle wasn't really ready
+ // for reading, which indicates a bug in MojoHandle or the
+ // handle watcher.
+ throw "message pipe read failed: ${result.status}";
+ }
+ var message = new Message(bytes, handles);
+ var reader = new MessageReader(message);
+
+ // Prepare the response.
+ var responseFuture = handleMessage(reader);
+
+ // If there's a response, queue it up for sending.
+ if (responseFuture != null) {
+ responseFuture.then((response) {
+ _sendQueue.add(response);
+ if (_sendQueue.length == 1) {
+ _handle.enableWriteEvents();
+ }
+ });
+ }
+ }
+
+ void _doWrite() {
+ if (_sendQueue.length > 0) {
+ assert(_handle.readyWrite);
+ var responseMessage = _sendQueue.removeAt(0);
+ _endpoint.write(responseMessage.buffer,
+ responseMessage.buffer.lengthInBytes,
+ responseMessage.handles);
+ if (!_endpoint.status.isOk) {
+ throw "message pipe write failed: ${_endpoint.status}";
+ }
+ }
+ }
+
StreamSubscription<int> listen() {
_isOpen = true;
- return _handle.listen((int mojoSignal) {
- if (core.MojoHandleSignals.isReadable(mojoSignal)) {
- // Query how many bytes are available.
- var result = _endpoint.query();
- if (!result.status.isOk && !result.status.isResourceExhausted) {
- // If something else happens, it means the handle wasn't really ready
- // for reading, which indicates a bug in MojoHandle or the
- // event listener.
- throw new Exception("message pipe query failed: ${result.status}");
- }
+ return _handle.listen((List<int> event) {
+ var signalsWatched = new core.MojoHandleSignals(event[0]);
+ var signalsReceived = new core.MojoHandleSignals(event[1]);
+ if (signalsReceived.isPeerClosed) {
+ close();
+ return;
+ }
- // Read the data and view as a message.
- var bytes = new ByteData(result.bytesRead);
- var handles = new List<core.RawMojoHandle>(result.handlesRead);
- result = _endpoint.read(bytes, result.bytesRead, handles);
- if (!result.status.isOk && !result.status.isResourceExhausted) {
- // If something else happens, it means the handle wasn't really ready
- // for reading, which indicates a bug in MojoHandle or the
- // event listener.
- throw new Exception("message pipe read failed: ${result.status}");
- }
- var message = new Message(bytes, handles);
- var reader = new MessageReader(message);
-
- // Prepare the response.
- var responseMessage = handleMessage(reader);
- // If there's a response, queue it up for sending.
- if (responseMessage != null) {
- _sendQueue.add(responseMessage);
- if ((_sendQueue.length > 0) && !_handle.writeEnabled()) {
- _handle.enableWriteEvents();
- }
- }
+ if (signalsReceived.isReadable) {
+ _doRead();
}
- if (core.MojoHandleSignals.isWritable(mojoSignal)) {
- if (_sendQueue.length > 0) {
- var responseMessage = _sendQueue.removeAt(0);
- _endpoint.write(responseMessage.buffer,
- responseMessage.buffer.lengthInBytes,
- responseMessage.handles);
- if (!_endpoint.status.isOk) {
- throw new Exception("message pipe write failed");
- }
- }
- if ((_sendQueue.length == 0) && _handle.writeEnabled()) {
- _handle.disableWriteEvents();
- }
+
+ if (signalsReceived.isWritable) {
+ _doWrite();
}
- if (core.MojoHandleSignals.isNone(mojoSignal)) {
- // The handle watcher will send MojoHandleSignals.NONE when the other
- // endpoint of the pipe is closed.
- _handle.close();
+
+ if (_sendQueue.length == 0) {
+ var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE;
+ _handle.enableSignals(withoutWritable);
+ } else {
+ _handle.enableSignals(signalsWatched);
}
});
}
+ void close() {
+ if (_isOpen) {
+ _handle.close();
+ _isOpen = false;
+ _handle = null;
+ }
+ }
+
Message buildResponse(Type t, int name, Object response) {
var builder = new MessageBuilder(name, align(getEncodedSize(t)));
builder.encodeStruct(t, response);
@@ -95,14 +132,14 @@ abstract class Interface {
builder.encodeStruct(t, msg);
var message = builder.finish();
_sendQueue.add(message);
- if (!_handle.writeEnabled()) {
- _handle.enableWriteEvents();
- }
+ _handle.enableWriteEvents();
}
Future enqueueMessageWithRequestID(Type t, int name, int id, Object msg) {
- throw new Exception("The client Mixin should not expect a response");
+ // TODO(zra): Is this correct?
+ throw "The client interface should not expect a response";
}
bool get isOpen => _isOpen;
+ core.MojoMessagePipeEndpoint get endpoint => _endpoint;
}

Powered by Google App Engine
This is Rietveld 408576698