| Index: mojo/public/dart/src/interface.dart
|
| diff --git a/mojo/public/dart/src/interface.dart b/mojo/public/dart/src/interface.dart
|
| index 8d8970a0d4b3a28ad2cea1da0b9ce7afc8ecf416..0a8cfd4be6163af9bb275a3c9b895410bb51025b 100644
|
| --- a/mojo/public/dart/src/interface.dart
|
| +++ b/mojo/public/dart/src/interface.dart
|
| @@ -6,76 +6,104 @@ part of bindings;
|
|
|
| abstract class Interface {
|
| core.MojoMessagePipeEndpoint _endpoint;
|
| - core.MojoHandle _handle;
|
| + core.MojoEventStream _eventStream;
|
| List _sendQueue;
|
| bool _isOpen;
|
|
|
| - Message handleMessage(MessageReader reader);
|
| + Future<Message> handleMessage(MessageReader reader);
|
|
|
| - Interface(this._endpoint) {
|
| + Interface(core.MojoMessagePipeEndpoint endpoint) :
|
| + _endpoint = endpoint,
|
| + _sendQueue = [],
|
| + _eventStream = new core.MojoEventStream(endpoint.handle),
|
| + _isOpen = false;
|
| +
|
| + Interface.fromHandle(int handle) {
|
| + _endpoint =
|
| + new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle));
|
| _sendQueue = [];
|
| - _handle = new core.MojoHandle(_endpoint.handle);
|
| + _eventStream = new core.MojoEventStream(_endpoint.handle);
|
| _isOpen = false;
|
| }
|
|
|
| + void _doRead() {
|
| + assert(_eventStream.readyRead);
|
| +
|
| + // Query how many bytes are available.
|
| + var result = _endpoint.query();
|
| + assert(result.status.isOk || result.status.isResourceExhausted);
|
| +
|
| + // Read the data and view as a message.
|
| + var bytes = new ByteData(result.bytesRead);
|
| + var handles = new List<core.MojoHandle>(result.handlesRead);
|
| + result = _endpoint.read(bytes, result.bytesRead, handles);
|
| + assert(result.status.isOk || result.status.isResourceExhausted);
|
| + var message = new Message(bytes, handles);
|
| + var reader = new MessageReader(message);
|
| +
|
| + // Prepare the response.
|
| + var responseFuture = handleMessage(reader);
|
| +
|
| + // If there's a response, queue it up for sending.
|
| + if (responseFuture != null) {
|
| + responseFuture.then((response) {
|
| + _sendQueue.add(response);
|
| + if (_sendQueue.length == 1) {
|
| + _eventStream.enableWriteEvents();
|
| + }
|
| + });
|
| + }
|
| + }
|
| +
|
| + void _doWrite() {
|
| + if (_sendQueue.length > 0) {
|
| + assert(_eventStream.readyWrite);
|
| + var responseMessage = _sendQueue.removeAt(0);
|
| + _endpoint.write(responseMessage.buffer,
|
| + responseMessage.buffer.lengthInBytes,
|
| + responseMessage.handles);
|
| + if (!_endpoint.status.isOk) {
|
| + throw "message pipe write failed: ${_endpoint.status}";
|
| + }
|
| + }
|
| + }
|
| +
|
| StreamSubscription<int> listen() {
|
| _isOpen = true;
|
| - return _handle.listen((int mojoSignal) {
|
| - if (core.MojoHandleSignals.isReadable(mojoSignal)) {
|
| - // Query how many bytes are available.
|
| - var result = _endpoint.query();
|
| - if (!result.status.isOk && !result.status.isResourceExhausted) {
|
| - // If something else happens, it means the handle wasn't really ready
|
| - // for reading, which indicates a bug in MojoHandle or the
|
| - // event listener.
|
| - throw new Exception("message pipe query failed: ${result.status}");
|
| - }
|
| + return _eventStream.listen((List<int> event) {
|
| + var signalsWatched = new core.MojoHandleSignals(event[0]);
|
| + var signalsReceived = new core.MojoHandleSignals(event[1]);
|
| + if (signalsReceived.isPeerClosed) {
|
| + close();
|
| + return;
|
| + }
|
|
|
| - // Read the data and view as a message.
|
| - var bytes = new ByteData(result.bytesRead);
|
| - var handles = new List<core.RawMojoHandle>(result.handlesRead);
|
| - result = _endpoint.read(bytes, result.bytesRead, handles);
|
| - if (!result.status.isOk && !result.status.isResourceExhausted) {
|
| - // If something else happens, it means the handle wasn't really ready
|
| - // for reading, which indicates a bug in MojoHandle or the
|
| - // event listener.
|
| - throw new Exception("message pipe read failed: ${result.status}");
|
| - }
|
| - var message = new Message(bytes, handles);
|
| - var reader = new MessageReader(message);
|
| -
|
| - // Prepare the response.
|
| - var responseMessage = handleMessage(reader);
|
| - // If there's a response, queue it up for sending.
|
| - if (responseMessage != null) {
|
| - _sendQueue.add(responseMessage);
|
| - if ((_sendQueue.length > 0) && !_handle.writeEnabled()) {
|
| - _handle.enableWriteEvents();
|
| - }
|
| - }
|
| + if (signalsReceived.isReadable) {
|
| + _doRead();
|
| }
|
| - if (core.MojoHandleSignals.isWritable(mojoSignal)) {
|
| - if (_sendQueue.length > 0) {
|
| - var responseMessage = _sendQueue.removeAt(0);
|
| - _endpoint.write(responseMessage.buffer,
|
| - responseMessage.buffer.lengthInBytes,
|
| - responseMessage.handles);
|
| - if (!_endpoint.status.isOk) {
|
| - throw new Exception("message pipe write failed");
|
| - }
|
| - }
|
| - if ((_sendQueue.length == 0) && _handle.writeEnabled()) {
|
| - _handle.disableWriteEvents();
|
| - }
|
| +
|
| + if (signalsReceived.isWritable) {
|
| + _doWrite();
|
| }
|
| - if (core.MojoHandleSignals.isNone(mojoSignal)) {
|
| - // The handle watcher will send MojoHandleSignals.NONE when the other
|
| - // endpoint of the pipe is closed.
|
| - _handle.close();
|
| +
|
| + if (_sendQueue.length == 0) {
|
| + var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE;
|
| + _eventStream.enableSignals(withoutWritable);
|
| + } else {
|
| + _eventStream.enableSignals(signalsWatched);
|
| }
|
| });
|
| }
|
|
|
| + void close() {
|
| + // TODO(zra): Cancel outstanding Futures started in _doRead?
|
| + if (_isOpen) {
|
| + _eventStream.close();
|
| + _isOpen = false;
|
| + _eventStream = null;
|
| + }
|
| + }
|
| +
|
| Message buildResponse(Type t, int name, Object response) {
|
| var builder = new MessageBuilder(name, align(getEncodedSize(t)));
|
| builder.encodeStruct(t, response);
|
| @@ -95,14 +123,14 @@ abstract class Interface {
|
| builder.encodeStruct(t, msg);
|
| var message = builder.finish();
|
| _sendQueue.add(message);
|
| - if (!_handle.writeEnabled()) {
|
| - _handle.enableWriteEvents();
|
| - }
|
| + _eventStream.enableWriteEvents();
|
| }
|
|
|
| Future enqueueMessageWithRequestID(Type t, int name, int id, Object msg) {
|
| - throw new Exception("The client Mixin should not expect a response");
|
| + // TODO(zra): Is this correct?
|
| + throw "The client interface should not expect a response";
|
| }
|
|
|
| bool get isOpen => _isOpen;
|
| + core.MojoMessagePipeEndpoint get endpoint => _endpoint;
|
| }
|
|
|