OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 part of bindings; | 5 part of bindings; |
6 | 6 |
7 abstract class Client { | 7 abstract class Client { |
8 core.MojoMessagePipeEndpoint _endpoint; | 8 core.MojoMessagePipeEndpoint _endpoint; |
9 core.MojoHandle _handle; | 9 core.MojoEventStream _eventStream; |
10 List _sendQueue; | 10 List _sendQueue; |
11 List _completerQueue; | 11 Map<int, Completer> _completerMap; |
12 bool _isOpen = false; | 12 bool _isOpen = false; |
| 13 int _nextId = 0; |
13 | 14 |
14 void handleResponse(MessageReader reader); | 15 void handleResponse(MessageReader reader); |
15 | 16 |
16 Client(this._endpoint) { | 17 Client(core.MojoMessagePipeEndpoint endpoint) : |
| 18 _sendQueue = [], |
| 19 _completerMap = {}, |
| 20 _endpoint = endpoint, |
| 21 _eventStream = new core.MojoEventStream(endpoint.handle); |
| 22 |
| 23 Client.fromHandle(int handle) { |
17 _sendQueue = []; | 24 _sendQueue = []; |
18 _completerQueue = []; | 25 _completerMap = {}; |
19 _handle = new core.MojoHandle(_endpoint.handle); | 26 _endpoint = |
| 27 new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle)); |
| 28 _eventStream = new core.MojoHandle(_endpoint.handle); |
| 29 } |
| 30 |
| 31 void _doRead() { |
| 32 // Query how many bytes are available. |
| 33 var result = _endpoint.query(); |
| 34 assert(result.status.isOk || result.status.isResourceExhausted); |
| 35 |
| 36 // Read the data. |
| 37 var bytes = new ByteData(result.bytesRead); |
| 38 var handles = new List<core.MojoHandle>(result.handlesRead); |
| 39 result = _endpoint.read(bytes, result.bytesRead, handles); |
| 40 assert(result.status.isOk || result.status.isResourceExhausted); |
| 41 var message = new Message(bytes, handles); |
| 42 var reader = new MessageReader(message); |
| 43 |
| 44 handleResponse(reader); |
| 45 } |
| 46 |
| 47 void _doWrite() { |
| 48 if (_sendQueue.length > 0) { |
| 49 List messageCompleter = _sendQueue.removeAt(0); |
| 50 Message message = messageCompleter[0]; |
| 51 Completer completer = messageCompleter[1]; |
| 52 _endpoint.write(message.buffer, |
| 53 message.buffer.lengthInBytes, |
| 54 message.handles); |
| 55 if (!_endpoint.status.isOk) { |
| 56 throw "message pipe write failed"; |
| 57 } |
| 58 if (completer != null) { |
| 59 if (!message.expectsResponse) { |
| 60 throw "Message has a completer, but does not expect a response"; |
| 61 } |
| 62 int requestID = message.requestID; |
| 63 if (_completerMap[requestID] != null) { |
| 64 throw "Request ID $requestID is already in use."; |
| 65 } |
| 66 _completerMap[requestID] = completer; |
| 67 } |
| 68 } |
20 } | 69 } |
21 | 70 |
22 void open() { | 71 void open() { |
23 _handle.listen((int mojoSignal) { | 72 _eventStream.listen((List<int> event) { |
24 if (core.MojoHandleSignals.isReadable(mojoSignal)) { | 73 var signalsWatched = new core.MojoHandleSignals(event[0]); |
25 // Query how many bytes are available. | 74 var signalsReceived = new core.MojoHandleSignals(event[1]); |
26 var result = _endpoint.query(); | 75 if (signalsReceived.isPeerClosed) { |
27 if (!result.status.isOk && !result.status.isResourceExhausted) { | 76 close(); |
28 // If something else happens, it means the handle wasn't really ready | 77 return; |
29 // for reading, which indicates a bug in MojoHandle or the | 78 } |
30 // handle watcher. | |
31 throw new Exception("message pipe query failed: ${result.status}"); | |
32 } | |
33 | 79 |
34 // Read the data. | 80 if (signalsReceived.isReadable) { |
35 var bytes = new ByteData(result.bytesRead); | 81 _doRead(); |
36 var handles = new List<core.RawMojoHandle>(result.handlesRead); | 82 } |
37 result = _endpoint.read(bytes, result.bytesRead, handles); | |
38 if (!result.status.isOk && !result.status.isResourceExhausted) { | |
39 throw new Exception("message pipe read failed: ${result.status}"); | |
40 } | |
41 var message = new Message(bytes, handles); | |
42 var reader = new MessageReader(message); | |
43 | 83 |
44 handleResponse(reader); | 84 if (signalsReceived.isWritable) { |
| 85 _doWrite(); |
45 } | 86 } |
46 if (core.MojoHandleSignals.isWritable(mojoSignal)) { | 87 |
47 if (_sendQueue.length > 0) { | 88 if (_sendQueue.length == 0) { |
48 List messageCompleter = _sendQueue.removeAt(0); | 89 var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE; |
49 _endpoint.write(messageCompleter[0].buffer, | 90 _eventStream.enableSignals(withoutWritable); |
50 messageCompleter[0].buffer.lengthInBytes, | 91 } else { |
51 messageCompleter[0].handles); | 92 _eventStream.enableSignals(signalsWatched); |
52 if (!_endpoint.status.isOk) { | |
53 throw new Exception("message pipe write failed"); | |
54 } | |
55 if (messageCompleter[1] != null) { | |
56 _completerQueue.add(messageCompleter[1]); | |
57 } | |
58 } | |
59 if ((_sendQueue.length == 0) && _handle.writeEnabled()) { | |
60 _handle.disableWriteEvents(); | |
61 } | |
62 } | |
63 if (core.MojoHandleSignals.isNone(mojoSignal)) { | |
64 // The handle watcher will send MojoHandleSignals.NONE if the other | |
65 // endpoint of the pipe is closed. | |
66 _handle.close(); | |
67 } | 93 } |
68 }); | 94 }); |
69 _isOpen = true; | 95 _isOpen = true; |
70 } | 96 } |
71 | 97 |
72 void close() { | 98 void close() { |
73 assert(isOpen); | 99 if (_isOpen) { |
74 _handle.close(); | 100 _eventStream.close(); |
75 _isOpen = false; | 101 _eventStream = null; |
| 102 _isOpen = false; |
| 103 } |
76 } | 104 } |
77 | 105 |
78 void enqueueMessage(Type t, int name, Object msg) { | 106 void enqueueMessage(Type t, int name, Object msg) { |
79 var builder = new MessageBuilder(name, align(getEncodedSize(t))); | 107 var builder = new MessageBuilder(name, align(getEncodedSize(t))); |
80 builder.encodeStruct(t, msg); | 108 builder.encodeStruct(t, msg); |
81 var message = builder.finish(); | 109 var message = builder.finish(); |
82 _sendQueue.add([message, null]); | 110 _sendQueue.add([message, null]); |
83 if (!_handle.writeEnabled()) { | 111 if (_sendQueue.length == 1) { |
84 _handle.enableWriteEvents(); | 112 _eventStream.enableAllEvents(); |
85 } | 113 } |
86 } | 114 } |
87 | 115 |
| 116 int _getNextId() { |
| 117 return _nextId++; |
| 118 } |
| 119 |
88 Future enqueueMessageWithRequestID( | 120 Future enqueueMessageWithRequestID( |
89 Type t, int name, int id, int flags, Object msg) { | 121 Type t, int name, int id, int flags, Object msg) { |
| 122 if (id == -1) { |
| 123 id = _getNextId(); |
| 124 } |
| 125 |
90 var builder = new MessageWithRequestIDBuilder( | 126 var builder = new MessageWithRequestIDBuilder( |
91 name, align(getEncodedSize(t)), id, flags); | 127 name, align(getEncodedSize(t)), id, flags); |
92 builder.encodeStruct(t, msg); | 128 builder.encodeStruct(t, msg); |
93 var message = builder.finish(); | 129 var message = builder.finish(); |
94 | 130 |
95 var completer = new Completer(); | 131 var completer = new Completer(); |
96 _sendQueue.add([message, completer]); | 132 _sendQueue.add([message, completer]); |
97 if (!_handle.writeEnabled()) { | 133 if (_sendQueue.length == 1) { |
98 _handle.enableWriteEvents(); | 134 _eventStream.enableAllEvents(); |
| 135 } else { |
| 136 _eventStream.enableReadEvents(); |
99 } | 137 } |
100 return completer.future; | 138 return completer.future; |
101 } | 139 } |
102 | 140 |
103 // Need a getter for this for access in subclasses. | 141 // Need a getter for this for access in subclasses. |
104 List get completerQueue => _completerQueue; | 142 Map<int, Completer> get completerMap => _completerMap; |
105 bool get isOpen => _isOpen; | 143 bool get isOpen => _isOpen; |
106 } | 144 } |
OLD | NEW |