Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: mojo/public/dart/src/event_stream.dart

Issue 996923003: Dart: Better handle leak checks. close() is async. (Closed) Base URL: git@github.com:domokit/mojo.git@master
Patch Set: Fix regexes Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 part of core; 5 part of core;
6 6
7 class MojoEventStream extends Stream<List<int>> { 7 class MojoEventStream extends Stream<List<int>> {
8 // The underlying Mojo handle. 8 // The underlying Mojo handle.
9 MojoHandle _handle; 9 MojoHandle _handle;
10 10
(...skipping 19 matching lines...) Expand all
30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE]) 30 [MojoHandleSignals signals = MojoHandleSignals.PEER_CLOSED_READABLE])
31 : _handle = handle, 31 : _handle = handle,
32 _signals = signals, 32 _signals = signals,
33 _isListening = false { 33 _isListening = false {
34 MojoResult result = MojoHandle.register(this); 34 MojoResult result = MojoHandle.register(this);
35 if (!result.isOk) { 35 if (!result.isOk) {
36 throw "Failed to register the MojoHandle: $result."; 36 throw "Failed to register the MojoHandle: $result.";
37 } 37 }
38 } 38 }
39 39
40 void close() { 40 Future close() {
41 if (_handle != null) { 41 if (_handle != null) {
42 if (_isListening) { 42 if (_isListening) {
43 MojoHandleWatcher.close(_handle); 43 return _handleWatcherClose();
44 } else { 44 } else {
45 _handle.close(); 45 _localClose();
46 return new Future.value(null);
46 } 47 }
47 _handle = null;
48 }
49 if (_receivePort != null) {
50 _receivePort.close();
51 _receivePort = null;
52 } 48 }
53 } 49 }
54 50
55 StreamSubscription<List<int>> listen(void onData(List event), 51 StreamSubscription<List<int>> listen(void onData(List event),
56 {Function onError, void onDone(), bool cancelOnError}) { 52 {Function onError, void onDone(), bool cancelOnError}) {
57 if (_isListening) { 53 if (_isListening) {
58 throw "Listen has already been called: $_handle."; 54 throw "Listen has already been called: $_handle.";
59 } 55 }
60 _receivePort = new ReceivePort(); 56 _receivePort = new ReceivePort();
61 _sendPort = _receivePort.sendPort; 57 _sendPort = _receivePort.sendPort;
(...skipping 25 matching lines...) Expand all
87 throw "MojoHandleWatcher add failed: $res"; 83 throw "MojoHandleWatcher add failed: $res";
88 } 84 }
89 } 85 }
90 } 86 }
91 87
92 void enableReadEvents() => 88 void enableReadEvents() =>
93 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE); 89 enableSignals(MojoHandleSignals.PEER_CLOSED_READABLE);
94 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE); 90 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE);
95 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE); 91 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE);
96 92
93 Future _handleWatcherClose() {
94 assert(_handle != null);
95 return MojoHandleWatcher.close(_handle, wait: true).then((_) {
96 if (_receivePort != null) {
97 _receivePort.close();
98 _receivePort = null;
99 }
100 });
101 }
102
103 void _localClose() {
104 assert(_handle != null);
105 _handle.close();
106 _handle = null;
107 if (_receivePort != null) {
108 _receivePort.close();
109 _receivePort = null;
110 }
111 }
112
97 void _onSubscriptionStateChange() { 113 void _onSubscriptionStateChange() {
98 if (!_controller.hasListener) { 114 if (!_controller.hasListener) {
99 close(); 115 close();
100 } 116 }
101 } 117 }
102 118
103 void _onPauseStateChange() { 119 void _onPauseStateChange() {
104 if (_controller.isPaused) { 120 if (_controller.isPaused) {
105 var res = MojoHandleWatcher.remove(_handle); 121 var res = MojoHandleWatcher.remove(_handle);
106 if (!res.isOk) { 122 if (!res.isOk) {
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
160 assert(!isBound); 176 assert(!isBound);
161 _endpoint = new MojoMessagePipeEndpoint(handle); 177 _endpoint = new MojoMessagePipeEndpoint(handle);
162 _eventStream = new MojoEventStream(handle); 178 _eventStream = new MojoEventStream(handle);
163 _isOpen = false; 179 _isOpen = false;
164 } 180 }
165 181
166 StreamSubscription<List<int>> listen() { 182 StreamSubscription<List<int>> listen() {
167 assert(isBound && (subscription == null)); 183 assert(isBound && (subscription == null));
168 _isOpen = true; 184 _isOpen = true;
169 subscription = _eventStream.listen((List<int> event) { 185 subscription = _eventStream.listen((List<int> event) {
186 if (!_isOpen) {
187 // The actual close of the underlying stream happens asynchronously
188 // after the call to close. However, we start to ignore incoming events
189 // immediately.
190 return;
191 }
170 var signalsWatched = new MojoHandleSignals(event[0]); 192 var signalsWatched = new MojoHandleSignals(event[0]);
171 var signalsReceived = new MojoHandleSignals(event[1]); 193 var signalsReceived = new MojoHandleSignals(event[1]);
172 _isInHandler = true; 194 _isInHandler = true;
173 if (signalsReceived.isReadable) { 195 if (signalsReceived.isReadable) {
174 assert(_eventStream.readyRead); 196 assert(_eventStream.readyRead);
175 handleRead(); 197 handleRead();
176 } 198 }
177 if (signalsReceived.isWritable) { 199 if (signalsReceived.isWritable) {
178 assert(_eventStream.readyWrite); 200 assert(_eventStream.readyWrite);
179 handleWrite(); 201 handleWrite();
180 } 202 }
181 if (_isOpen) { 203 if (_isOpen) {
182 _eventStream.enableSignals(signalsWatched); 204 _eventStream.enableSignals(signalsWatched);
183 } 205 }
184 _isInHandler = false; 206 _isInHandler = false;
185 if (signalsReceived.isPeerClosed) { 207 if (signalsReceived.isPeerClosed) {
186 if (onError != null) onError(); 208 if (onError != null) {
209 onError();
210 }
187 close(); 211 close();
188 // The peer being closed obviates any other signal we might
189 // have received since we won't be able to read or write the handle.
190 // Thus, we just return before invoking other handlers.
191 return;
192 } 212 }
193 }, onDone: close); 213 }, onDone: close);
194 return subscription; 214 return subscription;
195 } 215 }
196 216
197 void close() { 217 Future close() {
218 var result;
198 _isOpen = false; 219 _isOpen = false;
199 _endpoint = null; 220 _endpoint = null;
200 subscription = null; 221 subscription = null;
201 if (_eventStream != null) { 222 if (_eventStream != null) {
202 _eventStream.close(); 223 result = _eventStream.close().then((_) {
203 _eventStream = null; 224 _eventStream = null;
225 });
204 } 226 }
227 return result != null ? result : new Future.value(null);
205 } 228 }
206 229
207 void handleRead() {} 230 void handleRead() {}
208 void handleWrite() {} 231 void handleWrite() {}
209 232
210 MojoMessagePipeEndpoint get endpoint => _endpoint; 233 MojoMessagePipeEndpoint get endpoint => _endpoint;
211 bool get isOpen => _isOpen; 234 bool get isOpen => _isOpen;
212 bool get isInHandler => _isInHandler; 235 bool get isInHandler => _isInHandler;
213 bool get isBound => _endpoint != null; 236 bool get isBound => _endpoint != null;
214 237
215 String toString() => "MojoEventStreamListener(" 238 String toString() => "MojoEventStreamListener("
216 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)"; 239 "isOpen: $isOpen, isBound: $isBound, endpoint: $_endpoint)";
217 } 240 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698