Index: mojo/public/dart/src/interface.dart |
diff --git a/mojo/public/dart/src/interface.dart b/mojo/public/dart/src/interface.dart |
index 8d8970a0d4b3a28ad2cea1da0b9ce7afc8ecf416..f6999074e5eaad156f4be5843200da7b010a2772 100644 |
--- a/mojo/public/dart/src/interface.dart |
+++ b/mojo/public/dart/src/interface.dart |
@@ -10,72 +10,109 @@ abstract class Interface { |
List _sendQueue; |
bool _isOpen; |
- Message handleMessage(MessageReader reader); |
+ Future<Message> handleMessage(MessageReader reader); |
- Interface(this._endpoint) { |
+ Interface(core.MojoMessagePipeEndpoint endpoint) : |
+ _endpoint = endpoint, |
+ _sendQueue = [], |
+ _handle = new core.MojoHandle(endpoint.handle), |
+ _isOpen = false; |
+ |
+ Interface.fromHandle(int handle) { |
+ _endpoint = |
+ new core.MojoMessagePipeEndpoint(new core.RawMojoHandle(handle)); |
_sendQueue = []; |
_handle = new core.MojoHandle(_endpoint.handle); |
_isOpen = false; |
} |
+ void _doRead() { |
+ assert(_handle.readyRead); |
+ |
+ // Query how many bytes are available. |
+ var result = _endpoint.query(); |
+ if (!result.status.isOk && !result.status.isResourceExhausted) { |
+ // If something else happens, it means the handle wasn't really ready |
+ // for reading, which indicates a bug in MojoHandle or the |
+ // handle watcher. |
+ throw "message pipe query failed: ${result.status} for $_handle"; |
+ } |
+ |
+ // Read the data and view as a message. |
+ var bytes = new ByteData(result.bytesRead); |
+ var handles = new List<core.RawMojoHandle>(result.handlesRead); |
+ result = _endpoint.read(bytes, result.bytesRead, handles); |
+ if (!result.status.isOk && !result.status.isResourceExhausted) { |
+ // If something else happens, it means the handle wasn't really ready |
+ // for reading, which indicates a bug in MojoHandle or the |
+ // handle watcher. |
+ throw "message pipe read failed: ${result.status}"; |
+ } |
+ var message = new Message(bytes, handles); |
+ var reader = new MessageReader(message); |
+ |
+ // Prepare the response. |
+ var responseFuture = handleMessage(reader); |
+ |
+ // If there's a response, queue it up for sending. |
+ if (responseFuture != null) { |
+ responseFuture.then((response) { |
+ _sendQueue.add(response); |
+ if (_sendQueue.length == 1) { |
+ _handle.enableWriteEvents(); |
+ } |
+ }); |
+ } |
+ } |
+ |
+ void _doWrite() { |
+ if (_sendQueue.length > 0) { |
+ assert(_handle.readyWrite); |
+ var responseMessage = _sendQueue.removeAt(0); |
+ _endpoint.write(responseMessage.buffer, |
+ responseMessage.buffer.lengthInBytes, |
+ responseMessage.handles); |
+ if (!_endpoint.status.isOk) { |
+ throw "message pipe write failed: ${_endpoint.status}"; |
+ } |
+ } |
+ } |
+ |
StreamSubscription<int> listen() { |
_isOpen = true; |
- return _handle.listen((int mojoSignal) { |
- if (core.MojoHandleSignals.isReadable(mojoSignal)) { |
- // Query how many bytes are available. |
- var result = _endpoint.query(); |
- if (!result.status.isOk && !result.status.isResourceExhausted) { |
- // If something else happens, it means the handle wasn't really ready |
- // for reading, which indicates a bug in MojoHandle or the |
- // event listener. |
- throw new Exception("message pipe query failed: ${result.status}"); |
- } |
+ return _handle.listen((List<int> event) { |
+ var signalsWatched = new core.MojoHandleSignals(event[0]); |
+ var signalsReceived = new core.MojoHandleSignals(event[1]); |
+ if (signalsReceived.isPeerClosed) { |
+ close(); |
+ return; |
+ } |
- // Read the data and view as a message. |
- var bytes = new ByteData(result.bytesRead); |
- var handles = new List<core.RawMojoHandle>(result.handlesRead); |
- result = _endpoint.read(bytes, result.bytesRead, handles); |
- if (!result.status.isOk && !result.status.isResourceExhausted) { |
- // If something else happens, it means the handle wasn't really ready |
- // for reading, which indicates a bug in MojoHandle or the |
- // event listener. |
- throw new Exception("message pipe read failed: ${result.status}"); |
- } |
- var message = new Message(bytes, handles); |
- var reader = new MessageReader(message); |
- |
- // Prepare the response. |
- var responseMessage = handleMessage(reader); |
- // If there's a response, queue it up for sending. |
- if (responseMessage != null) { |
- _sendQueue.add(responseMessage); |
- if ((_sendQueue.length > 0) && !_handle.writeEnabled()) { |
- _handle.enableWriteEvents(); |
- } |
- } |
+ if (signalsReceived.isReadable) { |
+ _doRead(); |
} |
- if (core.MojoHandleSignals.isWritable(mojoSignal)) { |
- if (_sendQueue.length > 0) { |
- var responseMessage = _sendQueue.removeAt(0); |
- _endpoint.write(responseMessage.buffer, |
- responseMessage.buffer.lengthInBytes, |
- responseMessage.handles); |
- if (!_endpoint.status.isOk) { |
- throw new Exception("message pipe write failed"); |
- } |
- } |
- if ((_sendQueue.length == 0) && _handle.writeEnabled()) { |
- _handle.disableWriteEvents(); |
- } |
+ |
+ if (signalsReceived.isWritable) { |
+ _doWrite(); |
} |
- if (core.MojoHandleSignals.isNone(mojoSignal)) { |
- // The handle watcher will send MojoHandleSignals.NONE when the other |
- // endpoint of the pipe is closed. |
- _handle.close(); |
+ |
+ if (_sendQueue.length == 0) { |
+ var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE; |
+ _handle.enableSignals(withoutWritable); |
+ } else { |
+ _handle.enableSignals(signalsWatched); |
} |
}); |
} |
+ void close() { |
+ if (_isOpen) { |
+ _handle.close(); |
+ _isOpen = false; |
+ _handle = null; |
+ } |
+ } |
+ |
Message buildResponse(Type t, int name, Object response) { |
var builder = new MessageBuilder(name, align(getEncodedSize(t))); |
builder.encodeStruct(t, response); |
@@ -95,14 +132,14 @@ abstract class Interface { |
builder.encodeStruct(t, msg); |
var message = builder.finish(); |
_sendQueue.add(message); |
- if (!_handle.writeEnabled()) { |
- _handle.enableWriteEvents(); |
- } |
+ _handle.enableWriteEvents(); |
} |
Future enqueueMessageWithRequestID(Type t, int name, int id, Object msg) { |
- throw new Exception("The client Mixin should not expect a response"); |
+ // TODO(zra): Is this correct? |
+ throw "The client interface should not expect a response"; |
} |
bool get isOpen => _isOpen; |
+ core.MojoMessagePipeEndpoint get endpoint => _endpoint; |
} |