| Index: mojo/public/dart/src/client.dart
|
| diff --git a/mojo/public/dart/src/client.dart b/mojo/public/dart/src/client.dart
|
| index 956a6525d6dc7e10331b57aa2e84f8d8209c48b9..04462b344b8878bba67213c9057923bf006090a8 100644
|
| --- a/mojo/public/dart/src/client.dart
|
| +++ b/mojo/public/dart/src/client.dart
|
| @@ -6,73 +6,101 @@ part of bindings;
|
|
|
| abstract class Client {
|
| core.MojoMessagePipeEndpoint _endpoint;
|
| - core.MojoHandle _handle;
|
| + core.MojoEventStream _eventStream;
|
| List _sendQueue;
|
| - List _completerQueue;
|
| + Map<int, Completer> _completerMap;
|
| bool _isOpen = false;
|
| + int _nextId = 0;
|
|
|
| void handleResponse(MessageReader reader);
|
|
|
| - Client(this._endpoint) {
|
| + Client(core.MojoMessagePipeEndpoint endpoint) :
|
| + _sendQueue = [],
|
| + _completerMap = {},
|
| + _endpoint = endpoint,
|
| + _eventStream = new core.MojoEventStream(endpoint.handle);
|
| +
|
| + Client.fromHandle(int handle) {
|
| _sendQueue = [];
|
| - _completerQueue = [];
|
| - _handle = new core.MojoHandle(_endpoint.handle);
|
| + _completerMap = {};
|
| + _endpoint =
|
| + new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle));
|
| + _eventStream = new core.MojoHandle(_endpoint.handle);
|
| }
|
|
|
| - void open() {
|
| - _handle.listen((int mojoSignal) {
|
| - if (core.MojoHandleSignals.isReadable(mojoSignal)) {
|
| - // Query how many bytes are available.
|
| - var result = _endpoint.query();
|
| - if (!result.status.isOk && !result.status.isResourceExhausted) {
|
| - // If something else happens, it means the handle wasn't really ready
|
| - // for reading, which indicates a bug in MojoHandle or the
|
| - // handle watcher.
|
| - throw new Exception("message pipe query failed: ${result.status}");
|
| - }
|
| + void _doRead() {
|
| + // Query how many bytes are available.
|
| + var result = _endpoint.query();
|
| + assert(result.status.isOk || result.status.isResourceExhausted);
|
|
|
| - // Read the data.
|
| - var bytes = new ByteData(result.bytesRead);
|
| - var handles = new List<core.RawMojoHandle>(result.handlesRead);
|
| - result = _endpoint.read(bytes, result.bytesRead, handles);
|
| - if (!result.status.isOk && !result.status.isResourceExhausted) {
|
| - throw new Exception("message pipe read failed: ${result.status}");
|
| - }
|
| - var message = new Message(bytes, handles);
|
| - var reader = new MessageReader(message);
|
| + // Read the data.
|
| + var bytes = new ByteData(result.bytesRead);
|
| + var handles = new List<core.MojoHandle>(result.handlesRead);
|
| + result = _endpoint.read(bytes, result.bytesRead, handles);
|
| + assert(result.status.isOk || result.status.isResourceExhausted);
|
| + var message = new Message(bytes, handles);
|
| + var reader = new MessageReader(message);
|
| +
|
| + handleResponse(reader);
|
| + }
|
|
|
| - handleResponse(reader);
|
| + void _doWrite() {
|
| + if (_sendQueue.length > 0) {
|
| + List messageCompleter = _sendQueue.removeAt(0);
|
| + Message message = messageCompleter[0];
|
| + Completer completer = messageCompleter[1];
|
| + _endpoint.write(message.buffer,
|
| + message.buffer.lengthInBytes,
|
| + message.handles);
|
| + if (!_endpoint.status.isOk) {
|
| + throw "message pipe write failed";
|
| }
|
| - if (core.MojoHandleSignals.isWritable(mojoSignal)) {
|
| - if (_sendQueue.length > 0) {
|
| - List messageCompleter = _sendQueue.removeAt(0);
|
| - _endpoint.write(messageCompleter[0].buffer,
|
| - messageCompleter[0].buffer.lengthInBytes,
|
| - messageCompleter[0].handles);
|
| - if (!_endpoint.status.isOk) {
|
| - throw new Exception("message pipe write failed");
|
| - }
|
| - if (messageCompleter[1] != null) {
|
| - _completerQueue.add(messageCompleter[1]);
|
| - }
|
| + if (completer != null) {
|
| + if (!message.expectsResponse) {
|
| + throw "Message has a completer, but does not expect a response";
|
| }
|
| - if ((_sendQueue.length == 0) && _handle.writeEnabled()) {
|
| - _handle.disableWriteEvents();
|
| + int requestID = message.requestID;
|
| + if (_completerMap[requestID] != null) {
|
| + throw "Request ID $requestID is already in use.";
|
| }
|
| + _completerMap[requestID] = completer;
|
| }
|
| - if (core.MojoHandleSignals.isNone(mojoSignal)) {
|
| - // The handle watcher will send MojoHandleSignals.NONE if the other
|
| - // endpoint of the pipe is closed.
|
| - _handle.close();
|
| + }
|
| + }
|
| +
|
| + void open() {
|
| + _eventStream.listen((List<int> event) {
|
| + var signalsWatched = new core.MojoHandleSignals(event[0]);
|
| + var signalsReceived = new core.MojoHandleSignals(event[1]);
|
| + if (signalsReceived.isPeerClosed) {
|
| + close();
|
| + return;
|
| + }
|
| +
|
| + if (signalsReceived.isReadable) {
|
| + _doRead();
|
| + }
|
| +
|
| + if (signalsReceived.isWritable) {
|
| + _doWrite();
|
| + }
|
| +
|
| + if (_sendQueue.length == 0) {
|
| + var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE;
|
| + _eventStream.enableSignals(withoutWritable);
|
| + } else {
|
| + _eventStream.enableSignals(signalsWatched);
|
| }
|
| });
|
| _isOpen = true;
|
| }
|
|
|
| void close() {
|
| - assert(isOpen);
|
| - _handle.close();
|
| - _isOpen = false;
|
| + if (_isOpen) {
|
| + _eventStream.close();
|
| + _eventStream = null;
|
| + _isOpen = false;
|
| + }
|
| }
|
|
|
| void enqueueMessage(Type t, int name, Object msg) {
|
| @@ -80,13 +108,21 @@ abstract class Client {
|
| builder.encodeStruct(t, msg);
|
| var message = builder.finish();
|
| _sendQueue.add([message, null]);
|
| - if (!_handle.writeEnabled()) {
|
| - _handle.enableWriteEvents();
|
| + if (_sendQueue.length == 1) {
|
| + _eventStream.enableAllEvents();
|
| }
|
| }
|
|
|
| + int _getNextId() {
|
| + return _nextId++;
|
| + }
|
| +
|
| Future enqueueMessageWithRequestID(
|
| Type t, int name, int id, int flags, Object msg) {
|
| + if (id == -1) {
|
| + id = _getNextId();
|
| + }
|
| +
|
| var builder = new MessageWithRequestIDBuilder(
|
| name, align(getEncodedSize(t)), id, flags);
|
| builder.encodeStruct(t, msg);
|
| @@ -94,13 +130,15 @@ abstract class Client {
|
|
|
| var completer = new Completer();
|
| _sendQueue.add([message, completer]);
|
| - if (!_handle.writeEnabled()) {
|
| - _handle.enableWriteEvents();
|
| + if (_sendQueue.length == 1) {
|
| + _eventStream.enableAllEvents();
|
| + } else {
|
| + _eventStream.enableReadEvents();
|
| }
|
| return completer.future;
|
| }
|
|
|
| // Need a getter for this for access in subclasses.
|
| - List get completerQueue => _completerQueue;
|
| + Map<int, Completer> get completerMap => _completerMap;
|
| bool get isOpen => _isOpen;
|
| }
|
|
|