Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(940)

Unified Diff: mojo/public/dart/src/client.dart

Issue 800523004: Dart: Simplifies the handle watcher. Various cleanups and bugfixes. (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/dart/src/client.dart
diff --git a/mojo/public/dart/src/client.dart b/mojo/public/dart/src/client.dart
index 956a6525d6dc7e10331b57aa2e84f8d8209c48b9..767df912f3d42089e7c4b2161e5f4eaa73df6051 100644
--- a/mojo/public/dart/src/client.dart
+++ b/mojo/public/dart/src/client.dart
@@ -8,71 +8,106 @@ abstract class Client {
core.MojoMessagePipeEndpoint _endpoint;
core.MojoHandle _handle;
List _sendQueue;
- List _completerQueue;
+ Map<int, Completer> _completerMap;
bool _isOpen = false;
+ int _nextId = 0;
void handleResponse(MessageReader reader);
- Client(this._endpoint) {
+ Client(core.MojoMessagePipeEndpoint endpoint) :
+ _sendQueue = [],
+ _completerMap = {},
+ _endpoint = endpoint,
+ _handle = new core.MojoHandle(endpoint.handle);
+
+ Client.fromHandle(int handle) {
_sendQueue = [];
- _completerQueue = [];
+ _completerMap = {};
+ _endpoint =
+ new core.MojoMessagePipeEndpoint(new core.RawMojoHandle(handle));
_handle = new core.MojoHandle(_endpoint.handle);
}
- void open() {
- _handle.listen((int mojoSignal) {
- if (core.MojoHandleSignals.isReadable(mojoSignal)) {
- // Query how many bytes are available.
- var result = _endpoint.query();
- if (!result.status.isOk && !result.status.isResourceExhausted) {
- // If something else happens, it means the handle wasn't really ready
- // for reading, which indicates a bug in MojoHandle or the
- // handle watcher.
- throw new Exception("message pipe query failed: ${result.status}");
- }
+ void _doRead() {
+ // Query how many bytes are available.
+ var result = _endpoint.query();
+ if (!result.status.isOk && !result.status.isResourceExhausted) {
+ // If something else happens, it means the handle wasn't really ready
+ // for reading, which indicates a bug in MojoHandle or the
+ // handle watcher.
+ throw "message pipe query failed: ${result.status}";
+ }
- // Read the data.
- var bytes = new ByteData(result.bytesRead);
- var handles = new List<core.RawMojoHandle>(result.handlesRead);
- result = _endpoint.read(bytes, result.bytesRead, handles);
- if (!result.status.isOk && !result.status.isResourceExhausted) {
- throw new Exception("message pipe read failed: ${result.status}");
- }
- var message = new Message(bytes, handles);
- var reader = new MessageReader(message);
+ // Read the data.
+ var bytes = new ByteData(result.bytesRead);
+ var handles = new List<core.RawMojoHandle>(result.handlesRead);
+ result = _endpoint.read(bytes, result.bytesRead, handles);
+ if (!result.status.isOk && !result.status.isResourceExhausted) {
+ throw "message pipe read failed: ${result.status}";
+ }
+ var message = new Message(bytes, handles);
+ var reader = new MessageReader(message);
- handleResponse(reader);
+ handleResponse(reader);
+ }
+
+ void _doWrite() {
+ if (_sendQueue.length > 0) {
+ List messageCompleter = _sendQueue.removeAt(0);
+ Message message = messageCompleter[0];
+ Completer completer = messageCompleter[1];
+ _endpoint.write(message.buffer,
+ message.buffer.lengthInBytes,
+ message.handles);
+ if (!_endpoint.status.isOk) {
+ throw "message pipe write failed";
}
- if (core.MojoHandleSignals.isWritable(mojoSignal)) {
- if (_sendQueue.length > 0) {
- List messageCompleter = _sendQueue.removeAt(0);
- _endpoint.write(messageCompleter[0].buffer,
- messageCompleter[0].buffer.lengthInBytes,
- messageCompleter[0].handles);
- if (!_endpoint.status.isOk) {
- throw new Exception("message pipe write failed");
- }
- if (messageCompleter[1] != null) {
- _completerQueue.add(messageCompleter[1]);
- }
+ if (completer != null) {
+ if (!message.expectsResponse) {
+ throw "Message has a completer, but does not expect a response";
}
- if ((_sendQueue.length == 0) && _handle.writeEnabled()) {
- _handle.disableWriteEvents();
+ int requestID = message.requestID;
+ if (_completerMap[requestID] != null) {
+ throw "Request ID $requestID is already in use.";
}
+ _completerMap[requestID] = completer;
+ }
+ }
+ }
+
+ void open() {
+ _handle.listen((List<int> event) {
+ var signalsWatched = new core.MojoHandleSignals(event[0]);
+ var signalsReceived = new core.MojoHandleSignals(event[1]);
+ if (signalsReceived.isPeerClosed) {
+ close();
+ return;
+ }
+
+ if (signalsReceived.isReadable) {
+ _doRead();
}
- if (core.MojoHandleSignals.isNone(mojoSignal)) {
- // The handle watcher will send MojoHandleSignals.NONE if the other
- // endpoint of the pipe is closed.
- _handle.close();
+
+ if (signalsReceived.isWritable) {
+ _doWrite();
+ }
+
+ if (_sendQueue.length == 0) {
+ var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE;
+ _handle.enableSignals(withoutWritable);
+ } else {
+ _handle.enableSignals(signalsWatched);
}
});
_isOpen = true;
}
void close() {
- assert(isOpen);
- _handle.close();
- _isOpen = false;
+ if (_isOpen) {
+ _handle.close();
+ _handle = null;
+ _isOpen = false;
+ }
}
void enqueueMessage(Type t, int name, Object msg) {
@@ -80,13 +115,26 @@ abstract class Client {
builder.encodeStruct(t, msg);
var message = builder.finish();
_sendQueue.add([message, null]);
- if (!_handle.writeEnabled()) {
- _handle.enableWriteEvents();
+ if (_sendQueue.length == 1) {
+ _handle.enableAllEvents();
+ }
+ }
+
+ int _getNextId() {
+ int next = _nextId;
+ _nextId++;
+ if (_nextId.bitLength > 64) {
siva 2014/12/29 23:20:44 maybe document what this magic number 64 is.
zra 2014/12/30 16:29:33 Added a comment.
+ _nextId = 1;
}
+ return next;
}
Future enqueueMessageWithRequestID(
Type t, int name, int id, int flags, Object msg) {
+ if (id == -1) {
+ id = _getNextId();
+ }
+
var builder = new MessageWithRequestIDBuilder(
name, align(getEncodedSize(t)), id, flags);
builder.encodeStruct(t, msg);
@@ -94,13 +142,15 @@ abstract class Client {
var completer = new Completer();
_sendQueue.add([message, completer]);
- if (!_handle.writeEnabled()) {
- _handle.enableWriteEvents();
+ if (_sendQueue.length == 1) {
+ _handle.enableAllEvents();
+ } else {
+ _handle.enableReadEvents();
}
return completer.future;
}
// Need a getter for this for access in subclasses.
- List get completerQueue => _completerQueue;
+ Map<int, Completer> get completerMap => _completerMap;
bool get isOpen => _isOpen;
}

Powered by Google App Engine
This is Rietveld 408576698