Chromium Code Reviews| Index: mojo/public/dart/src/interface.dart |
| diff --git a/mojo/public/dart/src/interface.dart b/mojo/public/dart/src/interface.dart |
| index 8d8970a0d4b3a28ad2cea1da0b9ce7afc8ecf416..c51f77dfddad2e994927bc98ca58ddb7c113f8c9 100644 |
| --- a/mojo/public/dart/src/interface.dart |
| +++ b/mojo/public/dart/src/interface.dart |
| @@ -10,72 +10,110 @@ abstract class Interface { |
| List _sendQueue; |
| bool _isOpen; |
| - Message handleMessage(MessageReader reader); |
| + Future<Message> handleMessage(MessageReader reader); |
| - Interface(this._endpoint) { |
| + Interface(core.MojoMessagePipeEndpoint endpoint) : |
| + _endpoint = endpoint, |
| + _sendQueue = [], |
| + _handle = new core.MojoHandle(endpoint.handle), |
| + _isOpen = false; |
| + |
| + Interface.fromHandle(int handle) { |
| + _endpoint = |
| + new core.MojoMessagePipeEndpoint(new core.RawMojoHandle(handle)); |
| _sendQueue = []; |
| _handle = new core.MojoHandle(_endpoint.handle); |
| _isOpen = false; |
| } |
| + void _doRead() { |
| + assert(_handle.readyRead); |
| + |
| + // Query how many bytes are available. |
| + var result = _endpoint.query(); |
| + if (!result.status.isOk && !result.status.isResourceExhausted) { |
| + // If something else happens, it means the handle wasn't really ready |
| + // for reading, which indicates a bug in MojoHandle or the |
| + // handle watcher. |
| + throw "message pipe query failed: ${result.status} for $_handle"; |
| + } |
|
abarth-chromium
2015/01/04 08:46:07
If these conditions represent bugs, do we need to
zra
2015/01/05 15:45:41
No. Changed to asserts here and in client.dart.
|
| + |
| + // Read the data and view as a message. |
| + var bytes = new ByteData(result.bytesRead); |
| + var handles = new List<core.RawMojoHandle>(result.handlesRead); |
| + result = _endpoint.read(bytes, result.bytesRead, handles); |
| + if (!result.status.isOk && !result.status.isResourceExhausted) { |
| + // If something else happens, it means the handle wasn't really ready |
| + // for reading, which indicates a bug in MojoHandle or the |
| + // handle watcher. |
| + throw "message pipe read failed: ${result.status}"; |
| + } |
| + var message = new Message(bytes, handles); |
| + var reader = new MessageReader(message); |
| + |
| + // Prepare the response. |
| + var responseFuture = handleMessage(reader); |
| + |
| + // If there's a response, queue it up for sending. |
| + if (responseFuture != null) { |
| + responseFuture.then((response) { |
| + _sendQueue.add(response); |
| + if (_sendQueue.length == 1) { |
| + _handle.enableWriteEvents(); |
| + } |
| + }); |
| + } |
| + } |
| + |
| + void _doWrite() { |
| + if (_sendQueue.length > 0) { |
| + assert(_handle.readyWrite); |
| + var responseMessage = _sendQueue.removeAt(0); |
| + _endpoint.write(responseMessage.buffer, |
| + responseMessage.buffer.lengthInBytes, |
| + responseMessage.handles); |
| + if (!_endpoint.status.isOk) { |
| + throw "message pipe write failed: ${_endpoint.status}"; |
| + } |
| + } |
| + } |
| + |
| StreamSubscription<int> listen() { |
| _isOpen = true; |
| - return _handle.listen((int mojoSignal) { |
| - if (core.MojoHandleSignals.isReadable(mojoSignal)) { |
| - // Query how many bytes are available. |
| - var result = _endpoint.query(); |
| - if (!result.status.isOk && !result.status.isResourceExhausted) { |
| - // If something else happens, it means the handle wasn't really ready |
| - // for reading, which indicates a bug in MojoHandle or the |
| - // event listener. |
| - throw new Exception("message pipe query failed: ${result.status}"); |
| - } |
| + return _handle.listen((List<int> event) { |
| + var signalsWatched = new core.MojoHandleSignals(event[0]); |
| + var signalsReceived = new core.MojoHandleSignals(event[1]); |
| + if (signalsReceived.isPeerClosed) { |
| + close(); |
| + return; |
| + } |
| - // Read the data and view as a message. |
| - var bytes = new ByteData(result.bytesRead); |
| - var handles = new List<core.RawMojoHandle>(result.handlesRead); |
| - result = _endpoint.read(bytes, result.bytesRead, handles); |
| - if (!result.status.isOk && !result.status.isResourceExhausted) { |
| - // If something else happens, it means the handle wasn't really ready |
| - // for reading, which indicates a bug in MojoHandle or the |
| - // event listener. |
| - throw new Exception("message pipe read failed: ${result.status}"); |
| - } |
| - var message = new Message(bytes, handles); |
| - var reader = new MessageReader(message); |
| - |
| - // Prepare the response. |
| - var responseMessage = handleMessage(reader); |
| - // If there's a response, queue it up for sending. |
| - if (responseMessage != null) { |
| - _sendQueue.add(responseMessage); |
| - if ((_sendQueue.length > 0) && !_handle.writeEnabled()) { |
| - _handle.enableWriteEvents(); |
| - } |
| - } |
| + if (signalsReceived.isReadable) { |
| + _doRead(); |
| } |
| - if (core.MojoHandleSignals.isWritable(mojoSignal)) { |
| - if (_sendQueue.length > 0) { |
| - var responseMessage = _sendQueue.removeAt(0); |
| - _endpoint.write(responseMessage.buffer, |
| - responseMessage.buffer.lengthInBytes, |
| - responseMessage.handles); |
| - if (!_endpoint.status.isOk) { |
| - throw new Exception("message pipe write failed"); |
| - } |
| - } |
| - if ((_sendQueue.length == 0) && _handle.writeEnabled()) { |
| - _handle.disableWriteEvents(); |
| - } |
| + |
| + if (signalsReceived.isWritable) { |
| + _doWrite(); |
| } |
| - if (core.MojoHandleSignals.isNone(mojoSignal)) { |
| - // The handle watcher will send MojoHandleSignals.NONE when the other |
| - // endpoint of the pipe is closed. |
| - _handle.close(); |
| + |
| + if (_sendQueue.length == 0) { |
| + var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE; |
| + _handle.enableSignals(withoutWritable); |
| + } else { |
| + _handle.enableSignals(signalsWatched); |
| } |
| }); |
| } |
| + void close() { |
| + // TODO(zra): Cancel outstanding Futures started in _doRead? |
| + if (_isOpen) { |
| + _handle.close(); |
| + _isOpen = false; |
| + _handle = null; |
| + } |
| + } |
| + |
| Message buildResponse(Type t, int name, Object response) { |
| var builder = new MessageBuilder(name, align(getEncodedSize(t))); |
| builder.encodeStruct(t, response); |
| @@ -95,14 +133,14 @@ abstract class Interface { |
| builder.encodeStruct(t, msg); |
| var message = builder.finish(); |
| _sendQueue.add(message); |
| - if (!_handle.writeEnabled()) { |
| - _handle.enableWriteEvents(); |
| - } |
| + _handle.enableWriteEvents(); |
| } |
| Future enqueueMessageWithRequestID(Type t, int name, int id, Object msg) { |
| - throw new Exception("The client Mixin should not expect a response"); |
| + // TODO(zra): Is this correct? |
| + throw "The client interface should not expect a response"; |
| } |
| bool get isOpen => _isOpen; |
| + core.MojoMessagePipeEndpoint get endpoint => _endpoint; |
| } |