| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 part of bindings; | 5 part of bindings; |
| 6 | 6 |
| 7 abstract class Client { | 7 abstract class Client { |
| 8 core.MojoMessagePipeEndpoint _endpoint; | 8 core.MojoMessagePipeEndpoint _endpoint; |
| 9 core.MojoHandle _handle; | 9 core.MojoEventStream _eventStream; |
| 10 List _sendQueue; | 10 List _sendQueue; |
| 11 List _completerQueue; | 11 Map<int, Completer> _completerMap; |
| 12 bool _isOpen = false; | 12 bool _isOpen = false; |
| 13 int _nextId = 0; |
| 13 | 14 |
| 14 void handleResponse(MessageReader reader); | 15 void handleResponse(ServiceMessage reader); |
| 15 | 16 |
| 16 Client(this._endpoint) { | 17 Client(core.MojoMessagePipeEndpoint endpoint) : |
| 18 _sendQueue = [], |
| 19 _completerMap = {}, |
| 20 _endpoint = endpoint, |
| 21 _eventStream = new core.MojoEventStream(endpoint.handle); |
| 22 |
| 23 Client.fromHandle(int handle) { |
| 17 _sendQueue = []; | 24 _sendQueue = []; |
| 18 _completerQueue = []; | 25 _completerMap = {}; |
| 19 _handle = new core.MojoHandle(_endpoint.handle); | 26 _endpoint = |
| 27 new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle)); |
| 28 _eventStream = new core.MojoHandle(_endpoint.handle); |
| 29 } |
| 30 |
| 31 void _doRead() { |
| 32 // Query how many bytes are available. |
| 33 var result = _endpoint.query(); |
| 34 assert(result.status.isOk || result.status.isResourceExhausted); |
| 35 |
| 36 // Read the data. |
| 37 var bytes = new ByteData(result.bytesRead); |
| 38 var handles = new List<core.MojoHandle>(result.handlesRead); |
| 39 result = _endpoint.read(bytes, result.bytesRead, handles); |
| 40 assert(result.status.isOk || result.status.isResourceExhausted); |
| 41 var message = new ServiceMessage.fromMessage(new Message(bytes, handles)); |
| 42 handleResponse(message); |
| 43 } |
| 44 |
| 45 void _doWrite() { |
| 46 if (_sendQueue.length > 0) { |
| 47 List messageCompleter = _sendQueue.removeAt(0); |
| 48 ServiceMessage message = messageCompleter[0]; |
| 49 Completer completer = messageCompleter[1]; |
| 50 _endpoint.write(message.buffer, |
| 51 message.buffer.lengthInBytes, |
| 52 message.handles); |
| 53 if (!_endpoint.status.isOk) { |
| 54 throw "message pipe write failed"; |
| 55 } |
| 56 if (completer != null) { |
| 57 if (!message.header.hasRequestId) { |
| 58 throw "Message has a completer, but does not expect a response"; |
| 59 } |
| 60 int requestId = message.header.requestId; |
| 61 if (_completerMap[requestId] != null) { |
| 62 throw "Request Id $requestId is already in use."; |
| 63 } |
| 64 _completerMap[requestId] = completer; |
| 65 } |
| 66 } |
| 20 } | 67 } |
| 21 | 68 |
| 22 void open() { | 69 void open() { |
| 23 _handle.listen((int mojoSignal) { | 70 _eventStream.listen((List<int> event) { |
| 24 if (core.MojoHandleSignals.isReadable(mojoSignal)) { | 71 var signalsWatched = new core.MojoHandleSignals(event[0]); |
| 25 // Query how many bytes are available. | 72 var signalsReceived = new core.MojoHandleSignals(event[1]); |
| 26 var result = _endpoint.query(); | 73 if (signalsReceived.isPeerClosed) { |
| 27 if (!result.status.isOk && !result.status.isResourceExhausted) { | 74 close(); |
| 28 // If something else happens, it means the handle wasn't really ready | 75 return; |
| 29 // for reading, which indicates a bug in MojoHandle or the | 76 } |
| 30 // handle watcher. | |
| 31 throw new Exception("message pipe query failed: ${result.status}"); | |
| 32 } | |
| 33 | 77 |
| 34 // Read the data. | 78 if (signalsReceived.isReadable) { |
| 35 var bytes = new ByteData(result.bytesRead); | 79 _doRead(); |
| 36 var handles = new List<core.RawMojoHandle>(result.handlesRead); | 80 } |
| 37 result = _endpoint.read(bytes, result.bytesRead, handles); | |
| 38 if (!result.status.isOk && !result.status.isResourceExhausted) { | |
| 39 throw new Exception("message pipe read failed: ${result.status}"); | |
| 40 } | |
| 41 var message = new Message(bytes, handles); | |
| 42 var reader = new MessageReader(message); | |
| 43 | 81 |
| 44 handleResponse(reader); | 82 if (signalsReceived.isWritable) { |
| 83 _doWrite(); |
| 45 } | 84 } |
| 46 if (core.MojoHandleSignals.isWritable(mojoSignal)) { | 85 |
| 47 if (_sendQueue.length > 0) { | 86 if (_sendQueue.length == 0) { |
| 48 List messageCompleter = _sendQueue.removeAt(0); | 87 var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE; |
| 49 _endpoint.write(messageCompleter[0].buffer, | 88 _eventStream.enableSignals(withoutWritable); |
| 50 messageCompleter[0].buffer.lengthInBytes, | 89 } else { |
| 51 messageCompleter[0].handles); | 90 _eventStream.enableSignals(signalsWatched); |
| 52 if (!_endpoint.status.isOk) { | |
| 53 throw new Exception("message pipe write failed"); | |
| 54 } | |
| 55 if (messageCompleter[1] != null) { | |
| 56 _completerQueue.add(messageCompleter[1]); | |
| 57 } | |
| 58 } | |
| 59 if ((_sendQueue.length == 0) && _handle.writeEnabled()) { | |
| 60 _handle.disableWriteEvents(); | |
| 61 } | |
| 62 } | |
| 63 if (core.MojoHandleSignals.isNone(mojoSignal)) { | |
| 64 // The handle watcher will send MojoHandleSignals.NONE if the other | |
| 65 // endpoint of the pipe is closed. | |
| 66 _handle.close(); | |
| 67 } | 91 } |
| 68 }); | 92 }); |
| 69 _isOpen = true; | 93 _isOpen = true; |
| 70 } | 94 } |
| 71 | 95 |
| 72 void close() { | 96 void close() { |
| 73 assert(isOpen); | 97 if (_isOpen) { |
| 74 _handle.close(); | 98 _eventStream.close(); |
| 75 _isOpen = false; | 99 _eventStream = null; |
| 76 } | 100 _isOpen = false; |
| 77 | |
| 78 void enqueueMessage(Type t, int name, Object msg) { | |
| 79 var builder = new MessageBuilder(name, align(getEncodedSize(t))); | |
| 80 builder.encodeStruct(t, msg); | |
| 81 var message = builder.finish(); | |
| 82 _sendQueue.add([message, null]); | |
| 83 if (!_handle.writeEnabled()) { | |
| 84 _handle.enableWriteEvents(); | |
| 85 } | 101 } |
| 86 } | 102 } |
| 87 | 103 |
| 88 Future enqueueMessageWithRequestID( | 104 void enqueueMessage(Struct message, int name) { |
| 89 Type t, int name, int id, int flags, Object msg) { | 105 var header = new MessageHeader(name); |
| 90 var builder = new MessageWithRequestIDBuilder( | 106 var serviceMessage = message.serializeWithHeader(header); |
| 91 name, align(getEncodedSize(t)), id, flags); | 107 _sendQueue.add([serviceMessage, null]); |
| 92 builder.encodeStruct(t, msg); | 108 if (_sendQueue.length == 1) { |
| 93 var message = builder.finish(); | 109 _eventStream.enableAllEvents(); |
| 110 } |
| 111 } |
| 94 | 112 |
| 113 int _getNextId() { |
| 114 return _nextId++; |
| 115 } |
| 116 |
| 117 Future enqueueMessageWithRequestId( |
| 118 Struct message, int name, int id, int flags) { |
| 119 if (id == -1) { |
| 120 id = _getNextId(); |
| 121 } |
| 122 |
| 123 var header = new MessageHeader.withRequestId(name, flags, id); |
| 124 var serviceMessage = message.serializeWithHeader(header); |
| 95 var completer = new Completer(); | 125 var completer = new Completer(); |
| 96 _sendQueue.add([message, completer]); | 126 _sendQueue.add([serviceMessage, completer]); |
| 97 if (!_handle.writeEnabled()) { | 127 if (_sendQueue.length == 1) { |
| 98 _handle.enableWriteEvents(); | 128 _eventStream.enableAllEvents(); |
| 129 } else { |
| 130 _eventStream.enableReadEvents(); |
| 99 } | 131 } |
| 100 return completer.future; | 132 return completer.future; |
| 101 } | 133 } |
| 102 | 134 |
| 103 // Need a getter for this for access in subclasses. | 135 // Need a getter for this for access in subclasses. |
| 104 List get completerQueue => _completerQueue; | 136 Map<int, Completer> get completerMap => _completerMap; |
| 105 bool get isOpen => _isOpen; | 137 bool get isOpen => _isOpen; |
| 106 } | 138 } |
| OLD | NEW |