Index: mojo/public/dart/src/client.dart |
diff --git a/mojo/public/dart/src/client.dart b/mojo/public/dart/src/client.dart |
index 956a6525d6dc7e10331b57aa2e84f8d8209c48b9..767df912f3d42089e7c4b2161e5f4eaa73df6051 100644 |
--- a/mojo/public/dart/src/client.dart |
+++ b/mojo/public/dart/src/client.dart |
@@ -8,71 +8,106 @@ abstract class Client { |
core.MojoMessagePipeEndpoint _endpoint; |
core.MojoHandle _handle; |
List _sendQueue; |
- List _completerQueue; |
+ Map<int, Completer> _completerMap; |
bool _isOpen = false; |
+ int _nextId = 0; |
void handleResponse(MessageReader reader); |
- Client(this._endpoint) { |
+ Client(core.MojoMessagePipeEndpoint endpoint) : |
+ _sendQueue = [], |
+ _completerMap = {}, |
+ _endpoint = endpoint, |
+ _handle = new core.MojoHandle(endpoint.handle); |
+ |
+ Client.fromHandle(int handle) { |
_sendQueue = []; |
- _completerQueue = []; |
+ _completerMap = {}; |
+ _endpoint = |
+ new core.MojoMessagePipeEndpoint(new core.RawMojoHandle(handle)); |
_handle = new core.MojoHandle(_endpoint.handle); |
} |
- void open() { |
- _handle.listen((int mojoSignal) { |
- if (core.MojoHandleSignals.isReadable(mojoSignal)) { |
- // Query how many bytes are available. |
- var result = _endpoint.query(); |
- if (!result.status.isOk && !result.status.isResourceExhausted) { |
- // If something else happens, it means the handle wasn't really ready |
- // for reading, which indicates a bug in MojoHandle or the |
- // handle watcher. |
- throw new Exception("message pipe query failed: ${result.status}"); |
- } |
+ void _doRead() { |
+ // Query how many bytes are available. |
+ var result = _endpoint.query(); |
+ if (!result.status.isOk && !result.status.isResourceExhausted) { |
+ // If something else happens, it means the handle wasn't really ready |
+ // for reading, which indicates a bug in MojoHandle or the |
+ // handle watcher. |
+ throw "message pipe query failed: ${result.status}"; |
+ } |
- // Read the data. |
- var bytes = new ByteData(result.bytesRead); |
- var handles = new List<core.RawMojoHandle>(result.handlesRead); |
- result = _endpoint.read(bytes, result.bytesRead, handles); |
- if (!result.status.isOk && !result.status.isResourceExhausted) { |
- throw new Exception("message pipe read failed: ${result.status}"); |
- } |
- var message = new Message(bytes, handles); |
- var reader = new MessageReader(message); |
+ // Read the data. |
+ var bytes = new ByteData(result.bytesRead); |
+ var handles = new List<core.RawMojoHandle>(result.handlesRead); |
+ result = _endpoint.read(bytes, result.bytesRead, handles); |
+ if (!result.status.isOk && !result.status.isResourceExhausted) { |
+ throw "message pipe read failed: ${result.status}"; |
+ } |
+ var message = new Message(bytes, handles); |
+ var reader = new MessageReader(message); |
- handleResponse(reader); |
+ handleResponse(reader); |
+ } |
+ |
+ void _doWrite() { |
+ if (_sendQueue.length > 0) { |
+ List messageCompleter = _sendQueue.removeAt(0); |
+ Message message = messageCompleter[0]; |
+ Completer completer = messageCompleter[1]; |
+ _endpoint.write(message.buffer, |
+ message.buffer.lengthInBytes, |
+ message.handles); |
+ if (!_endpoint.status.isOk) { |
+ throw "message pipe write failed"; |
} |
- if (core.MojoHandleSignals.isWritable(mojoSignal)) { |
- if (_sendQueue.length > 0) { |
- List messageCompleter = _sendQueue.removeAt(0); |
- _endpoint.write(messageCompleter[0].buffer, |
- messageCompleter[0].buffer.lengthInBytes, |
- messageCompleter[0].handles); |
- if (!_endpoint.status.isOk) { |
- throw new Exception("message pipe write failed"); |
- } |
- if (messageCompleter[1] != null) { |
- _completerQueue.add(messageCompleter[1]); |
- } |
+ if (completer != null) { |
+ if (!message.expectsResponse) { |
+ throw "Message has a completer, but does not expect a response"; |
} |
- if ((_sendQueue.length == 0) && _handle.writeEnabled()) { |
- _handle.disableWriteEvents(); |
+ int requestID = message.requestID; |
+ if (_completerMap[requestID] != null) { |
+ throw "Request ID $requestID is already in use."; |
} |
+ _completerMap[requestID] = completer; |
+ } |
+ } |
+ } |
+ |
+ void open() { |
+ _handle.listen((List<int> event) { |
+ var signalsWatched = new core.MojoHandleSignals(event[0]); |
+ var signalsReceived = new core.MojoHandleSignals(event[1]); |
+ if (signalsReceived.isPeerClosed) { |
+ close(); |
+ return; |
+ } |
+ |
+ if (signalsReceived.isReadable) { |
+ _doRead(); |
} |
- if (core.MojoHandleSignals.isNone(mojoSignal)) { |
- // The handle watcher will send MojoHandleSignals.NONE if the other |
- // endpoint of the pipe is closed. |
- _handle.close(); |
+ |
+ if (signalsReceived.isWritable) { |
+ _doWrite(); |
+ } |
+ |
+ if (_sendQueue.length == 0) { |
+ var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE; |
+ _handle.enableSignals(withoutWritable); |
+ } else { |
+ _handle.enableSignals(signalsWatched); |
} |
}); |
_isOpen = true; |
} |
void close() { |
- assert(isOpen); |
- _handle.close(); |
- _isOpen = false; |
+ if (_isOpen) { |
+ _handle.close(); |
+ _handle = null; |
+ _isOpen = false; |
+ } |
} |
void enqueueMessage(Type t, int name, Object msg) { |
@@ -80,13 +115,26 @@ abstract class Client { |
builder.encodeStruct(t, msg); |
var message = builder.finish(); |
_sendQueue.add([message, null]); |
- if (!_handle.writeEnabled()) { |
- _handle.enableWriteEvents(); |
+ if (_sendQueue.length == 1) { |
+ _handle.enableAllEvents(); |
+ } |
+ } |
+ |
+ int _getNextId() { |
+ int next = _nextId; |
+ _nextId++; |
+ if (_nextId.bitLength > 64) { |
siva
2014/12/29 23:20:44
maybe document what this magic number 64 is.
zra
2014/12/30 16:29:33
Added a comment.
|
+ _nextId = 1; |
} |
+ return next; |
} |
Future enqueueMessageWithRequestID( |
Type t, int name, int id, int flags, Object msg) { |
+ if (id == -1) { |
+ id = _getNextId(); |
+ } |
+ |
var builder = new MessageWithRequestIDBuilder( |
name, align(getEncodedSize(t)), id, flags); |
builder.encodeStruct(t, msg); |
@@ -94,13 +142,15 @@ abstract class Client { |
var completer = new Completer(); |
_sendQueue.add([message, completer]); |
- if (!_handle.writeEnabled()) { |
- _handle.enableWriteEvents(); |
+ if (_sendQueue.length == 1) { |
+ _handle.enableAllEvents(); |
+ } else { |
+ _handle.enableReadEvents(); |
} |
return completer.future; |
} |
// Need a getter for this for access in subclasses. |
- List get completerQueue => _completerQueue; |
+ Map<int, Completer> get completerMap => _completerMap; |
bool get isOpen => _isOpen; |
} |