Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(276)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error); 10 void _addError(Object error, StackTrace stackTrace);
11 void _close(); 11 void _close();
12 } 12 }
13 13
14 /** 14 /**
15 * Abstract and private interface for a place to send events. 15 * Abstract and private interface for a place to send events.
16 * 16 *
17 * Used by event buffering to finally dispatch the pending event, where 17 * Used by event buffering to finally dispatch the pending event, where
18 * [_EventSink] is where the event first enters the stream subscription, 18 * [_EventSink] is where the event first enters the stream subscription,
19 * and may yet be buffered. 19 * and may yet be buffered.
20 */ 20 */
21 abstract class _EventDispatch<T> { 21 abstract class _EventDispatch<T> {
22 void _sendData(T data); 22 void _sendData(T data);
23 void _sendError(Object error); 23 void _sendError(Object error, StackTrace stackTrace);
24 void _sendDone(); 24 void _sendDone();
25 } 25 }
26 26
27 /** 27 /**
28 * Default implementation of stream subscription of buffering events. 28 * Default implementation of stream subscription of buffering events.
29 * 29 *
30 * The only public methods are those of [StreamSubscription], so instances of 30 * The only public methods are those of [StreamSubscription], so instances of
31 * [_BufferingStreamSubscription] can be returned directly as a 31 * [_BufferingStreamSubscription] can be returned directly as a
32 * [StreamSubscription] without exposing internal functionality. 32 * [StreamSubscription] without exposing internal functionality.
33 * 33 *
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
71 * when `cancelOnError` is true. 71 * when `cancelOnError` is true.
72 */ 72 */
73 static const int _STATE_CANCELED = 8; 73 static const int _STATE_CANCELED = 8;
74 static const int _STATE_IN_CALLBACK = 16; 74 static const int _STATE_IN_CALLBACK = 16;
75 static const int _STATE_HAS_PENDING = 32; 75 static const int _STATE_HAS_PENDING = 32;
76 static const int _STATE_PAUSE_COUNT = 64; 76 static const int _STATE_PAUSE_COUNT = 64;
77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; 77 static const int _STATE_PAUSE_COUNT_SHIFT = 6;
78 78
79 /* Event handlers provided in constructor. */ 79 /* Event handlers provided in constructor. */
80 _DataHandler<T> _onData; 80 _DataHandler<T> _onData;
81 _ErrorHandler _onError; 81 Function _onError;
82 _DoneHandler _onDone; 82 _DoneHandler _onDone;
83 final Zone _zone = Zone.current; 83 final Zone _zone = Zone.current;
84 84
85 /** Bit vector based on state-constants above. */ 85 /** Bit vector based on state-constants above. */
86 int _state; 86 int _state;
87 87
88 /** 88 /**
89 * Queue of pending events. 89 * Queue of pending events.
90 * 90 *
91 * Is created when necessary, or set in constructor for preconfigured events. 91 * Is created when necessary, or set in constructor for preconfigured events.
92 */ 92 */
93 _PendingEvents _pending; 93 _PendingEvents _pending;
94 94
95 _BufferingStreamSubscription(void onData(T data), 95 _BufferingStreamSubscription(void onData(T data),
96 void onError(error), 96 Function onError,
97 void onDone(), 97 void onDone(),
98 bool cancelOnError) 98 bool cancelOnError)
99 : _onData = Zone.current.registerUnaryCallback(onData), 99 : _onData = Zone.current.registerUnaryCallback(onData),
100 _onError = Zone.current.registerUnaryCallback(onError), 100 _onError = _registerErrorHandler(onError, Zone.current),
101 _onDone = Zone.current.registerCallback(onDone), 101 _onDone = Zone.current.registerCallback(onDone),
102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { 102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
103 assert(_onData != null); 103 assert(_onData != null);
104 assert(_onError != null); 104 assert(_onError != null);
105 assert(_onDone != null); 105 assert(_onDone != null);
106 } 106 }
107 107
108 /** 108 /**
109 * Sets the subscription's pending events object. 109 * Sets the subscription's pending events object.
110 * 110 *
(...skipping 20 matching lines...) Expand all
131 assert(_isCanceled); 131 assert(_isCanceled);
132 _PendingEvents events = _pending; 132 _PendingEvents events = _pending;
133 _pending = null; 133 _pending = null;
134 return events; 134 return events;
135 } 135 }
136 136
137 // StreamSubscription interface. 137 // StreamSubscription interface.
138 138
139 void onData(void handleData(T event)) { 139 void onData(void handleData(T event)) {
140 if (handleData == null) handleData = _nullDataHandler; 140 if (handleData == null) handleData = _nullDataHandler;
141 _onData = handleData; 141 _onData = Zone.current.registerUnaryCallback(handleData);
142 } 142 }
143 143
144 void onError(void handleError(error)) { 144 void onError(Function handleError) {
145 if (handleError == null) handleError = _nullErrorHandler; 145 if (handleError == null) handleError = _nullErrorHandler;
146 _onError = handleError; 146 _onError = _registerErrorHandler(handleError, Zone.current);
147 } 147 }
148 148
149 void onDone(void handleDone()) { 149 void onDone(void handleDone()) {
150 if (handleDone == null) handleDone = _nullDoneHandler; 150 if (handleDone == null) handleDone = _nullDoneHandler;
151 _onDone = handleDone; 151 _onDone = Zone.current.registerCallback(handleDone);
152 } 152 }
153 153
154 void pause([Future resumeSignal]) { 154 void pause([Future resumeSignal]) {
155 if (_isCanceled) return; 155 if (_isCanceled) return;
156 bool wasPaused = _isPaused; 156 bool wasPaused = _isPaused;
157 bool wasInputPaused = _isInputPaused; 157 bool wasInputPaused = _isInputPaused;
158 // Increment pause count and mark input paused (if it isn't already). 158 // Increment pause count and mark input paused (if it isn't already).
159 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; 159 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
160 if (resumeSignal != null) resumeSignal.whenComplete(resume); 160 if (resumeSignal != null) resumeSignal.whenComplete(resume);
161 if (!wasPaused && _pending != null) _pending.cancelSchedule(); 161 if (!wasPaused && _pending != null) _pending.cancelSchedule();
(...skipping 27 matching lines...) Expand all
189 _pending = null; 189 _pending = null;
190 _state &= ~_STATE_IN_CALLBACK; 190 _state &= ~_STATE_IN_CALLBACK;
191 } 191 }
192 } 192 }
193 193
194 Future asFuture([var futureValue]) { 194 Future asFuture([var futureValue]) {
195 _Future<T> result = new _Future<T>(); 195 _Future<T> result = new _Future<T>();
196 196
197 // Overwrite the onDone and onError handlers. 197 // Overwrite the onDone and onError handlers.
198 _onDone = () { result._complete(futureValue); }; 198 _onDone = () { result._complete(futureValue); };
199 _onError = (error) { 199 _onError = (error, stackTrace) {
200 cancel(); 200 cancel();
201 result._completeError(error); 201 result._completeError(error, stackTrace);
202 }; 202 };
203 203
204 return result; 204 return result;
205 } 205 }
206 206
207 // State management. 207 // State management.
208 208
209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; 209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; 210 bool get _isClosed => (_state & _STATE_CLOSED) != 0;
211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; 211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
252 void _add(T data) { 252 void _add(T data) {
253 assert(!_isClosed); 253 assert(!_isClosed);
254 if (_isCanceled) return; 254 if (_isCanceled) return;
255 if (_canFire) { 255 if (_canFire) {
256 _sendData(data); 256 _sendData(data);
257 } else { 257 } else {
258 _addPending(new _DelayedData(data)); 258 _addPending(new _DelayedData(data));
259 } 259 }
260 } 260 }
261 261
262 void _addError(Object error) { 262 void _addError(Object error, StackTrace stackTrace) {
263 if (_isCanceled) return; 263 if (_isCanceled) return;
264 if (_canFire) { 264 if (_canFire) {
265 _sendError(error); // Reports cancel after sending. 265 _sendError(error, stackTrace); // Reports cancel after sending.
266 } else { 266 } else {
267 _addPending(new _DelayedError(error)); 267 _addPending(new _DelayedError(error, stackTrace));
268 } 268 }
269 } 269 }
270 270
271 void _close() { 271 void _close() {
272 assert(!_isClosed); 272 assert(!_isClosed);
273 if (_isCanceled) return; 273 if (_isCanceled) return;
274 _state |= _STATE_CLOSED; 274 _state |= _STATE_CLOSED;
275 if (_canFire) { 275 if (_canFire) {
276 _sendDone(); 276 _sendDone();
277 } else { 277 } else {
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
321 assert(!_isCanceled); 321 assert(!_isCanceled);
322 assert(!_isPaused); 322 assert(!_isPaused);
323 assert(!_inCallback); 323 assert(!_inCallback);
324 bool wasInputPaused = _isInputPaused; 324 bool wasInputPaused = _isInputPaused;
325 _state |= _STATE_IN_CALLBACK; 325 _state |= _STATE_IN_CALLBACK;
326 _zone.runUnaryGuarded(_onData, data); 326 _zone.runUnaryGuarded(_onData, data);
327 _state &= ~_STATE_IN_CALLBACK; 327 _state &= ~_STATE_IN_CALLBACK;
328 _checkState(wasInputPaused); 328 _checkState(wasInputPaused);
329 } 329 }
330 330
331 void _sendError(var error) { 331 void _sendError(var error, StackTrace stackTrace) {
332 assert(!_isCanceled); 332 assert(!_isCanceled);
333 assert(!_isPaused); 333 assert(!_isPaused);
334 assert(!_inCallback); 334 assert(!_inCallback);
335 bool wasInputPaused = _isInputPaused; 335 bool wasInputPaused = _isInputPaused;
336 _state |= _STATE_IN_CALLBACK; 336 _state |= _STATE_IN_CALLBACK;
337 if (!_zone.inSameErrorZone(Zone.current)) { 337 if (!_zone.inSameErrorZone(Zone.current)) {
338 // Errors are not allowed to traverse zone boundaries. 338 // Errors are not allowed to traverse zone boundaries.
339 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); 339 Zone.current.handleUncaughtError(error, stackTrace);
340 } else if (_onError is ZoneBinaryCallback) {
341 _zone.runBinaryGuarded(_onError, error, stackTrace);
340 } else { 342 } else {
341 _zone.runUnaryGuarded(_onError, error); 343 _zone.runUnaryGuarded(_onError, error);
342 } 344 }
343 _state &= ~_STATE_IN_CALLBACK; 345 _state &= ~_STATE_IN_CALLBACK;
344 if (_cancelOnError) { 346 if (_cancelOnError) {
345 _cancel(); 347 _cancel();
346 } 348 }
347 _checkState(wasInputPaused); 349 _checkState(wasInputPaused);
348 } 350 }
349 351
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
417 } 419 }
418 420
419 // ------------------------------------------------------------------- 421 // -------------------------------------------------------------------
420 // Common base class for single and multi-subscription streams. 422 // Common base class for single and multi-subscription streams.
421 // ------------------------------------------------------------------- 423 // -------------------------------------------------------------------
422 abstract class _StreamImpl<T> extends Stream<T> { 424 abstract class _StreamImpl<T> extends Stream<T> {
423 // ------------------------------------------------------------------ 425 // ------------------------------------------------------------------
424 // Stream interface. 426 // Stream interface.
425 427
426 StreamSubscription<T> listen(void onData(T data), 428 StreamSubscription<T> listen(void onData(T data),
427 { void onError(error), 429 { Function onError,
428 void onDone(), 430 void onDone(),
429 bool cancelOnError }) { 431 bool cancelOnError }) {
430 if (onData == null) onData = _nullDataHandler; 432 if (onData == null) onData = _nullDataHandler;
431 if (onError == null) onError = _nullErrorHandler; 433 if (onError == null) onError = _nullErrorHandler;
432 if (onDone == null) onDone = _nullDoneHandler; 434 if (onDone == null) onDone = _nullDoneHandler;
433 cancelOnError = identical(true, cancelOnError); 435 cancelOnError = identical(true, cancelOnError);
434 StreamSubscription subscription = 436 StreamSubscription subscription =
435 _createSubscription(onData, onError, onDone, cancelOnError); 437 _createSubscription(onData, onError, onDone, cancelOnError);
436 _onListen(subscription); 438 _onListen(subscription);
437 return subscription; 439 return subscription;
(...skipping 21 matching lines...) Expand all
459 final _EventGenerator _pending; 461 final _EventGenerator _pending;
460 /** 462 /**
461 * Initializes the stream to have only the events provided by a 463 * Initializes the stream to have only the events provided by a
462 * [_PendingEvents]. 464 * [_PendingEvents].
463 * 465 *
464 * A new [_PendingEvents] must be generated for each listen. 466 * A new [_PendingEvents] must be generated for each listen.
465 */ 467 */
466 _GeneratedStreamImpl(this._pending); 468 _GeneratedStreamImpl(this._pending);
467 469
468 StreamSubscription _createSubscription(void onData(T data), 470 StreamSubscription _createSubscription(void onData(T data),
469 void onError(Object error), 471 Function onError,
470 void onDone(), 472 void onDone(),
471 bool cancelOnError) { 473 bool cancelOnError) {
472 _BufferingStreamSubscription<T> subscription = 474 _BufferingStreamSubscription<T> subscription =
473 new _BufferingStreamSubscription( 475 new _BufferingStreamSubscription(
474 onData, onError, onDone, cancelOnError); 476 onData, onError, onDone, cancelOnError);
475 subscription._setPendingEvents(_pending()); 477 subscription._setPendingEvents(_pending());
476 return subscription; 478 return subscription;
477 } 479 }
478 } 480 }
479 481
(...skipping 15 matching lines...) Expand all
495 // Send one event per call to moveNext. 497 // Send one event per call to moveNext.
496 // If moveNext returns true, send the current element as data. 498 // If moveNext returns true, send the current element as data.
497 // If moveNext returns false, send a done event and clear the _iterator. 499 // If moveNext returns false, send a done event and clear the _iterator.
498 // If moveNext throws an error, send an error and clear the _iterator. 500 // If moveNext throws an error, send an error and clear the _iterator.
499 // After an error, no further events will be sent. 501 // After an error, no further events will be sent.
500 bool isDone; 502 bool isDone;
501 try { 503 try {
502 isDone = !_iterator.moveNext(); 504 isDone = !_iterator.moveNext();
503 } catch (e, s) { 505 } catch (e, s) {
504 _iterator = null; 506 _iterator = null;
505 dispatch._sendError(_asyncError(e, s)); 507 dispatch._sendError(_asyncError(e, s), s);
506 return; 508 return;
507 } 509 }
508 if (!isDone) { 510 if (!isDone) {
509 dispatch._sendData(_iterator.current); 511 dispatch._sendData(_iterator.current);
510 } else { 512 } else {
511 _iterator = null; 513 _iterator = null;
512 dispatch._sendDone(); 514 dispatch._sendDone();
513 } 515 }
514 } 516 }
515 517
516 void clear() { 518 void clear() {
517 if (isScheduled) cancelSchedule(); 519 if (isScheduled) cancelSchedule();
518 _iterator = null; 520 _iterator = null;
519 } 521 }
520 } 522 }
521 523
522 524
523 // Internal helpers. 525 // Internal helpers.
524 526
525 // Types of the different handlers on a stream. Types used to type fields. 527 // Types of the different handlers on a stream. Types used to type fields.
526 typedef void _DataHandler<T>(T value); 528 typedef void _DataHandler<T>(T value);
527 typedef void _ErrorHandler(error);
528 typedef void _DoneHandler(); 529 typedef void _DoneHandler();
529 530
530 531
531 /** Default data handler, does nothing. */ 532 /** Default data handler, does nothing. */
532 void _nullDataHandler(var value) {} 533 void _nullDataHandler(var value) {}
533 534
534 /** Default error handler, reports the error to the current zone's handler. */ 535 /** Default error handler, reports the error to the current zone's handler. */
535 void _nullErrorHandler(error) { 536 void _nullErrorHandler(error, [StackTrace stackTrace]) {
536 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); 537 Zone.current.handleUncaughtError(error, stackTrace);
537 } 538 }
538 539
539 /** Default done handler, does nothing. */ 540 /** Default done handler, does nothing. */
540 void _nullDoneHandler() {} 541 void _nullDoneHandler() {}
541 542
542 543
543 /** A delayed event on a buffering stream subscription. */ 544 /** A delayed event on a buffering stream subscription. */
544 abstract class _DelayedEvent { 545 abstract class _DelayedEvent {
545 /** Added as a linked list on the [StreamController]. */ 546 /** Added as a linked list on the [StreamController]. */
546 _DelayedEvent next; 547 _DelayedEvent next;
547 /** Execute the delayed event on the [StreamController]. */ 548 /** Execute the delayed event on the [StreamController]. */
548 void perform(_EventDispatch dispatch); 549 void perform(_EventDispatch dispatch);
549 } 550 }
550 551
551 /** A delayed data event. */ 552 /** A delayed data event. */
552 class _DelayedData<T> extends _DelayedEvent { 553 class _DelayedData<T> extends _DelayedEvent {
553 final T value; 554 final T value;
554 _DelayedData(this.value); 555 _DelayedData(this.value);
555 void perform(_EventDispatch<T> dispatch) { 556 void perform(_EventDispatch<T> dispatch) {
556 dispatch._sendData(value); 557 dispatch._sendData(value);
557 } 558 }
558 } 559 }
559 560
560 /** A delayed error event. */ 561 /** A delayed error event. */
561 class _DelayedError extends _DelayedEvent { 562 class _DelayedError extends _DelayedEvent {
562 final error; 563 final error;
563 _DelayedError(this.error); 564 final StackTrace stackTrace;
565
566 _DelayedError(this.error, this.stackTrace);
564 void perform(_EventDispatch dispatch) { 567 void perform(_EventDispatch dispatch) {
565 dispatch._sendError(error); 568 dispatch._sendError(error, stackTrace);
566 } 569 }
567 } 570 }
568 571
569 /** A delayed done event. */ 572 /** A delayed done event. */
570 class _DelayedDone implements _DelayedEvent { 573 class _DelayedDone implements _DelayedEvent {
571 const _DelayedDone(); 574 const _DelayedDone();
572 void perform(_EventDispatch dispatch) { 575 void perform(_EventDispatch dispatch) {
573 dispatch._sendDone(); 576 dispatch._sendDone();
574 } 577 }
575 578
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
697 700
698 typedef void _broadcastCallback(StreamSubscription subscription); 701 typedef void _broadcastCallback(StreamSubscription subscription);
699 702
700 /** 703 /**
701 * Dummy subscription that will never receive any events. 704 * Dummy subscription that will never receive any events.
702 */ 705 */
703 class _DummyStreamSubscription<T> implements StreamSubscription<T> { 706 class _DummyStreamSubscription<T> implements StreamSubscription<T> {
704 int _pauseCounter = 0; 707 int _pauseCounter = 0;
705 708
706 void onData(void handleData(T data)) {} 709 void onData(void handleData(T data)) {}
707 void onError(void handleError(Object data)) {} 710 void onError(Function handleError) {}
708 void onDone(void handleDone()) {} 711 void onDone(void handleDone()) {}
709 712
710 void pause([Future resumeSignal]) { 713 void pause([Future resumeSignal]) {
711 _pauseCounter++; 714 _pauseCounter++;
712 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); 715 if (resumeSignal != null) resumeSignal.then((_) { resume(); });
713 } 716 }
714 void resume() { 717 void resume() {
715 if (_pauseCounter > 0) _pauseCounter--; 718 if (_pauseCounter > 0) _pauseCounter--;
716 } 719 }
717 void cancel() {} 720 void cancel() {}
(...skipping 16 matching lines...) Expand all
734 void onCancelHandler(StreamSubscription subscription)) 737 void onCancelHandler(StreamSubscription subscription))
735 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), 738 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler),
736 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), 739 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler),
737 _zone = Zone.current { 740 _zone = Zone.current {
738 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); 741 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
739 } 742 }
740 743
741 bool get isBroadcast => true; 744 bool get isBroadcast => true;
742 745
743 StreamSubscription<T> listen(void onData(T data), 746 StreamSubscription<T> listen(void onData(T data),
744 { void onError(Object error), 747 { Function onError,
745 void onDone(), 748 void onDone(),
746 bool cancelOnError}) { 749 bool cancelOnError}) {
747 if (_controller == null) { 750 if (_controller == null) {
748 // Return a dummy subscription backed by nothing, since 751 // Return a dummy subscription backed by nothing, since
749 // it won't ever receive any events. 752 // it won't ever receive any events.
750 return new _DummyStreamSubscription<T>(); 753 return new _DummyStreamSubscription<T>();
751 } 754 }
752 if (_subscription == null) { 755 if (_subscription == null) {
753 _subscription = _source.listen(_controller.add, 756 _subscription = _source.listen(_controller.add,
754 onError: _controller.addError, 757 onError: _controller.addError,
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after
969 _state = _STATE_FOUND; 972 _state = _STATE_FOUND;
970 hasNext._complete(true); 973 hasNext._complete(true);
971 return; 974 return;
972 } 975 }
973 _subscription.pause(); 976 _subscription.pause();
974 assert(_futureOrPrefetch == null); 977 assert(_futureOrPrefetch == null);
975 _futureOrPrefetch = data; 978 _futureOrPrefetch = data;
976 _state = _STATE_EXTRA_DATA; 979 _state = _STATE_EXTRA_DATA;
977 } 980 }
978 981
979 void _onError(Object error) { 982 void _onError(Object error, [StackTrace stackTrace]) {
980 if (_state == _STATE_MOVING) { 983 if (_state == _STATE_MOVING) {
981 _Future<bool> hasNext = _futureOrPrefetch; 984 _Future<bool> hasNext = _futureOrPrefetch;
982 // We have cancelOnError: true, so the subscription is canceled. 985 // We have cancelOnError: true, so the subscription is canceled.
983 _clear(); 986 _clear();
984 hasNext._completeError(error); 987 hasNext._completeError(error, stackTrace);
985 return; 988 return;
986 } 989 }
987 _subscription.pause(); 990 _subscription.pause();
988 assert(_futureOrPrefetch == null); 991 assert(_futureOrPrefetch == null);
989 _futureOrPrefetch = error; 992 _futureOrPrefetch = error;
990 _state = _STATE_EXTRA_ERROR; 993 _state = _STATE_EXTRA_ERROR;
991 } 994 }
992 995
993 void _onDone() { 996 void _onDone() {
994 if (_state == _STATE_MOVING) { 997 if (_state == _STATE_MOVING) {
995 _Future<bool> hasNext = _futureOrPrefetch; 998 _Future<bool> hasNext = _futureOrPrefetch;
996 _clear(); 999 _clear();
997 hasNext._complete(false); 1000 hasNext._complete(false);
998 return; 1001 return;
999 } 1002 }
1000 _subscription.pause(); 1003 _subscription.pause();
1001 _futureOrPrefetch = null; 1004 _futureOrPrefetch = null;
1002 _state = _STATE_EXTRA_DONE; 1005 _state = _STATE_EXTRA_DONE;
1003 } 1006 }
1004 } 1007 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698