Chromium Code Reviews| Index: mojo/public/dart/src/client.dart |
| diff --git a/mojo/public/dart/src/client.dart b/mojo/public/dart/src/client.dart |
| index 956a6525d6dc7e10331b57aa2e84f8d8209c48b9..767df912f3d42089e7c4b2161e5f4eaa73df6051 100644 |
| --- a/mojo/public/dart/src/client.dart |
| +++ b/mojo/public/dart/src/client.dart |
| @@ -8,71 +8,106 @@ abstract class Client { |
| core.MojoMessagePipeEndpoint _endpoint; |
| core.MojoHandle _handle; |
| List _sendQueue; |
| - List _completerQueue; |
| + Map<int, Completer> _completerMap; |
| bool _isOpen = false; |
| + int _nextId = 0; |
| void handleResponse(MessageReader reader); |
| - Client(this._endpoint) { |
| + Client(core.MojoMessagePipeEndpoint endpoint) : |
| + _sendQueue = [], |
| + _completerMap = {}, |
| + _endpoint = endpoint, |
| + _handle = new core.MojoHandle(endpoint.handle); |
| + |
| + Client.fromHandle(int handle) { |
| _sendQueue = []; |
| - _completerQueue = []; |
| + _completerMap = {}; |
| + _endpoint = |
| + new core.MojoMessagePipeEndpoint(new core.RawMojoHandle(handle)); |
| _handle = new core.MojoHandle(_endpoint.handle); |
| } |
| - void open() { |
| - _handle.listen((int mojoSignal) { |
| - if (core.MojoHandleSignals.isReadable(mojoSignal)) { |
| - // Query how many bytes are available. |
| - var result = _endpoint.query(); |
| - if (!result.status.isOk && !result.status.isResourceExhausted) { |
| - // If something else happens, it means the handle wasn't really ready |
| - // for reading, which indicates a bug in MojoHandle or the |
| - // handle watcher. |
| - throw new Exception("message pipe query failed: ${result.status}"); |
| - } |
| + void _doRead() { |
| + // Query how many bytes are available. |
| + var result = _endpoint.query(); |
| + if (!result.status.isOk && !result.status.isResourceExhausted) { |
| + // If something else happens, it means the handle wasn't really ready |
| + // for reading, which indicates a bug in MojoHandle or the |
| + // handle watcher. |
| + throw "message pipe query failed: ${result.status}"; |
| + } |
| - // Read the data. |
| - var bytes = new ByteData(result.bytesRead); |
| - var handles = new List<core.RawMojoHandle>(result.handlesRead); |
| - result = _endpoint.read(bytes, result.bytesRead, handles); |
| - if (!result.status.isOk && !result.status.isResourceExhausted) { |
| - throw new Exception("message pipe read failed: ${result.status}"); |
| - } |
| - var message = new Message(bytes, handles); |
| - var reader = new MessageReader(message); |
| + // Read the data. |
| + var bytes = new ByteData(result.bytesRead); |
| + var handles = new List<core.RawMojoHandle>(result.handlesRead); |
| + result = _endpoint.read(bytes, result.bytesRead, handles); |
| + if (!result.status.isOk && !result.status.isResourceExhausted) { |
| + throw "message pipe read failed: ${result.status}"; |
| + } |
| + var message = new Message(bytes, handles); |
| + var reader = new MessageReader(message); |
| - handleResponse(reader); |
| + handleResponse(reader); |
| + } |
| + |
| + void _doWrite() { |
| + if (_sendQueue.length > 0) { |
| + List messageCompleter = _sendQueue.removeAt(0); |
| + Message message = messageCompleter[0]; |
| + Completer completer = messageCompleter[1]; |
| + _endpoint.write(message.buffer, |
| + message.buffer.lengthInBytes, |
| + message.handles); |
| + if (!_endpoint.status.isOk) { |
| + throw "message pipe write failed"; |
| } |
| - if (core.MojoHandleSignals.isWritable(mojoSignal)) { |
| - if (_sendQueue.length > 0) { |
| - List messageCompleter = _sendQueue.removeAt(0); |
| - _endpoint.write(messageCompleter[0].buffer, |
| - messageCompleter[0].buffer.lengthInBytes, |
| - messageCompleter[0].handles); |
| - if (!_endpoint.status.isOk) { |
| - throw new Exception("message pipe write failed"); |
| - } |
| - if (messageCompleter[1] != null) { |
| - _completerQueue.add(messageCompleter[1]); |
| - } |
| + if (completer != null) { |
| + if (!message.expectsResponse) { |
| + throw "Message has a completer, but does not expect a response"; |
| } |
| - if ((_sendQueue.length == 0) && _handle.writeEnabled()) { |
| - _handle.disableWriteEvents(); |
| + int requestID = message.requestID; |
| + if (_completerMap[requestID] != null) { |
| + throw "Request ID $requestID is already in use."; |
| } |
| + _completerMap[requestID] = completer; |
| + } |
| + } |
| + } |
| + |
| + void open() { |
| + _handle.listen((List<int> event) { |
| + var signalsWatched = new core.MojoHandleSignals(event[0]); |
| + var signalsReceived = new core.MojoHandleSignals(event[1]); |
| + if (signalsReceived.isPeerClosed) { |
| + close(); |
| + return; |
| + } |
| + |
| + if (signalsReceived.isReadable) { |
| + _doRead(); |
| } |
| - if (core.MojoHandleSignals.isNone(mojoSignal)) { |
| - // The handle watcher will send MojoHandleSignals.NONE if the other |
| - // endpoint of the pipe is closed. |
| - _handle.close(); |
| + |
| + if (signalsReceived.isWritable) { |
| + _doWrite(); |
| + } |
| + |
| + if (_sendQueue.length == 0) { |
| + var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE; |
| + _handle.enableSignals(withoutWritable); |
| + } else { |
| + _handle.enableSignals(signalsWatched); |
| } |
| }); |
| _isOpen = true; |
| } |
| void close() { |
| - assert(isOpen); |
| - _handle.close(); |
| - _isOpen = false; |
| + if (_isOpen) { |
| + _handle.close(); |
| + _handle = null; |
| + _isOpen = false; |
| + } |
| } |
| void enqueueMessage(Type t, int name, Object msg) { |
| @@ -80,13 +115,26 @@ abstract class Client { |
| builder.encodeStruct(t, msg); |
| var message = builder.finish(); |
| _sendQueue.add([message, null]); |
| - if (!_handle.writeEnabled()) { |
| - _handle.enableWriteEvents(); |
| + if (_sendQueue.length == 1) { |
| + _handle.enableAllEvents(); |
| + } |
| + } |
| + |
| + int _getNextId() { |
| + int next = _nextId; |
| + _nextId++; |
| + if (_nextId.bitLength > 64) { |
|
siva
2014/12/29 23:20:44
maybe document what this magic number 64 is.
zra
2014/12/30 16:29:33
Added a comment.
|
| + _nextId = 1; |
| } |
| + return next; |
| } |
| Future enqueueMessageWithRequestID( |
| Type t, int name, int id, int flags, Object msg) { |
| + if (id == -1) { |
| + id = _getNextId(); |
| + } |
| + |
| var builder = new MessageWithRequestIDBuilder( |
| name, align(getEncodedSize(t)), id, flags); |
| builder.encodeStruct(t, msg); |
| @@ -94,13 +142,15 @@ abstract class Client { |
| var completer = new Completer(); |
| _sendQueue.add([message, completer]); |
| - if (!_handle.writeEnabled()) { |
| - _handle.enableWriteEvents(); |
| + if (_sendQueue.length == 1) { |
| + _handle.enableAllEvents(); |
| + } else { |
| + _handle.enableReadEvents(); |
| } |
| return completer.future; |
| } |
| // Need a getter for this for access in subclasses. |
| - List get completerQueue => _completerQueue; |
| + Map<int, Completer> get completerMap => _completerMap; |
| bool get isOpen => _isOpen; |
| } |