Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(98)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Upload Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error); 10 void _addError(Object error, StackTrace stackTrace);
11 void _close(); 11 void _close();
12 } 12 }
13 13
14 /** 14 /**
15 * Abstract and private interface for a place to send events. 15 * Abstract and private interface for a place to send events.
16 * 16 *
17 * Used by event buffering to finally dispatch the pending event, where 17 * Used by event buffering to finally dispatch the pending event, where
18 * [_EventSink] is where the event first enters the stream subscription, 18 * [_EventSink] is where the event first enters the stream subscription,
19 * and may yet be buffered. 19 * and may yet be buffered.
20 */ 20 */
21 abstract class _EventDispatch<T> { 21 abstract class _EventDispatch<T> {
22 void _sendData(T data); 22 void _sendData(T data);
23 void _sendError(Object error); 23 void _sendError(Object error, StackTrace stackTrace);
24 void _sendDone(); 24 void _sendDone();
25 } 25 }
26 26
27 /** 27 /**
28 * Default implementation of stream subscription of buffering events. 28 * Default implementation of stream subscription of buffering events.
29 * 29 *
30 * The only public methods are those of [StreamSubscription], so instances of 30 * The only public methods are those of [StreamSubscription], so instances of
31 * [_BufferingStreamSubscription] can be returned directly as a 31 * [_BufferingStreamSubscription] can be returned directly as a
32 * [StreamSubscription] without exposing internal functionality. 32 * [StreamSubscription] without exposing internal functionality.
33 * 33 *
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
71 * when `cancelOnError` is true. 71 * when `cancelOnError` is true.
72 */ 72 */
73 static const int _STATE_CANCELED = 8; 73 static const int _STATE_CANCELED = 8;
74 static const int _STATE_IN_CALLBACK = 16; 74 static const int _STATE_IN_CALLBACK = 16;
75 static const int _STATE_HAS_PENDING = 32; 75 static const int _STATE_HAS_PENDING = 32;
76 static const int _STATE_PAUSE_COUNT = 64; 76 static const int _STATE_PAUSE_COUNT = 64;
77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; 77 static const int _STATE_PAUSE_COUNT_SHIFT = 6;
78 78
79 /* Event handlers provided in constructor. */ 79 /* Event handlers provided in constructor. */
80 _DataHandler<T> _onData; 80 _DataHandler<T> _onData;
81 _ErrorHandler _onError; 81 Function _onError;
82 _DoneHandler _onDone; 82 _DoneHandler _onDone;
83 final Zone _zone = Zone.current; 83 final Zone _zone = Zone.current;
84 84
85 /** Bit vector based on state-constants above. */ 85 /** Bit vector based on state-constants above. */
86 int _state; 86 int _state;
87 87
88 /** 88 /**
89 * Queue of pending events. 89 * Queue of pending events.
90 * 90 *
91 * Is created when necessary, or set in constructor for preconfigured events. 91 * Is created when necessary, or set in constructor for preconfigured events.
92 */ 92 */
93 _PendingEvents _pending; 93 _PendingEvents _pending;
94 94
95 _BufferingStreamSubscription(void onData(T data), 95 _BufferingStreamSubscription(void onData(T data),
96 void onError(error), 96 Function onError,
97 void onDone(), 97 void onDone(),
98 bool cancelOnError) 98 bool cancelOnError)
99 : _onData = Zone.current.registerUnaryCallback(onData), 99 : _onData = Zone.current.registerUnaryCallback(onData),
100 _onError = Zone.current.registerUnaryCallback(onError), 100 _onError = _registerErrorCallback(onError),
101 _onDone = Zone.current.registerCallback(onDone), 101 _onDone = Zone.current.registerCallback(onDone),
102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { 102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
103 assert(_onData != null); 103 assert(_onData != null);
104 assert(_onError != null); 104 assert(_onError != null);
105 assert(_onDone != null); 105 assert(_onDone != null);
106 } 106 }
107 107
108 static _registerErrorCallback(Function errorCallback) {
109 if (errorCallback is ZoneBinaryCallback) {
110 return Zone.current.registerBinaryCallback(errorCallback);
111 } else {
112 return Zone.current.registerUnaryCallback(errorCallback);
113 }
114 }
115
108 /** 116 /**
109 * Sets the subscription's pending events object. 117 * Sets the subscription's pending events object.
110 * 118 *
111 * This can only be done once. The pending events object is used for the 119 * This can only be done once. The pending events object is used for the
112 * rest of the subscription's life cycle. 120 * rest of the subscription's life cycle.
113 */ 121 */
114 void _setPendingEvents(_PendingEvents pendingEvents) { 122 void _setPendingEvents(_PendingEvents pendingEvents) {
115 assert(_pending == null); 123 assert(_pending == null);
116 if (pendingEvents == null) return; 124 if (pendingEvents == null) return;
117 _pending = pendingEvents; 125 _pending = pendingEvents;
(...skipping 13 matching lines...) Expand all
131 assert(_isCanceled); 139 assert(_isCanceled);
132 _PendingEvents events = _pending; 140 _PendingEvents events = _pending;
133 _pending = null; 141 _pending = null;
134 return events; 142 return events;
135 } 143 }
136 144
137 // StreamSubscription interface. 145 // StreamSubscription interface.
138 146
139 void onData(void handleData(T event)) { 147 void onData(void handleData(T event)) {
140 if (handleData == null) handleData = _nullDataHandler; 148 if (handleData == null) handleData = _nullDataHandler;
141 _onData = handleData; 149 _onData = Zone.current.registerUnaryCallback(handleData);
142 } 150 }
143 151
144 void onError(void handleError(error)) { 152 void onError(Function handleError) {
145 if (handleError == null) handleError = _nullErrorHandler; 153 if (handleError == null) handleError = _nullErrorHandler;
146 _onError = handleError; 154 _onError = _registerErrorCallback(handleError);
147 } 155 }
148 156
149 void onDone(void handleDone()) { 157 void onDone(void handleDone()) {
150 if (handleDone == null) handleDone = _nullDoneHandler; 158 if (handleDone == null) handleDone = _nullDoneHandler;
151 _onDone = handleDone; 159 _onDone = Zone.current.registerCallback(handleDone);
152 } 160 }
153 161
154 void pause([Future resumeSignal]) { 162 void pause([Future resumeSignal]) {
155 if (_isCanceled) return; 163 if (_isCanceled) return;
156 bool wasPaused = _isPaused; 164 bool wasPaused = _isPaused;
157 bool wasInputPaused = _isInputPaused; 165 bool wasInputPaused = _isInputPaused;
158 // Increment pause count and mark input paused (if it isn't already). 166 // Increment pause count and mark input paused (if it isn't already).
159 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; 167 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
160 if (resumeSignal != null) resumeSignal.whenComplete(resume); 168 if (resumeSignal != null) resumeSignal.whenComplete(resume);
161 if (!wasPaused && _pending != null) _pending.cancelSchedule(); 169 if (!wasPaused && _pending != null) _pending.cancelSchedule();
(...skipping 27 matching lines...) Expand all
189 _pending = null; 197 _pending = null;
190 _state &= ~_STATE_IN_CALLBACK; 198 _state &= ~_STATE_IN_CALLBACK;
191 } 199 }
192 } 200 }
193 201
194 Future asFuture([var futureValue]) { 202 Future asFuture([var futureValue]) {
195 _Future<T> result = new _Future<T>(); 203 _Future<T> result = new _Future<T>();
196 204
197 // Overwrite the onDone and onError handlers. 205 // Overwrite the onDone and onError handlers.
198 _onDone = () { result._complete(futureValue); }; 206 _onDone = () { result._complete(futureValue); };
199 _onError = (error) { 207 _onError = (error, stackTrace) {
200 cancel(); 208 cancel();
201 result._completeError(error); 209 result._completeError(error, stackTrace);
202 }; 210 };
203 211
204 return result; 212 return result;
205 } 213 }
206 214
207 // State management. 215 // State management.
208 216
209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; 217 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; 218 bool get _isClosed => (_state & _STATE_CLOSED) != 0;
211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; 219 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
252 void _add(T data) { 260 void _add(T data) {
253 assert(!_isClosed); 261 assert(!_isClosed);
254 if (_isCanceled) return; 262 if (_isCanceled) return;
255 if (_canFire) { 263 if (_canFire) {
256 _sendData(data); 264 _sendData(data);
257 } else { 265 } else {
258 _addPending(new _DelayedData(data)); 266 _addPending(new _DelayedData(data));
259 } 267 }
260 } 268 }
261 269
262 void _addError(Object error) { 270 void _addError(Object error, StackTrace stackTrace) {
263 if (_isCanceled) return; 271 if (_isCanceled) return;
264 if (_canFire) { 272 if (_canFire) {
265 _sendError(error); // Reports cancel after sending. 273 _sendError(error, stackTrace); // Reports cancel after sending.
266 } else { 274 } else {
267 _addPending(new _DelayedError(error)); 275 _addPending(new _DelayedError(error, stackTrace));
268 } 276 }
269 } 277 }
270 278
271 void _close() { 279 void _close() {
272 assert(!_isClosed); 280 assert(!_isClosed);
273 if (_isCanceled) return; 281 if (_isCanceled) return;
274 _state |= _STATE_CLOSED; 282 _state |= _STATE_CLOSED;
275 if (_canFire) { 283 if (_canFire) {
276 _sendDone(); 284 _sendDone();
277 } else { 285 } else {
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
321 assert(!_isCanceled); 329 assert(!_isCanceled);
322 assert(!_isPaused); 330 assert(!_isPaused);
323 assert(!_inCallback); 331 assert(!_inCallback);
324 bool wasInputPaused = _isInputPaused; 332 bool wasInputPaused = _isInputPaused;
325 _state |= _STATE_IN_CALLBACK; 333 _state |= _STATE_IN_CALLBACK;
326 _zone.runUnaryGuarded(_onData, data); 334 _zone.runUnaryGuarded(_onData, data);
327 _state &= ~_STATE_IN_CALLBACK; 335 _state &= ~_STATE_IN_CALLBACK;
328 _checkState(wasInputPaused); 336 _checkState(wasInputPaused);
329 } 337 }
330 338
331 void _sendError(var error) { 339 void _sendError(var error, StackTrace stackTrace) {
332 assert(!_isCanceled); 340 assert(!_isCanceled);
333 assert(!_isPaused); 341 assert(!_isPaused);
334 assert(!_inCallback); 342 assert(!_inCallback);
335 bool wasInputPaused = _isInputPaused; 343 bool wasInputPaused = _isInputPaused;
336 _state |= _STATE_IN_CALLBACK; 344 _state |= _STATE_IN_CALLBACK;
337 if (!_zone.inSameErrorZone(Zone.current)) { 345 if (!_zone.inSameErrorZone(Zone.current)) {
338 // Errors are not allowed to traverse zone boundaries. 346 // Errors are not allowed to traverse zone boundaries.
339 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); 347 Zone.current.handleUncaughtError(error, stackTrace);
348 } else if (_onError is ZoneBinaryCallback) {
349 _zone.runBinaryGuarded(_onError, error, stackTrace);
340 } else { 350 } else {
341 _zone.runUnaryGuarded(_onError, error); 351 _zone.runUnaryGuarded(_onError, error);
342 } 352 }
343 _state &= ~_STATE_IN_CALLBACK; 353 _state &= ~_STATE_IN_CALLBACK;
344 if (_cancelOnError) { 354 if (_cancelOnError) {
345 _cancel(); 355 _cancel();
346 } 356 }
347 _checkState(wasInputPaused); 357 _checkState(wasInputPaused);
348 } 358 }
349 359
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
417 } 427 }
418 428
419 // ------------------------------------------------------------------- 429 // -------------------------------------------------------------------
420 // Common base class for single and multi-subscription streams. 430 // Common base class for single and multi-subscription streams.
421 // ------------------------------------------------------------------- 431 // -------------------------------------------------------------------
422 abstract class _StreamImpl<T> extends Stream<T> { 432 abstract class _StreamImpl<T> extends Stream<T> {
423 // ------------------------------------------------------------------ 433 // ------------------------------------------------------------------
424 // Stream interface. 434 // Stream interface.
425 435
426 StreamSubscription<T> listen(void onData(T data), 436 StreamSubscription<T> listen(void onData(T data),
427 { void onError(error), 437 { Function onError,
428 void onDone(), 438 void onDone(),
429 bool cancelOnError }) { 439 bool cancelOnError }) {
430 if (onData == null) onData = _nullDataHandler; 440 if (onData == null) onData = _nullDataHandler;
431 if (onError == null) onError = _nullErrorHandler; 441 if (onError == null) onError = _nullErrorHandler;
432 if (onDone == null) onDone = _nullDoneHandler; 442 if (onDone == null) onDone = _nullDoneHandler;
433 cancelOnError = identical(true, cancelOnError); 443 cancelOnError = identical(true, cancelOnError);
434 StreamSubscription subscription = 444 StreamSubscription subscription =
435 _createSubscription(onData, onError, onDone, cancelOnError); 445 _createSubscription(onData, onError, onDone, cancelOnError);
436 _onListen(subscription); 446 _onListen(subscription);
437 return subscription; 447 return subscription;
(...skipping 21 matching lines...) Expand all
459 final _EventGenerator _pending; 469 final _EventGenerator _pending;
460 /** 470 /**
461 * Initializes the stream to have only the events provided by a 471 * Initializes the stream to have only the events provided by a
462 * [_PendingEvents]. 472 * [_PendingEvents].
463 * 473 *
464 * A new [_PendingEvents] must be generated for each listen. 474 * A new [_PendingEvents] must be generated for each listen.
465 */ 475 */
466 _GeneratedStreamImpl(this._pending); 476 _GeneratedStreamImpl(this._pending);
467 477
468 StreamSubscription _createSubscription(void onData(T data), 478 StreamSubscription _createSubscription(void onData(T data),
469 void onError(Object error), 479 Function onError,
470 void onDone(), 480 void onDone(),
471 bool cancelOnError) { 481 bool cancelOnError) {
472 _BufferingStreamSubscription<T> subscription = 482 _BufferingStreamSubscription<T> subscription =
473 new _BufferingStreamSubscription( 483 new _BufferingStreamSubscription(
474 onData, onError, onDone, cancelOnError); 484 onData, onError, onDone, cancelOnError);
475 subscription._setPendingEvents(_pending()); 485 subscription._setPendingEvents(_pending());
476 return subscription; 486 return subscription;
477 } 487 }
478 } 488 }
479 489
(...skipping 15 matching lines...) Expand all
495 // Send one event per call to moveNext. 505 // Send one event per call to moveNext.
496 // If moveNext returns true, send the current element as data. 506 // If moveNext returns true, send the current element as data.
497 // If moveNext returns false, send a done event and clear the _iterator. 507 // If moveNext returns false, send a done event and clear the _iterator.
498 // If moveNext throws an error, send an error and clear the _iterator. 508 // If moveNext throws an error, send an error and clear the _iterator.
499 // After an error, no further events will be sent. 509 // After an error, no further events will be sent.
500 bool isDone; 510 bool isDone;
501 try { 511 try {
502 isDone = !_iterator.moveNext(); 512 isDone = !_iterator.moveNext();
503 } catch (e, s) { 513 } catch (e, s) {
504 _iterator = null; 514 _iterator = null;
505 dispatch._sendError(_asyncError(e, s)); 515 dispatch._sendError(_asyncError(e, s), s);
506 return; 516 return;
507 } 517 }
508 if (!isDone) { 518 if (!isDone) {
509 dispatch._sendData(_iterator.current); 519 dispatch._sendData(_iterator.current);
510 } else { 520 } else {
511 _iterator = null; 521 _iterator = null;
512 dispatch._sendDone(); 522 dispatch._sendDone();
513 } 523 }
514 } 524 }
515 525
516 void clear() { 526 void clear() {
517 if (isScheduled) cancelSchedule(); 527 if (isScheduled) cancelSchedule();
518 _iterator = null; 528 _iterator = null;
519 } 529 }
520 } 530 }
521 531
522 532
523 // Internal helpers. 533 // Internal helpers.
524 534
525 // Types of the different handlers on a stream. Types used to type fields. 535 // Types of the different handlers on a stream. Types used to type fields.
526 typedef void _DataHandler<T>(T value); 536 typedef void _DataHandler<T>(T value);
527 typedef void _ErrorHandler(error);
528 typedef void _DoneHandler(); 537 typedef void _DoneHandler();
529 538
530 539
531 /** Default data handler, does nothing. */ 540 /** Default data handler, does nothing. */
532 void _nullDataHandler(var value) {} 541 void _nullDataHandler(var value) {}
533 542
534 /** Default error handler, reports the error to the current zone's handler. */ 543 /** Default error handler, reports the error to the current zone's handler. */
535 void _nullErrorHandler(error) { 544 void _nullErrorHandler(error, [StackTrace stackTrace]) {
536 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); 545 Zone.current.handleUncaughtError(error, stackTrace);
537 } 546 }
538 547
539 /** Default done handler, does nothing. */ 548 /** Default done handler, does nothing. */
540 void _nullDoneHandler() {} 549 void _nullDoneHandler() {}
541 550
542 551
543 /** A delayed event on a buffering stream subscription. */ 552 /** A delayed event on a buffering stream subscription. */
544 abstract class _DelayedEvent { 553 abstract class _DelayedEvent {
545 /** Added as a linked list on the [StreamController]. */ 554 /** Added as a linked list on the [StreamController]. */
546 _DelayedEvent next; 555 _DelayedEvent next;
547 /** Execute the delayed event on the [StreamController]. */ 556 /** Execute the delayed event on the [StreamController]. */
548 void perform(_EventDispatch dispatch); 557 void perform(_EventDispatch dispatch);
549 } 558 }
550 559
551 /** A delayed data event. */ 560 /** A delayed data event. */
552 class _DelayedData<T> extends _DelayedEvent { 561 class _DelayedData<T> extends _DelayedEvent {
553 final T value; 562 final T value;
554 _DelayedData(this.value); 563 _DelayedData(this.value);
555 void perform(_EventDispatch<T> dispatch) { 564 void perform(_EventDispatch<T> dispatch) {
556 dispatch._sendData(value); 565 dispatch._sendData(value);
557 } 566 }
558 } 567 }
559 568
560 /** A delayed error event. */ 569 /** A delayed error event. */
561 class _DelayedError extends _DelayedEvent { 570 class _DelayedError extends _DelayedEvent {
562 final error; 571 final error;
563 _DelayedError(this.error); 572 final StackTrace stackTrace;
573
574 _DelayedError(this.error, this.stackTrace);
564 void perform(_EventDispatch dispatch) { 575 void perform(_EventDispatch dispatch) {
565 dispatch._sendError(error); 576 dispatch._sendError(error, stackTrace);
566 } 577 }
567 } 578 }
568 579
569 /** A delayed done event. */ 580 /** A delayed done event. */
570 class _DelayedDone implements _DelayedEvent { 581 class _DelayedDone implements _DelayedEvent {
571 const _DelayedDone(); 582 const _DelayedDone();
572 void perform(_EventDispatch dispatch) { 583 void perform(_EventDispatch dispatch) {
573 dispatch._sendDone(); 584 dispatch._sendDone();
574 } 585 }
575 586
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
697 708
698 typedef void _broadcastCallback(StreamSubscription subscription); 709 typedef void _broadcastCallback(StreamSubscription subscription);
699 710
700 /** 711 /**
701 * Dummy subscription that will never receive any events. 712 * Dummy subscription that will never receive any events.
702 */ 713 */
703 class _DummyStreamSubscription<T> implements StreamSubscription<T> { 714 class _DummyStreamSubscription<T> implements StreamSubscription<T> {
704 int _pauseCounter = 0; 715 int _pauseCounter = 0;
705 716
706 void onData(void handleData(T data)) {} 717 void onData(void handleData(T data)) {}
707 void onError(void handleError(Object data)) {} 718 void onError(Function handleError) {}
708 void onDone(void handleDone()) {} 719 void onDone(void handleDone()) {}
709 720
710 void pause([Future resumeSignal]) { 721 void pause([Future resumeSignal]) {
711 _pauseCounter++; 722 _pauseCounter++;
712 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); 723 if (resumeSignal != null) resumeSignal.then((_) { resume(); });
713 } 724 }
714 void resume() { 725 void resume() {
715 if (_pauseCounter > 0) _pauseCounter--; 726 if (_pauseCounter > 0) _pauseCounter--;
716 } 727 }
717 void cancel() {} 728 void cancel() {}
(...skipping 16 matching lines...) Expand all
734 void onCancelHandler(StreamSubscription subscription)) 745 void onCancelHandler(StreamSubscription subscription))
735 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), 746 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler),
736 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), 747 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler),
737 _zone = Zone.current { 748 _zone = Zone.current {
738 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); 749 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
739 } 750 }
740 751
741 bool get isBroadcast => true; 752 bool get isBroadcast => true;
742 753
743 StreamSubscription<T> listen(void onData(T data), 754 StreamSubscription<T> listen(void onData(T data),
744 { void onError(Object error), 755 { Function onError,
745 void onDone(), 756 void onDone(),
746 bool cancelOnError}) { 757 bool cancelOnError}) {
747 if (_controller == null) { 758 if (_controller == null) {
748 // Return a dummy subscription backed by nothing, since 759 // Return a dummy subscription backed by nothing, since
749 // it won't ever receive any events. 760 // it won't ever receive any events.
750 return new _DummyStreamSubscription<T>(); 761 return new _DummyStreamSubscription<T>();
751 } 762 }
752 if (_subscription == null) { 763 if (_subscription == null) {
753 _subscription = _source.listen(_controller.add, 764 _subscription = _source.listen(_controller.add,
754 onError: _controller.addError, 765 onError: _controller.addError,
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after
969 _state = _STATE_FOUND; 980 _state = _STATE_FOUND;
970 hasNext._complete(true); 981 hasNext._complete(true);
971 return; 982 return;
972 } 983 }
973 _subscription.pause(); 984 _subscription.pause();
974 assert(_futureOrPrefetch == null); 985 assert(_futureOrPrefetch == null);
975 _futureOrPrefetch = data; 986 _futureOrPrefetch = data;
976 _state = _STATE_EXTRA_DATA; 987 _state = _STATE_EXTRA_DATA;
977 } 988 }
978 989
979 void _onError(Object error) { 990 void _onError(Object error, [StackTrace stackTrace]) {
980 if (_state == _STATE_MOVING) { 991 if (_state == _STATE_MOVING) {
981 _Future<bool> hasNext = _futureOrPrefetch; 992 _Future<bool> hasNext = _futureOrPrefetch;
982 // We have cancelOnError: true, so the subscription is canceled. 993 // We have cancelOnError: true, so the subscription is canceled.
983 _clear(); 994 _clear();
984 hasNext._completeError(error); 995 hasNext._completeError(error, stackTrace);
985 return; 996 return;
986 } 997 }
987 _subscription.pause(); 998 _subscription.pause();
988 assert(_futureOrPrefetch == null); 999 assert(_futureOrPrefetch == null);
989 _futureOrPrefetch = error; 1000 _futureOrPrefetch = error;
990 _state = _STATE_EXTRA_ERROR; 1001 _state = _STATE_EXTRA_ERROR;
991 } 1002 }
992 1003
993 void _onDone() { 1004 void _onDone() {
994 if (_state == _STATE_MOVING) { 1005 if (_state == _STATE_MOVING) {
995 _Future<bool> hasNext = _futureOrPrefetch; 1006 _Future<bool> hasNext = _futureOrPrefetch;
996 _clear(); 1007 _clear();
997 hasNext._complete(false); 1008 hasNext._complete(false);
998 return; 1009 return;
999 } 1010 }
1000 _subscription.pause(); 1011 _subscription.pause();
1001 _futureOrPrefetch = null; 1012 _futureOrPrefetch = null;
1002 _state = _STATE_EXTRA_DONE; 1013 _state = _STATE_EXTRA_DONE;
1003 } 1014 }
1004 } 1015 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698