OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error); | 10 void _addError(Object error); |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
64 * after a call to [resume] if there are pending events. | 64 * after a call to [resume] if there are pending events. |
65 */ | 65 */ |
66 static const int _STATE_INPUT_PAUSED = 4; | 66 static const int _STATE_INPUT_PAUSED = 4; |
67 /** | 67 /** |
68 * Whether the subscription has been canceled. | 68 * Whether the subscription has been canceled. |
69 * | 69 * |
70 * Set by calling [cancel], or by handling a "done" event, or an "error" event | 70 * Set by calling [cancel], or by handling a "done" event, or an "error" event |
71 * when `cancelOnError` is true. | 71 * when `cancelOnError` is true. |
72 */ | 72 */ |
73 static const int _STATE_CANCELED = 8; | 73 static const int _STATE_CANCELED = 8; |
74 static const int _STATE_IN_CALLBACK = 16; | 74 static const int _STATE_IN_ERROR_CANCEL = 16; |
floitsch
2013/10/12 18:53:57
Add comment that _STATE_IN_ERROR_CANCEL implies th
Lasse Reichstein Nielsen
2013/10/14 11:32:33
And document what it means. In painful detail, bec
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
75 static const int _STATE_HAS_PENDING = 32; | 75 static const int _STATE_IN_CALLBACK = 32; |
76 static const int _STATE_PAUSE_COUNT = 64; | 76 static const int _STATE_HAS_PENDING = 64; |
77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | 77 static const int _STATE_PAUSE_COUNT = 128; |
78 static const int _STATE_PAUSE_COUNT_SHIFT = 7; | |
78 | 79 |
79 /* Event handlers provided in constructor. */ | 80 /* Event handlers provided in constructor. */ |
80 _DataHandler<T> _onData; | 81 _DataHandler<T> _onData; |
81 _ErrorHandler _onError; | 82 _ErrorHandler _onError; |
82 _DoneHandler _onDone; | 83 _DoneHandler _onDone; |
83 final Zone _zone = Zone.current; | 84 final Zone _zone = Zone.current; |
84 | 85 |
85 /** Bit vector based on state-constants above. */ | 86 /** Bit vector based on state-constants above. */ |
86 int _state; | 87 int _state; |
87 | 88 |
89 _Future _cancelFuture; | |
floitsch
2013/10/12 18:53:57
Add "TODO(floitsch): reuse another field"
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
90 | |
88 /** | 91 /** |
89 * Queue of pending events. | 92 * Queue of pending events. |
90 * | 93 * |
91 * Is created when necessary, or set in constructor for preconfigured events. | 94 * Is created when necessary, or set in constructor for preconfigured events. |
92 */ | 95 */ |
93 _PendingEvents _pending; | 96 _PendingEvents _pending; |
94 | 97 |
95 _BufferingStreamSubscription(void onData(T data), | 98 _BufferingStreamSubscription(void onData(T data), |
96 void onError(error), | 99 void onError(error), |
97 void onDone(), | 100 void onDone(), |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
172 _pending.schedule(this); | 175 _pending.schedule(this); |
173 } else { | 176 } else { |
174 assert(_mayResumeInput); | 177 assert(_mayResumeInput); |
175 _state &= ~_STATE_INPUT_PAUSED; | 178 _state &= ~_STATE_INPUT_PAUSED; |
176 if (!_inCallback) _guardCallback(_onResume); | 179 if (!_inCallback) _guardCallback(_onResume); |
177 } | 180 } |
178 } | 181 } |
179 } | 182 } |
180 } | 183 } |
181 | 184 |
182 void cancel() { | 185 Future cancel() { |
183 if (_isCanceled) return; | 186 _state &= ~_STATE_IN_ERROR_CANCEL; |
floitsch
2013/10/12 18:53:57
Add comment.
// The user doesn't want to receive a
Lasse Reichstein Nielsen
2013/10/14 11:32:33
any events anymore -> any further events.
This co
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
187 if (_isCanceled) return _cancelFuture; | |
184 _cancel(); | 188 _cancel(); |
185 if (!_inCallback) { | 189 return _cancelFuture; |
186 // otherwise checkState will be called after firing or callback completes. | |
187 _state |= _STATE_IN_CALLBACK; | |
188 _onCancel(); | |
189 _pending = null; | |
190 _state &= ~_STATE_IN_CALLBACK; | |
191 } | |
192 } | 190 } |
193 | 191 |
194 Future asFuture([var futureValue]) { | 192 Future asFuture([var futureValue]) { |
195 _Future<T> result = new _Future<T>(); | 193 _Future<T> result = new _Future<T>(); |
196 | 194 |
197 // Overwrite the onDone and onError handlers. | 195 // Overwrite the onDone and onError handlers. |
198 _onDone = () { result._complete(futureValue); }; | 196 _onDone = () { result._complete(futureValue); }; |
199 _onError = (error) { | 197 _onError = (error) { |
200 cancel(); | 198 cancel(); |
201 result._completeError(error); | 199 result._completeError(error); |
202 }; | 200 }; |
203 | 201 |
204 return result; | 202 return result; |
205 } | 203 } |
206 | 204 |
207 // State management. | 205 // State management. |
208 | 206 |
209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | 207 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | 208 bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 209 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
210 bool get _inErrorCancel => (_state & _STATE_IN_ERROR_CANCEL) != 0; | |
212 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | 211 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
213 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | 212 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
214 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | 213 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
215 bool get _canFire => _state < _STATE_IN_CALLBACK; | 214 bool get _canFire => _state < _STATE_IN_CALLBACK; |
216 bool get _mayResumeInput => | 215 bool get _mayResumeInput => |
217 !_isPaused && (_pending == null || _pending.isEmpty); | 216 !_isPaused && (_pending == null || _pending.isEmpty); |
218 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | 217 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; |
219 | 218 |
220 bool get isPaused => _isPaused; | 219 bool get isPaused => _isPaused; |
221 | 220 |
222 void _cancel() { | 221 void _cancel() { |
223 _state |= _STATE_CANCELED; | 222 _state |= _STATE_CANCELED; |
224 if (_hasPending) { | 223 if (_hasPending) { |
225 _pending.cancelSchedule(); | 224 _pending.cancelSchedule(); |
226 } | 225 } |
226 if (!_inCallback) _pending = null; | |
227 _cancelFuture = _onCancel(); | |
floitsch
2013/10/12 18:53:57
I'm not sure we are allowed to call "_onCancel" wh
Lasse Reichstein Nielsen
2013/10/14 11:32:33
I accepted that we had to call _onCancel during an
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
227 } | 228 } |
228 | 229 |
229 /** | 230 /** |
230 * Increment the pause count. | 231 * Increment the pause count. |
231 * | 232 * |
232 * Also marks input as paused. | 233 * Also marks input as paused. |
233 */ | 234 */ |
234 void _incrementPauseCount() { | 235 void _incrementPauseCount() { |
235 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 236 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
236 } | 237 } |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
284 // try/catch wrapping and send any errors to | 285 // try/catch wrapping and send any errors to |
285 // [_Zone.current.handleUncaughtError]. | 286 // [_Zone.current.handleUncaughtError]. |
286 void _onPause() { | 287 void _onPause() { |
287 assert(_isInputPaused); | 288 assert(_isInputPaused); |
288 } | 289 } |
289 | 290 |
290 void _onResume() { | 291 void _onResume() { |
291 assert(!_isInputPaused); | 292 assert(!_isInputPaused); |
292 } | 293 } |
293 | 294 |
294 void _onCancel() { | 295 Future _onCancel() { |
295 assert(_isCanceled); | 296 assert(_isCanceled); |
296 } | 297 } |
297 | 298 |
298 // Handle pending events. | 299 // Handle pending events. |
299 | 300 |
300 /** | 301 /** |
301 * Add a pending event. | 302 * Add a pending event. |
302 * | 303 * |
303 * If the subscription is not paused, this also schedules a firing | 304 * If the subscription is not paused, this also schedules a firing |
304 * of pending events later (if necessary). | 305 * of pending events later (if necessary). |
(...skipping 21 matching lines...) Expand all Loading... | |
326 _zone.runUnaryGuarded(_onData, data); | 327 _zone.runUnaryGuarded(_onData, data); |
327 _state &= ~_STATE_IN_CALLBACK; | 328 _state &= ~_STATE_IN_CALLBACK; |
328 _checkState(wasInputPaused); | 329 _checkState(wasInputPaused); |
329 } | 330 } |
330 | 331 |
331 void _sendError(var error) { | 332 void _sendError(var error) { |
332 assert(!_isCanceled); | 333 assert(!_isCanceled); |
333 assert(!_isPaused); | 334 assert(!_isPaused); |
334 assert(!_inCallback); | 335 assert(!_inCallback); |
335 bool wasInputPaused = _isInputPaused; | 336 bool wasInputPaused = _isInputPaused; |
336 _state |= _STATE_IN_CALLBACK; | 337 void sendError() { |
floitsch
2013/10/12 18:53:57
one line before and one line after the function.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
337 if (!_zone.inSameErrorZone(Zone.current)) { | 338 if (!_isCanceled || _inErrorCancel) { |
floitsch
2013/10/12 18:53:57
Comment: If the subscription has been canceled whi
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
338 // Errors are not allowed to traverse zone boundaries. | 339 _state |= _STATE_IN_CALLBACK; |
339 Zone.current.handleUncaughtError(error); | 340 if (!_zone.inSameErrorZone(Zone.current)) { |
341 // Errors are not allowed to traverse zone boundaries. | |
342 Zone.current.handleUncaughtError(error); | |
343 } else { | |
344 _zone.runUnaryGuarded(_onError, error); | |
345 } | |
346 _state &= ~_STATE_IN_CALLBACK; | |
347 } | |
348 } | |
349 if (_cancelOnError) { | |
350 _state |= _STATE_IN_ERROR_CANCEL; | |
351 _cancel(); | |
352 if (_cancelFuture != null) { | |
353 _cancelFuture.whenComplete(sendError); | |
354 } else { | |
355 sendError(); | |
356 } | |
340 } else { | 357 } else { |
341 _zone.runUnaryGuarded(_onError, error); | 358 sendError(); |
359 // Only check state if not cancelOnError. | |
360 _checkState(wasInputPaused); | |
342 } | 361 } |
343 _state &= ~_STATE_IN_CALLBACK; | |
344 if (_cancelOnError) { | |
345 _cancel(); | |
346 } | |
347 _checkState(wasInputPaused); | |
348 } | 362 } |
349 | 363 |
350 void _sendDone() { | 364 void _sendDone() { |
351 assert(!_isCanceled); | 365 assert(!_isCanceled); |
352 assert(!_isPaused); | 366 assert(!_isPaused); |
353 assert(!_inCallback); | 367 assert(!_inCallback); |
354 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); | 368 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
355 _zone.runGuarded(_onDone); | 369 _zone.runGuarded(_onDone); |
356 _onCancel(); // No checkState after cancel, it is always the last event. | 370 // TODO(ajohnsen): Run it before _onDone and wait for future? |
Anders Johnsen
2013/10/11 12:32:40
Please comment on this one - I'm not sure what the
floitsch
2013/10/12 18:53:57
Neither.
The done event should *not* call cancel.
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
371 _cancel(); // No checkState after cancel, it is always the last event. | |
357 _state &= ~_STATE_IN_CALLBACK; | 372 _state &= ~_STATE_IN_CALLBACK; |
358 } | 373 } |
359 | 374 |
360 /** | 375 /** |
361 * Call a hook function. | 376 * Call a hook function. |
362 * | 377 * |
363 * The call is properly wrapped in code to avoid other callbacks | 378 * The call is properly wrapped in code to avoid other callbacks |
364 * during the call, and it checks for state changes after the call | 379 * during the call, and it checks for state changes after the call |
365 * that should cause further callbacks. | 380 * that should cause further callbacks. |
366 */ | 381 */ |
(...skipping 21 matching lines...) Expand all Loading... | |
388 if (_hasPending && _pending.isEmpty) { | 403 if (_hasPending && _pending.isEmpty) { |
389 _state &= ~_STATE_HAS_PENDING; | 404 _state &= ~_STATE_HAS_PENDING; |
390 if (_isInputPaused && _mayResumeInput) { | 405 if (_isInputPaused && _mayResumeInput) { |
391 _state &= ~_STATE_INPUT_PAUSED; | 406 _state &= ~_STATE_INPUT_PAUSED; |
392 } | 407 } |
393 } | 408 } |
394 // If the state changes during a callback, we immediately | 409 // If the state changes during a callback, we immediately |
395 // make a new state-change callback. Loop until the state didn't change. | 410 // make a new state-change callback. Loop until the state didn't change. |
396 while (true) { | 411 while (true) { |
397 if (_isCanceled) { | 412 if (_isCanceled) { |
398 _onCancel(); | |
399 _pending = null; | 413 _pending = null; |
400 return; | 414 return; |
401 } | 415 } |
402 bool isInputPaused = _isInputPaused; | 416 bool isInputPaused = _isInputPaused; |
403 if (wasInputPaused == isInputPaused) break; | 417 if (wasInputPaused == isInputPaused) break; |
404 _state ^= _STATE_IN_CALLBACK; | 418 _state ^= _STATE_IN_CALLBACK; |
405 if (isInputPaused) { | 419 if (isInputPaused) { |
406 _onPause(); | 420 _onPause(); |
407 } else { | 421 } else { |
408 _onResume(); | 422 _onResume(); |
(...skipping 298 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
707 void onError(void handleError(Object data)) {} | 721 void onError(void handleError(Object data)) {} |
708 void onDone(void handleDone()) {} | 722 void onDone(void handleDone()) {} |
709 | 723 |
710 void pause([Future resumeSignal]) { | 724 void pause([Future resumeSignal]) { |
711 _pauseCounter++; | 725 _pauseCounter++; |
712 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); | 726 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
713 } | 727 } |
714 void resume() { | 728 void resume() { |
715 if (_pauseCounter > 0) _pauseCounter--; | 729 if (_pauseCounter > 0) _pauseCounter--; |
716 } | 730 } |
717 void cancel() {} | 731 Future cancel() => null; |
718 bool get isPaused => _pauseCounter > 0; | 732 bool get isPaused => _pauseCounter > 0; |
719 | 733 |
720 Future asFuture([futureValue]) => new _Future(); | 734 Future asFuture([futureValue]) => new _Future(); |
721 } | 735 } |
722 | 736 |
723 class _AsBroadcastStream<T> extends Stream<T> { | 737 class _AsBroadcastStream<T> extends Stream<T> { |
724 final Stream<T> _source; | 738 final Stream<T> _source; |
725 final _broadcastCallback _onListenHandler; | 739 final _broadcastCallback _onListenHandler; |
726 final _broadcastCallback _onCancelHandler; | 740 final _broadcastCallback _onCancelHandler; |
727 final Zone _zone; | 741 final Zone _zone; |
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
830 } | 844 } |
831 | 845 |
832 void pause([Future resumeSignal]) { | 846 void pause([Future resumeSignal]) { |
833 _stream._pauseSubscription(resumeSignal); | 847 _stream._pauseSubscription(resumeSignal); |
834 } | 848 } |
835 | 849 |
836 void resume() { | 850 void resume() { |
837 _stream._resumeSubscription(); | 851 _stream._resumeSubscription(); |
838 } | 852 } |
839 | 853 |
840 void cancel() { | 854 Future cancel() { |
841 _stream._cancelSubscription(); | 855 _stream._cancelSubscription(); |
856 return null; | |
842 } | 857 } |
843 | 858 |
844 bool get isPaused { | 859 bool get isPaused { |
845 return _stream._isSubscriptionPaused; | 860 return _stream._isSubscriptionPaused; |
846 } | 861 } |
847 | 862 |
848 Future asFuture([var futureValue]) { | 863 Future asFuture([var futureValue]) { |
849 throw new UnsupportedError( | 864 throw new UnsupportedError( |
850 "Cannot change handlers of asBroadcastStream source subscription."); | 865 "Cannot change handlers of asBroadcastStream source subscription."); |
851 } | 866 } |
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
942 } | 957 } |
943 | 958 |
944 /** Clears up the internal state when the iterator ends. */ | 959 /** Clears up the internal state when the iterator ends. */ |
945 void _clear() { | 960 void _clear() { |
946 _subscription = null; | 961 _subscription = null; |
947 _futureOrPrefetch = null; | 962 _futureOrPrefetch = null; |
948 _current = null; | 963 _current = null; |
949 _state = _STATE_DONE; | 964 _state = _STATE_DONE; |
950 } | 965 } |
951 | 966 |
952 void cancel() { | 967 Future cancel() { |
953 StreamSubscription subscription = _subscription; | 968 StreamSubscription subscription = _subscription; |
954 if (_state == _STATE_MOVING) { | 969 if (_state == _STATE_MOVING) { |
955 _Future<bool> hasNext = _futureOrPrefetch; | 970 _Future<bool> hasNext = _futureOrPrefetch; |
956 _clear(); | 971 _clear(); |
957 hasNext._complete(false); | 972 hasNext._complete(false); |
958 } else { | 973 } else { |
959 _clear(); | 974 _clear(); |
960 } | 975 } |
961 subscription.cancel(); | 976 return subscription.cancel(); |
962 } | 977 } |
963 | 978 |
964 void _onData(T data) { | 979 void _onData(T data) { |
965 if (_state == _STATE_MOVING) { | 980 if (_state == _STATE_MOVING) { |
966 _current = data; | 981 _current = data; |
967 _Future<bool> hasNext = _futureOrPrefetch; | 982 _Future<bool> hasNext = _futureOrPrefetch; |
968 _futureOrPrefetch = null; | 983 _futureOrPrefetch = null; |
969 _state = _STATE_FOUND; | 984 _state = _STATE_FOUND; |
970 hasNext._complete(true); | 985 hasNext._complete(true); |
971 return; | 986 return; |
(...skipping 23 matching lines...) Expand all Loading... | |
995 _Future<bool> hasNext = _futureOrPrefetch; | 1010 _Future<bool> hasNext = _futureOrPrefetch; |
996 _clear(); | 1011 _clear(); |
997 hasNext._complete(false); | 1012 hasNext._complete(false); |
998 return; | 1013 return; |
999 } | 1014 } |
1000 _subscription.pause(); | 1015 _subscription.pause(); |
1001 _futureOrPrefetch = null; | 1016 _futureOrPrefetch = null; |
1002 _state = _STATE_EXTRA_DONE; | 1017 _state = _STATE_EXTRA_DONE; |
1003 } | 1018 } |
1004 } | 1019 } |
OLD | NEW |