| Index: mojo/public/dart/src/client.dart
|
| diff --git a/mojo/public/dart/src/client.dart b/mojo/public/dart/src/client.dart
|
| index 956a6525d6dc7e10331b57aa2e84f8d8209c48b9..9ae6be634b05f0c95e43c5a867306327eae235bc 100644
|
| --- a/mojo/public/dart/src/client.dart
|
| +++ b/mojo/public/dart/src/client.dart
|
| @@ -6,101 +6,133 @@ part of bindings;
|
|
|
| abstract class Client {
|
| core.MojoMessagePipeEndpoint _endpoint;
|
| - core.MojoHandle _handle;
|
| + core.MojoEventStream _eventStream;
|
| List _sendQueue;
|
| - List _completerQueue;
|
| + Map<int, Completer> _completerMap;
|
| bool _isOpen = false;
|
| + int _nextId = 0;
|
|
|
| - void handleResponse(MessageReader reader);
|
| + void handleResponse(ServiceMessage reader);
|
|
|
| - Client(this._endpoint) {
|
| + Client(core.MojoMessagePipeEndpoint endpoint) :
|
| + _sendQueue = [],
|
| + _completerMap = {},
|
| + _endpoint = endpoint,
|
| + _eventStream = new core.MojoEventStream(endpoint.handle);
|
| +
|
| + Client.fromHandle(int handle) {
|
| _sendQueue = [];
|
| - _completerQueue = [];
|
| - _handle = new core.MojoHandle(_endpoint.handle);
|
| + _completerMap = {};
|
| + _endpoint =
|
| + new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle));
|
| + _eventStream = new core.MojoHandle(_endpoint.handle);
|
| }
|
|
|
| - void open() {
|
| - _handle.listen((int mojoSignal) {
|
| - if (core.MojoHandleSignals.isReadable(mojoSignal)) {
|
| - // Query how many bytes are available.
|
| - var result = _endpoint.query();
|
| - if (!result.status.isOk && !result.status.isResourceExhausted) {
|
| - // If something else happens, it means the handle wasn't really ready
|
| - // for reading, which indicates a bug in MojoHandle or the
|
| - // handle watcher.
|
| - throw new Exception("message pipe query failed: ${result.status}");
|
| - }
|
| + void _doRead() {
|
| + // Query how many bytes are available.
|
| + var result = _endpoint.query();
|
| + assert(result.status.isOk || result.status.isResourceExhausted);
|
|
|
| - // Read the data.
|
| - var bytes = new ByteData(result.bytesRead);
|
| - var handles = new List<core.RawMojoHandle>(result.handlesRead);
|
| - result = _endpoint.read(bytes, result.bytesRead, handles);
|
| - if (!result.status.isOk && !result.status.isResourceExhausted) {
|
| - throw new Exception("message pipe read failed: ${result.status}");
|
| - }
|
| - var message = new Message(bytes, handles);
|
| - var reader = new MessageReader(message);
|
| + // Read the data.
|
| + var bytes = new ByteData(result.bytesRead);
|
| + var handles = new List<core.MojoHandle>(result.handlesRead);
|
| + result = _endpoint.read(bytes, result.bytesRead, handles);
|
| + assert(result.status.isOk || result.status.isResourceExhausted);
|
| + var message = new ServiceMessage.fromMessage(new Message(bytes, handles));
|
| + handleResponse(message);
|
| + }
|
|
|
| - handleResponse(reader);
|
| + void _doWrite() {
|
| + if (_sendQueue.length > 0) {
|
| + List messageCompleter = _sendQueue.removeAt(0);
|
| + ServiceMessage message = messageCompleter[0];
|
| + Completer completer = messageCompleter[1];
|
| + _endpoint.write(message.buffer,
|
| + message.buffer.lengthInBytes,
|
| + message.handles);
|
| + if (!_endpoint.status.isOk) {
|
| + throw "message pipe write failed";
|
| }
|
| - if (core.MojoHandleSignals.isWritable(mojoSignal)) {
|
| - if (_sendQueue.length > 0) {
|
| - List messageCompleter = _sendQueue.removeAt(0);
|
| - _endpoint.write(messageCompleter[0].buffer,
|
| - messageCompleter[0].buffer.lengthInBytes,
|
| - messageCompleter[0].handles);
|
| - if (!_endpoint.status.isOk) {
|
| - throw new Exception("message pipe write failed");
|
| - }
|
| - if (messageCompleter[1] != null) {
|
| - _completerQueue.add(messageCompleter[1]);
|
| - }
|
| + if (completer != null) {
|
| + if (!message.header.hasRequestId) {
|
| + throw "Message has a completer, but does not expect a response";
|
| }
|
| - if ((_sendQueue.length == 0) && _handle.writeEnabled()) {
|
| - _handle.disableWriteEvents();
|
| + int requestId = message.header.requestId;
|
| + if (_completerMap[requestId] != null) {
|
| + throw "Request Id $requestId is already in use.";
|
| }
|
| + _completerMap[requestId] = completer;
|
| }
|
| - if (core.MojoHandleSignals.isNone(mojoSignal)) {
|
| - // The handle watcher will send MojoHandleSignals.NONE if the other
|
| - // endpoint of the pipe is closed.
|
| - _handle.close();
|
| + }
|
| + }
|
| +
|
| + void open() {
|
| + _eventStream.listen((List<int> event) {
|
| + var signalsWatched = new core.MojoHandleSignals(event[0]);
|
| + var signalsReceived = new core.MojoHandleSignals(event[1]);
|
| + if (signalsReceived.isPeerClosed) {
|
| + close();
|
| + return;
|
| + }
|
| +
|
| + if (signalsReceived.isReadable) {
|
| + _doRead();
|
| + }
|
| +
|
| + if (signalsReceived.isWritable) {
|
| + _doWrite();
|
| + }
|
| +
|
| + if (_sendQueue.length == 0) {
|
| + var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE;
|
| + _eventStream.enableSignals(withoutWritable);
|
| + } else {
|
| + _eventStream.enableSignals(signalsWatched);
|
| }
|
| });
|
| _isOpen = true;
|
| }
|
|
|
| void close() {
|
| - assert(isOpen);
|
| - _handle.close();
|
| - _isOpen = false;
|
| + if (_isOpen) {
|
| + _eventStream.close();
|
| + _eventStream = null;
|
| + _isOpen = false;
|
| + }
|
| }
|
|
|
| - void enqueueMessage(Type t, int name, Object msg) {
|
| - var builder = new MessageBuilder(name, align(getEncodedSize(t)));
|
| - builder.encodeStruct(t, msg);
|
| - var message = builder.finish();
|
| - _sendQueue.add([message, null]);
|
| - if (!_handle.writeEnabled()) {
|
| - _handle.enableWriteEvents();
|
| + void enqueueMessage(Struct message, int name) {
|
| + var header = new MessageHeader(name);
|
| + var serviceMessage = message.serializeWithHeader(header);
|
| + _sendQueue.add([serviceMessage, null]);
|
| + if (_sendQueue.length == 1) {
|
| + _eventStream.enableAllEvents();
|
| }
|
| }
|
|
|
| - Future enqueueMessageWithRequestID(
|
| - Type t, int name, int id, int flags, Object msg) {
|
| - var builder = new MessageWithRequestIDBuilder(
|
| - name, align(getEncodedSize(t)), id, flags);
|
| - builder.encodeStruct(t, msg);
|
| - var message = builder.finish();
|
| + int _getNextId() {
|
| + return _nextId++;
|
| + }
|
| +
|
| + Future enqueueMessageWithRequestId(
|
| + Struct message, int name, int id, int flags) {
|
| + if (id == -1) {
|
| + id = _getNextId();
|
| + }
|
|
|
| + var header = new MessageHeader.withRequestId(name, flags, id);
|
| + var serviceMessage = message.serializeWithHeader(header);
|
| var completer = new Completer();
|
| - _sendQueue.add([message, completer]);
|
| - if (!_handle.writeEnabled()) {
|
| - _handle.enableWriteEvents();
|
| + _sendQueue.add([serviceMessage, completer]);
|
| + if (_sendQueue.length == 1) {
|
| + _eventStream.enableAllEvents();
|
| + } else {
|
| + _eventStream.enableReadEvents();
|
| }
|
| return completer.future;
|
| }
|
|
|
| // Need a getter for this for access in subclasses.
|
| - List get completerQueue => _completerQueue;
|
| + Map<int, Completer> get completerMap => _completerMap;
|
| bool get isOpen => _isOpen;
|
| }
|
|
|