OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 part of core; | 5 part of core; |
6 | 6 |
7 class MojoEventStream extends Stream<List<int>> { | 7 class MojoEventStream extends Stream<List<int>> { |
8 // The underlying Mojo handle. | 8 // The underlying Mojo handle. |
9 MojoHandle _handle; | 9 MojoHandle _handle; |
10 | 10 |
(...skipping 19 matching lines...) Expand all Loading... |
30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) | 30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) |
31 : _handle = handle, | 31 : _handle = handle, |
32 _signals = signals, | 32 _signals = signals, |
33 _isListening = false { | 33 _isListening = false { |
34 MojoResult result = MojoHandle.register(this); | 34 MojoResult result = MojoHandle.register(this); |
35 if (!result.isOk) { | 35 if (!result.isOk) { |
36 throw "Failed to register the MojoHandle: $result."; | 36 throw "Failed to register the MojoHandle: $result."; |
37 } | 37 } |
38 } | 38 } |
39 | 39 |
40 Future close() { | 40 Future close({bool immediate: false}) { |
41 if (_handle != null) { | 41 if (_handle != null) { |
42 if (_isListening) { | 42 if (_isListening) { |
43 return _handleWatcherClose(); | 43 return _handleWatcherClose(immediate: immediate); |
44 } else { | 44 } else { |
45 _localClose(); | 45 _localClose(); |
46 return new Future.value(null); | 46 return new Future.value(null); |
47 } | 47 } |
48 } | 48 } |
49 } | 49 } |
50 | 50 |
51 StreamSubscription<List<int>> listen(void onData(List event), | 51 StreamSubscription<List<int>> listen(void onData(List event), |
52 {Function onError, void onDone(), bool cancelOnError}) { | 52 {Function onError, void onDone(), bool cancelOnError}) { |
53 if (_isListening) { | 53 if (_isListening) { |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
85 throw "MojoHandleWatcher add failed: $res"; | 85 throw "MojoHandleWatcher add failed: $res"; |
86 } | 86 } |
87 } | 87 } |
88 } | 88 } |
89 | 89 |
90 void enableReadEvents() => | 90 void enableReadEvents() => |
91 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE); | 91 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE); |
92 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); | 92 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); |
93 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); | 93 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); |
94 | 94 |
95 Future _handleWatcherClose() { | 95 Future _handleWatcherClose({bool immediate: false}) { |
96 assert(_handle != null); | 96 assert(_handle != null); |
97 assert(MojoHandle._removeUnclosedHandle(_handle)); | 97 assert(MojoHandle._removeUnclosedHandle(_handle)); |
98 return MojoHandleWatcher.close(_handle.h, wait: true).then((r) { | 98 return MojoHandleWatcher.close(_handle.h, wait: !immediate).then((r) { |
99 if (_receivePort != null) { | 99 if (_receivePort != null) { |
100 _receivePort.close(); | 100 _receivePort.close(); |
101 _receivePort = null; | 101 _receivePort = null; |
102 } | 102 } |
103 return new MojoResult(r); | 103 return new MojoResult(r); |
104 }); | 104 }); |
105 } | 105 } |
106 | 106 |
107 void _localClose() { | 107 void _localClose() { |
108 assert(_handle != null); | 108 assert(_handle != null); |
109 _handle.close(); | 109 _handle.close(); |
110 _handle = null; | 110 _handle = null; |
111 if (_receivePort != null) { | 111 if (_receivePort != null) { |
112 _receivePort.close(); | 112 _receivePort.close(); |
113 _receivePort = null; | 113 _receivePort = null; |
114 } | 114 } |
115 } | 115 } |
116 | 116 |
117 void _onSubscriptionStateChange() { | 117 void _onSubscriptionStateChange() { |
118 if (!_controller.hasListener) { | 118 if (!_controller.hasListener) { |
119 close(); | 119 // No one is listening, close it immediately. |
| 120 close(immediate: true); |
120 } | 121 } |
121 } | 122 } |
122 | 123 |
123 void _onPauseStateChange() { | 124 void _onPauseStateChange() { |
124 if (_controller.isPaused) { | 125 if (_controller.isPaused) { |
125 var res = new MojoResult(MojoHandleWatcher.remove(_handle.h)); | 126 var res = new MojoResult(MojoHandleWatcher.remove(_handle.h)); |
126 if (!res.isOk) { | 127 if (!res.isOk) { |
127 throw "MojoHandleWatcher add failed: $res"; | 128 throw "MojoHandleWatcher add failed: $res"; |
128 } | 129 } |
129 } else { | 130 } else { |
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
203 } | 204 } |
204 if (signalsReceived.isWritable) { | 205 if (signalsReceived.isWritable) { |
205 assert(_eventStream.readyWrite); | 206 assert(_eventStream.readyWrite); |
206 handleWrite(); | 207 handleWrite(); |
207 } | 208 } |
208 if (!signalsReceived.isPeerClosed) { | 209 if (!signalsReceived.isPeerClosed) { |
209 _eventStream.enableSignals(signalsWatched); | 210 _eventStream.enableSignals(signalsWatched); |
210 } | 211 } |
211 _isInHandler = false; | 212 _isInHandler = false; |
212 if (signalsReceived.isPeerClosed) { | 213 if (signalsReceived.isPeerClosed) { |
213 // nodefer is true here because there is no need to wait to close until | 214 // immediate is true here because there is no need to wait to close |
214 // outstanding messages are sent. The other side is gone. | 215 // until outstanding messages are sent. The other side is gone. |
215 close(nodefer: true).then((_) { | 216 close(immediate: true).then((_) { |
216 if (onError != null) { | 217 if (onError != null) { |
217 onError(); | 218 onError(); |
218 } | 219 } |
219 }); | 220 }); |
220 } | 221 } |
221 }, onDone: close); | 222 }, onDone: close); |
222 return subscription; | 223 return subscription; |
223 } | 224 } |
224 | 225 |
225 Future close({bool nodefer: false}) { | 226 Future close({bool immediate: false}) { |
226 var result; | 227 var result; |
227 _isOpen = false; | 228 _isOpen = false; |
228 _endpoint = null; | 229 _endpoint = null; |
229 subscription = null; | 230 subscription = null; |
230 if (_eventStream != null) { | 231 if (_eventStream != null) { |
231 result = _eventStream.close().then((_) { | 232 result = _eventStream.close(immediate: immediate).then((_) { |
232 _eventStream = null; | 233 _eventStream = null; |
233 }); | 234 }); |
234 } | 235 } |
235 return result != null ? result : new Future.value(null); | 236 return result != null ? result : new Future.value(null); |
236 } | 237 } |
237 | 238 |
238 void handleRead() {} | 239 void handleRead() {} |
239 void handleWrite() {} | 240 void handleWrite() {} |
240 | 241 |
241 MojoMessagePipeEndpoint get endpoint => _endpoint; | 242 MojoMessagePipeEndpoint get endpoint => _endpoint; |
242 bool get isOpen => _isOpen; | 243 bool get isOpen => _isOpen; |
243 bool get isInHandler => _isInHandler; | 244 bool get isInHandler => _isInHandler; |
244 bool get isBound => _endpoint != null; | 245 bool get isBound => _endpoint != null; |
245 | 246 |
246 String toString() => "MojoEventStreamListener(" | 247 String toString() => "MojoEventStreamListener(" |
247 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; | 248 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; |
248 } | 249 } |
OLD | NEW |