OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 part of bindings; | 5 part of bindings; |
6 | 6 |
7 abstract class Interface { | 7 abstract class Interface { |
8 core.MojoMessagePipeEndpoint _endpoint; | 8 core.MojoMessagePipeEndpoint _endpoint; |
9 core.MojoEventStream _eventStream; | 9 core.MojoEventStream _eventStream; |
10 List _sendQueue; | 10 List _sendQueue; |
11 bool _isOpen; | 11 bool _isOpen; |
| 12 bool _isClosing; |
| 13 bool _isInHandler; |
| 14 int _outstandingResponseFutures; |
12 | 15 |
13 Future<Message> handleMessage(MessageReader reader); | 16 Future<Message> handleMessage(MessageReader reader); |
14 | 17 |
15 Interface(core.MojoMessagePipeEndpoint endpoint) : | 18 Interface(core.MojoMessagePipeEndpoint endpoint) : |
16 _endpoint = endpoint, | 19 _endpoint = endpoint, |
17 _sendQueue = [], | 20 _sendQueue = [], |
18 _eventStream = new core.MojoEventStream(endpoint.handle), | 21 _eventStream = new core.MojoEventStream(endpoint.handle), |
19 _isOpen = false; | 22 _isOpen = false, |
| 23 _isClosing = false, |
| 24 _isInHandler = false, |
| 25 _outstandingResponseFutures = 0; |
20 | 26 |
21 Interface.fromHandle(int handle) { | 27 Interface.fromHandle(int handle) { |
22 _endpoint = | 28 _endpoint = |
23 new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle)); | 29 new core.MojoMessagePipeEndpoint(new core.MojoHandle(handle)); |
24 _sendQueue = []; | 30 _sendQueue = []; |
25 _eventStream = new core.MojoEventStream(_endpoint.handle); | 31 _eventStream = new core.MojoEventStream(_endpoint.handle); |
26 _isOpen = false; | 32 _isOpen = false; |
| 33 _isClosing = false; |
| 34 _isInHandler = false; |
| 35 _outstandingResponseFutures = 0; |
27 } | 36 } |
28 | 37 |
29 void _doRead() { | 38 void _doRead() { |
30 assert(_eventStream.readyRead); | 39 assert(_eventStream.readyRead); |
31 | 40 |
32 // Query how many bytes are available. | 41 // Query how many bytes are available. |
33 var result = _endpoint.query(); | 42 var result = _endpoint.query(); |
34 assert(result.status.isOk || result.status.isResourceExhausted); | 43 assert(result.status.isOk || result.status.isResourceExhausted); |
35 | 44 |
36 // Read the data and view as a message. | 45 // Read the data and view as a message. |
37 var bytes = new ByteData(result.bytesRead); | 46 var bytes = new ByteData(result.bytesRead); |
38 var handles = new List<core.MojoHandle>(result.handlesRead); | 47 var handles = new List<core.MojoHandle>(result.handlesRead); |
39 result = _endpoint.read(bytes, result.bytesRead, handles); | 48 result = _endpoint.read(bytes, result.bytesRead, handles); |
40 assert(result.status.isOk || result.status.isResourceExhausted); | 49 assert(result.status.isOk || result.status.isResourceExhausted); |
41 var message = new Message(bytes, handles); | 50 var message = new Message(bytes, handles); |
42 var reader = new MessageReader(message); | 51 var reader = new MessageReader(message); |
43 | 52 |
44 // Prepare the response. | 53 // Prepare the response. Drop messages if we are closing. |
45 var responseFuture = handleMessage(reader); | 54 var responseFuture = _isClosing ? null : handleMessage(reader); |
46 | 55 |
47 // If there's a response, queue it up for sending. | 56 // If there's a response, queue it up for sending. |
48 if (responseFuture != null) { | 57 if (responseFuture != null) { |
| 58 _outstandingResponseFutures++; |
49 responseFuture.then((response) { | 59 responseFuture.then((response) { |
| 60 _outstandingResponseFutures--; |
50 _sendQueue.add(response); | 61 _sendQueue.add(response); |
51 if (_sendQueue.length == 1) { | 62 if (_sendQueue.length == 1) { |
52 _eventStream.enableWriteEvents(); | 63 _eventStream.enableWriteEvents(); |
53 } | 64 } |
54 }); | 65 }); |
| 66 } else if (_isClosing && |
| 67 (_sendQueue.length == 0) && |
| 68 (_outstandingResponseFutures == 0)) { |
| 69 // We are closing, there is no response to send for this message, the send |
| 70 // queue is empty, and there are no outstanding futures. Do the close now. |
| 71 _close(); |
55 } | 72 } |
56 } | 73 } |
57 | 74 |
58 void _doWrite() { | 75 void _doWrite() { |
59 if (_sendQueue.length > 0) { | 76 if (_sendQueue.length > 0) { |
60 assert(_eventStream.readyWrite); | 77 assert(_eventStream.readyWrite); |
61 var responseMessage = _sendQueue.removeAt(0); | 78 var responseMessage = _sendQueue.removeAt(0); |
62 _endpoint.write(responseMessage.buffer, | 79 _endpoint.write(responseMessage.buffer, |
63 responseMessage.buffer.lengthInBytes, | 80 responseMessage.buffer.lengthInBytes, |
64 responseMessage.handles); | 81 responseMessage.handles); |
65 if (!_endpoint.status.isOk) { | 82 if (!_endpoint.status.isOk) { |
66 throw "message pipe write failed: ${_endpoint.status}"; | 83 throw "message pipe write failed: ${_endpoint.status}"; |
67 } | 84 } |
| 85 } else if (_isClosing && (_outstandingResponseFutures == 0)) { |
| 86 // We are closing, the send queue is empty, and there are no outstanding |
| 87 // response futures. Really do the close, now. |
| 88 _close(); |
68 } | 89 } |
69 } | 90 } |
70 | 91 |
71 StreamSubscription<int> listen() { | 92 StreamSubscription<int> listen() { |
72 _isOpen = true; | 93 _isOpen = true; |
73 return _eventStream.listen((List<int> event) { | 94 return _eventStream.listen((List<int> event) { |
74 var signalsWatched = new core.MojoHandleSignals(event[0]); | 95 var signalsWatched = new core.MojoHandleSignals(event[0]); |
75 var signalsReceived = new core.MojoHandleSignals(event[1]); | 96 var signalsReceived = new core.MojoHandleSignals(event[1]); |
76 if (signalsReceived.isPeerClosed) { | 97 if (signalsReceived.isPeerClosed) { |
77 close(); | 98 // If the peer is closed, we can close the interface immediately. |
| 99 _close(); |
78 return; | 100 return; |
79 } | 101 } |
80 | 102 |
| 103 _isInHandler = true; |
81 if (signalsReceived.isReadable) { | 104 if (signalsReceived.isReadable) { |
82 _doRead(); | 105 _doRead(); |
83 } | 106 } |
84 | 107 |
85 if (signalsReceived.isWritable) { | 108 if (signalsReceived.isWritable) { |
86 _doWrite(); | 109 _doWrite(); |
87 } | 110 } |
88 | 111 |
89 if (_sendQueue.length == 0) { | 112 if (_sendQueue.length == 0) { |
90 var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE; | 113 var withoutWritable = signalsWatched - core.MojoHandleSignals.WRITABLE; |
91 _eventStream.enableSignals(withoutWritable); | 114 _eventStream.enableSignals(withoutWritable); |
92 } else { | 115 } else { |
93 _eventStream.enableSignals(signalsWatched); | 116 _eventStream.enableSignals(signalsWatched); |
94 } | 117 } |
| 118 _isInHandler = false; |
95 }); | 119 }); |
96 } | 120 } |
97 | 121 |
| 122 void _close() { |
| 123 _eventStream.close(); |
| 124 _isClosing = false; |
| 125 _isOpen = false; |
| 126 _eventStream = null; |
| 127 } |
| 128 |
98 void close() { | 129 void close() { |
99 // TODO(zra): Cancel outstanding Futures started in _doRead? | 130 if (!_isOpen) return; |
100 if (_isOpen) { | 131 if (_isInHandler || (_sendQueue.length > 0) || |
101 _eventStream.close(); | 132 (_outstandingResponseFutures > 0)) { |
102 _isOpen = false; | 133 // If close is called from within the event handler, or if there are |
103 _eventStream = null; | 134 // outstanding responses to send, then defer the close call until all |
| 135 // everything is finished. |
| 136 _isClosing = true; |
| 137 } else { |
| 138 _close(); |
104 } | 139 } |
105 } | 140 } |
106 | 141 |
107 Message buildResponse(Type t, int name, Object response) { | 142 Message buildResponse(Type t, int name, Object response) { |
108 var builder = new MessageBuilder(name, align(getEncodedSize(t))); | 143 var builder = new MessageBuilder(name, align(getEncodedSize(t))); |
109 builder.encodeStruct(t, response); | 144 builder.encodeStruct(t, response); |
110 return builder.finish(); | 145 return builder.finish(); |
111 } | 146 } |
112 | 147 |
113 Message buildResponseWithID( | 148 Message buildResponseWithID( |
(...skipping 13 matching lines...) Expand all Loading... |
127 } | 162 } |
128 | 163 |
129 Future enqueueMessageWithRequestID(Type t, int name, int id, Object msg) { | 164 Future enqueueMessageWithRequestID(Type t, int name, int id, Object msg) { |
130 // TODO(zra): Is this correct? | 165 // TODO(zra): Is this correct? |
131 throw "The client interface should not expect a response"; | 166 throw "The client interface should not expect a response"; |
132 } | 167 } |
133 | 168 |
134 bool get isOpen => _isOpen; | 169 bool get isOpen => _isOpen; |
135 core.MojoMessagePipeEndpoint get endpoint => _endpoint; | 170 core.MojoMessagePipeEndpoint get endpoint => _endpoint; |
136 } | 171 } |
OLD | NEW |