Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(150)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebase Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error); 10 void _addError(Object error);
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
78 78
79 /* Event handlers provided in constructor. */ 79 /* Event handlers provided in constructor. */
80 _DataHandler<T> _onData; 80 _DataHandler<T> _onData;
81 _ErrorHandler _onError; 81 _ErrorHandler _onError;
82 _DoneHandler _onDone; 82 _DoneHandler _onDone;
83 final _Zone _zone = _Zone.current; 83 final _Zone _zone = _Zone.current;
84 84
85 /** Bit vector based on state-constants above. */ 85 /** Bit vector based on state-constants above. */
86 int _state; 86 int _state;
87 87
88 _FutureImpl _cancelFuture;
floitsch 2013/07/12 16:42:34 Add TODO that this should be in some field that is
Lasse Reichstein Nielsen 2013/07/17 07:28:46 And add documentation saying what it is, and where
89
88 /** 90 /**
89 * Queue of pending events. 91 * Queue of pending events.
90 * 92 *
91 * Is created when necessary, or set in constructor for preconfigured events. 93 * Is created when necessary, or set in constructor for preconfigured events.
92 */ 94 */
93 _PendingEvents _pending; 95 _PendingEvents _pending;
94 96
95 _BufferingStreamSubscription(this._onData, 97 _BufferingStreamSubscription(this._onData,
96 this._onError, 98 this._onError,
97 this._onDone, 99 this._onDone,
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
170 _pending.schedule(this); 172 _pending.schedule(this);
171 } else { 173 } else {
172 assert(_mayResumeInput); 174 assert(_mayResumeInput);
173 _state &= ~_STATE_INPUT_PAUSED; 175 _state &= ~_STATE_INPUT_PAUSED;
174 if (!_inCallback) _guardCallback(_onResume); 176 if (!_inCallback) _guardCallback(_onResume);
175 } 177 }
176 } 178 }
177 } 179 }
178 } 180 }
179 181
180 void cancel() { 182 void _chainCancelFuture(_FutureImpl future) {
Lasse Reichstein Nielsen 2013/07/17 07:28:46 _cancelFuture._setOrChainValue(future); It handle
181 if (_isCanceled) return; 183 if (future == null) {
184 _cancelFuture._setValue(null);
185 } else {
186 future._chain(_cancelFuture);
187 }
188 }
189
190 Future cancel() {
191 if (_isCanceled) return _cancelFuture;
182 _cancel(); 192 _cancel();
183 if (!_inCallback) { 193 if (!_inCallback) {
184 // otherwise checkState will be called after firing or callback completes. 194 // otherwise checkState will be called after firing or callback completes.
185 _state |= _STATE_IN_CALLBACK; 195 _state |= _STATE_IN_CALLBACK;
186 _onCancel(); 196 _chainCancelFuture(_onCancel());
Lasse Reichstein Nielsen 2013/07/17 07:28:46 So this line could just be: _cancelFuture.setOrCh
187 _pending = null; 197 _pending = null;
188 _state &= ~_STATE_IN_CALLBACK; 198 _state &= ~_STATE_IN_CALLBACK;
189 } 199 }
200 return _cancelFuture;
190 } 201 }
191 202
192 Future asFuture([var futureValue]) { 203 Future asFuture([var futureValue]) {
193 _FutureImpl<T> result = new _FutureImpl<T>(); 204 _FutureImpl<T> result = new _FutureImpl<T>();
194 205
195 // Overwrite the onDone and onError handlers. 206 // Overwrite the onDone and onError handlers.
196 _onDone = () { result._setValue(futureValue); }; 207 _onDone = () { result._setValue(futureValue); };
197 _onError = (error) { 208 _onError = (error) {
198 cancel(); 209 cancel();
199 result._setError(error); 210 result._setError(error);
(...skipping 11 matching lines...) Expand all
211 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; 222 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
212 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; 223 bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
213 bool get _canFire => _state < _STATE_IN_CALLBACK; 224 bool get _canFire => _state < _STATE_IN_CALLBACK;
214 bool get _mayResumeInput => 225 bool get _mayResumeInput =>
215 !_isPaused && (_pending == null || _pending.isEmpty); 226 !_isPaused && (_pending == null || _pending.isEmpty);
216 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; 227 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
217 228
218 bool get isPaused => _isPaused; 229 bool get isPaused => _isPaused;
219 230
220 void _cancel() { 231 void _cancel() {
232 _cancelFuture = new _FutureImpl();
221 _state |= _STATE_CANCELED; 233 _state |= _STATE_CANCELED;
222 _zone.cancelCallbackExpectation(); 234 _zone.cancelCallbackExpectation();
223 if (_hasPending) { 235 if (_hasPending) {
224 _pending.cancelSchedule(); 236 _pending.cancelSchedule();
225 } 237 }
226 } 238 }
227 239
228 /** 240 /**
229 * Increment the pause count. 241 * Increment the pause count.
230 * 242 *
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
283 // try/catch wrapping and send any errors to 295 // try/catch wrapping and send any errors to
284 // [_Zone.current.handleUncaughtError]. 296 // [_Zone.current.handleUncaughtError].
285 void _onPause() { 297 void _onPause() {
286 assert(_isInputPaused); 298 assert(_isInputPaused);
287 } 299 }
288 300
289 void _onResume() { 301 void _onResume() {
290 assert(!_isInputPaused); 302 assert(!_isInputPaused);
291 } 303 }
292 304
293 void _onCancel() { 305 _FutureImpl _onCancel() {
294 assert(_isCanceled); 306 assert(_isCanceled);
295 } 307 }
296 308
297 // Handle pending events. 309 // Handle pending events.
298 310
299 /** 311 /**
300 * Add a pending event. 312 * Add a pending event.
301 * 313 *
302 * If the subscription is not paused, this also schedules a firing 314 * If the subscription is not paused, this also schedules a firing
303 * of pending events later (if necessary). 315 * of pending events later (if necessary).
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
387 if (_hasPending && _pending.isEmpty) { 399 if (_hasPending && _pending.isEmpty) {
388 _state &= ~_STATE_HAS_PENDING; 400 _state &= ~_STATE_HAS_PENDING;
389 if (_isInputPaused && _mayResumeInput) { 401 if (_isInputPaused && _mayResumeInput) {
390 _state &= ~_STATE_INPUT_PAUSED; 402 _state &= ~_STATE_INPUT_PAUSED;
391 } 403 }
392 } 404 }
393 // If the state changes during a callback, we immediately 405 // If the state changes during a callback, we immediately
394 // make a new state-change callback. Loop until the state didn't change. 406 // make a new state-change callback. Loop until the state didn't change.
395 while (true) { 407 while (true) {
396 if (_isCanceled) { 408 if (_isCanceled) {
397 _onCancel(); 409 _chainCancelFuture(_onCancel());
Lasse Reichstein Nielsen 2013/07/17 07:28:46 _cancelFuture._setOrChainValue(_onCancel());
398 _pending = null; 410 _pending = null;
399 return; 411 return;
400 } 412 }
401 bool isInputPaused = _isInputPaused; 413 bool isInputPaused = _isInputPaused;
402 if (wasInputPaused == isInputPaused) break; 414 if (wasInputPaused == isInputPaused) break;
403 _state ^= _STATE_IN_CALLBACK; 415 _state ^= _STATE_IN_CALLBACK;
404 if (isInputPaused) { 416 if (isInputPaused) {
405 _onPause(); 417 _onPause();
406 } else { 418 } else {
407 _onResume(); 419 _onResume();
(...skipping 592 matching lines...) Expand 10 before | Expand all | Expand 10 after
1000 _FutureImpl<bool> hasNext = _futureOrPrefetch; 1012 _FutureImpl<bool> hasNext = _futureOrPrefetch;
1001 _clear(); 1013 _clear();
1002 hasNext._setValue(false); 1014 hasNext._setValue(false);
1003 return; 1015 return;
1004 } 1016 }
1005 _subscription.pause(); 1017 _subscription.pause();
1006 _futureOrPrefetch = null; 1018 _futureOrPrefetch = null;
1007 _state = _STATE_EXTRA_DONE; 1019 _state = _STATE_EXTRA_DONE;
1008 } 1020 }
1009 } 1021 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698