Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1259)

Unified Diff: mojo/public/dart/src/client.dart

Issue 800523004: Dart: Simplifies the handle watcher. Various cleanups and bugfixes. (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: cleanup Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/public/dart/src/buffer.dart ('k') | mojo/public/dart/src/codec.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/public/dart/src/client.dart
diff --git a/mojo/public/dart/src/client.dart b/mojo/public/dart/src/client.dart
index 956a6525d6dc7e10331b57aa2e84f8d8209c48b9..04462b344b8878bba67213c9057923bf006090a8 100644
--- a/mojo/public/dart/src/client.dart
+++ b/mojo/public/dart/src/client.dart
@@ -6,73 +6,101 @@ part of bindings;
abstract class Client {
core.MojoMessagePipeEndpoint _endpoint;
- core.MojoHandle _handle;
+ core.MojoEventStream _eventStream;
List _sendQueue;
- List _completerQueue;
+ Map<int, Completer> _completerMap;
bool _isOpen = false;
+ int _nextId = 0;
void handleResponse(MessageReader reader);
- Client(this._endpoint) {
+ Client(core.MojoMessagePipeEndpoint endpoint) :
+ _sendQueue = [],
+ _completerMap = {},
+ _endpoint = endpoint,
+ _eventStream = new core.MojoEventStream(endpoint.handle);
+
+ Client.fromHandle(int handle) {
_sendQueue = [];
- _completerQueue = [];
- _handle = new core.MojoHandle(_endpoint.handle);
+ _completerMap = {};
+ _endpoint =
+ new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle));
+ _eventStream = new core.MojoHandle(_endpoint.handle);
}
- void open() {
- _handle.listen((int mojoSignal) {
- if (core.MojoHandleSignals.isReadable(mojoSignal)) {
- // Query how many bytes are available.
- var result = _endpoint.query();
- if (!result.status.isOk && !result.status.isResourceExhausted) {
- // If something else happens, it means the handle wasn't really ready
- // for reading, which indicates a bug in MojoHandle or the
- // handle watcher.
- throw new Exception("message pipe query failed: ${result.status}");
- }
+ void _doRead() {
+ // Query how many bytes are available.
+ var result = _endpoint.query();
+ assert(result.status.isOk || result.status.isResourceExhausted);
- // Read the data.
- var bytes = new ByteData(result.bytesRead);
- var handles = new List<core.RawMojoHandle>(result.handlesRead);
- result = _endpoint.read(bytes, result.bytesRead, handles);
- if (!result.status.isOk && !result.status.isResourceExhausted) {
- throw new Exception("message pipe read failed: ${result.status}");
- }
- var message = new Message(bytes, handles);
- var reader = new MessageReader(message);
+ // Read the data.
+ var bytes = new ByteData(result.bytesRead);
+ var handles = new List<core.MojoHandle>(result.handlesRead);
+ result = _endpoint.read(bytes, result.bytesRead, handles);
+ assert(result.status.isOk || result.status.isResourceExhausted);
+ var message = new Message(bytes, handles);
+ var reader = new MessageReader(message);
+
+ handleResponse(reader);
+ }
- handleResponse(reader);
+ void _doWrite() {
+ if (_sendQueue.length > 0) {
+ List messageCompleter = _sendQueue.removeAt(0);
+ Message message = messageCompleter[0];
+ Completer completer = messageCompleter[1];
+ _endpoint.write(message.buffer,
+ message.buffer.lengthInBytes,
+ message.handles);
+ if (!_endpoint.status.isOk) {
+ throw "message pipe write failed";
}
- if (core.MojoHandleSignals.isWritable(mojoSignal)) {
- if (_sendQueue.length > 0) {
- List messageCompleter = _sendQueue.removeAt(0);
- _endpoint.write(messageCompleter[0].buffer,
- messageCompleter[0].buffer.lengthInBytes,
- messageCompleter[0].handles);
- if (!_endpoint.status.isOk) {
- throw new Exception("message pipe write failed");
- }
- if (messageCompleter[1] != null) {
- _completerQueue.add(messageCompleter[1]);
- }
+ if (completer != null) {
+ if (!message.expectsResponse) {
+ throw "Message has a completer, but does not expect a response";
}
- if ((_sendQueue.length == 0) && _handle.writeEnabled()) {
- _handle.disableWriteEvents();
+ int requestID = message.requestID;
+ if (_completerMap[requestID] != null) {
+ throw "Request ID $requestID is already in use.";
}
+ _completerMap[requestID] = completer;
}
- if (core.MojoHandleSignals.isNone(mojoSignal)) {
- // The handle watcher will send MojoHandleSignals.NONE if the other
- // endpoint of the pipe is closed.
- _handle.close();
+ }
+ }
+
+ void open() {
+ _eventStream.listen((List<int> event) {
+ var signalsWatched = new core.MojoHandleSignals(event[0]);
+ var signalsReceived = new core.MojoHandleSignals(event[1]);
+ if (signalsReceived.isPeerClosed) {
+ close();
+ return;
+ }
+
+ if (signalsReceived.isReadable) {
+ _doRead();
+ }
+
+ if (signalsReceived.isWritable) {
+ _doWrite();
+ }
+
+ if (_sendQueue.length == 0) {
+ var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE;
+ _eventStream.enableSignals(withoutWritable);
+ } else {
+ _eventStream.enableSignals(signalsWatched);
}
});
_isOpen = true;
}
void close() {
- assert(isOpen);
- _handle.close();
- _isOpen = false;
+ if (_isOpen) {
+ _eventStream.close();
+ _eventStream = null;
+ _isOpen = false;
+ }
}
void enqueueMessage(Type t, int name, Object msg) {
@@ -80,13 +108,21 @@ abstract class Client {
builder.encodeStruct(t, msg);
var message = builder.finish();
_sendQueue.add([message, null]);
- if (!_handle.writeEnabled()) {
- _handle.enableWriteEvents();
+ if (_sendQueue.length == 1) {
+ _eventStream.enableAllEvents();
}
}
+ int _getNextId() {
+ return _nextId++;
+ }
+
Future enqueueMessageWithRequestID(
Type t, int name, int id, int flags, Object msg) {
+ if (id == -1) {
+ id = _getNextId();
+ }
+
var builder = new MessageWithRequestIDBuilder(
name, align(getEncodedSize(t)), id, flags);
builder.encodeStruct(t, msg);
@@ -94,13 +130,15 @@ abstract class Client {
var completer = new Completer();
_sendQueue.add([message, completer]);
- if (!_handle.writeEnabled()) {
- _handle.enableWriteEvents();
+ if (_sendQueue.length == 1) {
+ _eventStream.enableAllEvents();
+ } else {
+ _eventStream.enableReadEvents();
}
return completer.future;
}
// Need a getter for this for access in subclasses.
- List get completerQueue => _completerQueue;
+ Map<int, Completer> get completerMap => _completerMap;
bool get isOpen => _isOpen;
}
« no previous file with comments | « mojo/public/dart/src/buffer.dart ('k') | mojo/public/dart/src/codec.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698