OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error); | 10 void _addError(Object error, StackTrace stackTrace); |
11 void _close(); | 11 void _close(); |
12 } | 12 } |
13 | 13 |
14 /** | 14 /** |
15 * Abstract and private interface for a place to send events. | 15 * Abstract and private interface for a place to send events. |
16 * | 16 * |
17 * Used by event buffering to finally dispatch the pending event, where | 17 * Used by event buffering to finally dispatch the pending event, where |
18 * [_EventSink] is where the event first enters the stream subscription, | 18 * [_EventSink] is where the event first enters the stream subscription, |
19 * and may yet be buffered. | 19 * and may yet be buffered. |
20 */ | 20 */ |
21 abstract class _EventDispatch<T> { | 21 abstract class _EventDispatch<T> { |
22 void _sendData(T data); | 22 void _sendData(T data); |
23 void _sendError(Object error); | 23 void _sendError(Object error, StackTrace stackTrace); |
24 void _sendDone(); | 24 void _sendDone(); |
25 } | 25 } |
26 | 26 |
27 /** | 27 /** |
28 * Default implementation of stream subscription of buffering events. | 28 * Default implementation of stream subscription of buffering events. |
29 * | 29 * |
30 * The only public methods are those of [StreamSubscription], so instances of | 30 * The only public methods are those of [StreamSubscription], so instances of |
31 * [_BufferingStreamSubscription] can be returned directly as a | 31 * [_BufferingStreamSubscription] can be returned directly as a |
32 * [StreamSubscription] without exposing internal functionality. | 32 * [StreamSubscription] without exposing internal functionality. |
33 * | 33 * |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
71 * when `cancelOnError` is true. | 71 * when `cancelOnError` is true. |
72 */ | 72 */ |
73 static const int _STATE_CANCELED = 8; | 73 static const int _STATE_CANCELED = 8; |
74 static const int _STATE_IN_CALLBACK = 16; | 74 static const int _STATE_IN_CALLBACK = 16; |
75 static const int _STATE_HAS_PENDING = 32; | 75 static const int _STATE_HAS_PENDING = 32; |
76 static const int _STATE_PAUSE_COUNT = 64; | 76 static const int _STATE_PAUSE_COUNT = 64; |
77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | 77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
78 | 78 |
79 /* Event handlers provided in constructor. */ | 79 /* Event handlers provided in constructor. */ |
80 _DataHandler<T> _onData; | 80 _DataHandler<T> _onData; |
81 _ErrorHandler _onError; | 81 Function _onError; |
82 _DoneHandler _onDone; | 82 _DoneHandler _onDone; |
83 final Zone _zone = Zone.current; | 83 final Zone _zone = Zone.current; |
84 | 84 |
85 /** Bit vector based on state-constants above. */ | 85 /** Bit vector based on state-constants above. */ |
86 int _state; | 86 int _state; |
87 | 87 |
88 /** | 88 /** |
89 * Queue of pending events. | 89 * Queue of pending events. |
90 * | 90 * |
91 * Is created when necessary, or set in constructor for preconfigured events. | 91 * Is created when necessary, or set in constructor for preconfigured events. |
92 */ | 92 */ |
93 _PendingEvents _pending; | 93 _PendingEvents _pending; |
94 | 94 |
95 _BufferingStreamSubscription(void onData(T data), | 95 _BufferingStreamSubscription(void onData(T data), |
96 void onError(error), | 96 Function onError, |
97 void onDone(), | 97 void onDone(), |
98 bool cancelOnError) | 98 bool cancelOnError) |
99 : _onData = Zone.current.registerUnaryCallback(onData), | 99 : _onData = Zone.current.registerUnaryCallback(onData), |
100 _onError = Zone.current.registerUnaryCallback(onError), | 100 _onError = _registerErrorHandler(onError, Zone.current), |
101 _onDone = Zone.current.registerCallback(onDone), | 101 _onDone = Zone.current.registerCallback(onDone), |
102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
103 assert(_onData != null); | 103 assert(_onData != null); |
104 assert(_onError != null); | 104 assert(_onError != null); |
105 assert(_onDone != null); | 105 assert(_onDone != null); |
106 } | 106 } |
107 | 107 |
108 /** | 108 /** |
109 * Sets the subscription's pending events object. | 109 * Sets the subscription's pending events object. |
110 * | 110 * |
(...skipping 20 matching lines...) Expand all Loading... |
131 assert(_isCanceled); | 131 assert(_isCanceled); |
132 _PendingEvents events = _pending; | 132 _PendingEvents events = _pending; |
133 _pending = null; | 133 _pending = null; |
134 return events; | 134 return events; |
135 } | 135 } |
136 | 136 |
137 // StreamSubscription interface. | 137 // StreamSubscription interface. |
138 | 138 |
139 void onData(void handleData(T event)) { | 139 void onData(void handleData(T event)) { |
140 if (handleData == null) handleData = _nullDataHandler; | 140 if (handleData == null) handleData = _nullDataHandler; |
141 _onData = handleData; | 141 _onData = Zone.current.registerUnaryCallback(handleData); |
142 } | 142 } |
143 | 143 |
144 void onError(void handleError(error)) { | 144 void onError(Function handleError) { |
145 if (handleError == null) handleError = _nullErrorHandler; | 145 if (handleError == null) handleError = _nullErrorHandler; |
146 _onError = handleError; | 146 _onError = _registerErrorHandler(handleError, Zone.current); |
147 } | 147 } |
148 | 148 |
149 void onDone(void handleDone()) { | 149 void onDone(void handleDone()) { |
150 if (handleDone == null) handleDone = _nullDoneHandler; | 150 if (handleDone == null) handleDone = _nullDoneHandler; |
151 _onDone = handleDone; | 151 _onDone = Zone.current.registerCallback(handleDone); |
152 } | 152 } |
153 | 153 |
154 void pause([Future resumeSignal]) { | 154 void pause([Future resumeSignal]) { |
155 if (_isCanceled) return; | 155 if (_isCanceled) return; |
156 bool wasPaused = _isPaused; | 156 bool wasPaused = _isPaused; |
157 bool wasInputPaused = _isInputPaused; | 157 bool wasInputPaused = _isInputPaused; |
158 // Increment pause count and mark input paused (if it isn't already). | 158 // Increment pause count and mark input paused (if it isn't already). |
159 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 159 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
160 if (resumeSignal != null) resumeSignal.whenComplete(resume); | 160 if (resumeSignal != null) resumeSignal.whenComplete(resume); |
161 if (!wasPaused && _pending != null) _pending.cancelSchedule(); | 161 if (!wasPaused && _pending != null) _pending.cancelSchedule(); |
(...skipping 27 matching lines...) Expand all Loading... |
189 _pending = null; | 189 _pending = null; |
190 _state &= ~_STATE_IN_CALLBACK; | 190 _state &= ~_STATE_IN_CALLBACK; |
191 } | 191 } |
192 } | 192 } |
193 | 193 |
194 Future asFuture([var futureValue]) { | 194 Future asFuture([var futureValue]) { |
195 _Future<T> result = new _Future<T>(); | 195 _Future<T> result = new _Future<T>(); |
196 | 196 |
197 // Overwrite the onDone and onError handlers. | 197 // Overwrite the onDone and onError handlers. |
198 _onDone = () { result._complete(futureValue); }; | 198 _onDone = () { result._complete(futureValue); }; |
199 _onError = (error) { | 199 _onError = (error, stackTrace) { |
200 cancel(); | 200 cancel(); |
201 result._completeError(error); | 201 result._completeError(error, stackTrace); |
202 }; | 202 }; |
203 | 203 |
204 return result; | 204 return result; |
205 } | 205 } |
206 | 206 |
207 // State management. | 207 // State management. |
208 | 208 |
209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | 209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | 210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
252 void _add(T data) { | 252 void _add(T data) { |
253 assert(!_isClosed); | 253 assert(!_isClosed); |
254 if (_isCanceled) return; | 254 if (_isCanceled) return; |
255 if (_canFire) { | 255 if (_canFire) { |
256 _sendData(data); | 256 _sendData(data); |
257 } else { | 257 } else { |
258 _addPending(new _DelayedData(data)); | 258 _addPending(new _DelayedData(data)); |
259 } | 259 } |
260 } | 260 } |
261 | 261 |
262 void _addError(Object error) { | 262 void _addError(Object error, StackTrace stackTrace) { |
263 if (_isCanceled) return; | 263 if (_isCanceled) return; |
264 if (_canFire) { | 264 if (_canFire) { |
265 _sendError(error); // Reports cancel after sending. | 265 _sendError(error, stackTrace); // Reports cancel after sending. |
266 } else { | 266 } else { |
267 _addPending(new _DelayedError(error)); | 267 _addPending(new _DelayedError(error, stackTrace)); |
268 } | 268 } |
269 } | 269 } |
270 | 270 |
271 void _close() { | 271 void _close() { |
272 assert(!_isClosed); | 272 assert(!_isClosed); |
273 if (_isCanceled) return; | 273 if (_isCanceled) return; |
274 _state |= _STATE_CLOSED; | 274 _state |= _STATE_CLOSED; |
275 if (_canFire) { | 275 if (_canFire) { |
276 _sendDone(); | 276 _sendDone(); |
277 } else { | 277 } else { |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
321 assert(!_isCanceled); | 321 assert(!_isCanceled); |
322 assert(!_isPaused); | 322 assert(!_isPaused); |
323 assert(!_inCallback); | 323 assert(!_inCallback); |
324 bool wasInputPaused = _isInputPaused; | 324 bool wasInputPaused = _isInputPaused; |
325 _state |= _STATE_IN_CALLBACK; | 325 _state |= _STATE_IN_CALLBACK; |
326 _zone.runUnaryGuarded(_onData, data); | 326 _zone.runUnaryGuarded(_onData, data); |
327 _state &= ~_STATE_IN_CALLBACK; | 327 _state &= ~_STATE_IN_CALLBACK; |
328 _checkState(wasInputPaused); | 328 _checkState(wasInputPaused); |
329 } | 329 } |
330 | 330 |
331 void _sendError(var error) { | 331 void _sendError(var error, StackTrace stackTrace) { |
332 assert(!_isCanceled); | 332 assert(!_isCanceled); |
333 assert(!_isPaused); | 333 assert(!_isPaused); |
334 assert(!_inCallback); | 334 assert(!_inCallback); |
335 bool wasInputPaused = _isInputPaused; | 335 bool wasInputPaused = _isInputPaused; |
336 _state |= _STATE_IN_CALLBACK; | 336 _state |= _STATE_IN_CALLBACK; |
337 if (!_zone.inSameErrorZone(Zone.current)) { | 337 if (!_zone.inSameErrorZone(Zone.current)) { |
338 // Errors are not allowed to traverse zone boundaries. | 338 // Errors are not allowed to traverse zone boundaries. |
339 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); | 339 Zone.current.handleUncaughtError(error, stackTrace); |
| 340 } else if (_onError is ZoneBinaryCallback) { |
| 341 _zone.runBinaryGuarded(_onError, error, stackTrace); |
340 } else { | 342 } else { |
341 _zone.runUnaryGuarded(_onError, error); | 343 _zone.runUnaryGuarded(_onError, error); |
342 } | 344 } |
343 _state &= ~_STATE_IN_CALLBACK; | 345 _state &= ~_STATE_IN_CALLBACK; |
344 if (_cancelOnError) { | 346 if (_cancelOnError) { |
345 _cancel(); | 347 _cancel(); |
346 } | 348 } |
347 _checkState(wasInputPaused); | 349 _checkState(wasInputPaused); |
348 } | 350 } |
349 | 351 |
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
417 } | 419 } |
418 | 420 |
419 // ------------------------------------------------------------------- | 421 // ------------------------------------------------------------------- |
420 // Common base class for single and multi-subscription streams. | 422 // Common base class for single and multi-subscription streams. |
421 // ------------------------------------------------------------------- | 423 // ------------------------------------------------------------------- |
422 abstract class _StreamImpl<T> extends Stream<T> { | 424 abstract class _StreamImpl<T> extends Stream<T> { |
423 // ------------------------------------------------------------------ | 425 // ------------------------------------------------------------------ |
424 // Stream interface. | 426 // Stream interface. |
425 | 427 |
426 StreamSubscription<T> listen(void onData(T data), | 428 StreamSubscription<T> listen(void onData(T data), |
427 { void onError(error), | 429 { Function onError, |
428 void onDone(), | 430 void onDone(), |
429 bool cancelOnError }) { | 431 bool cancelOnError }) { |
430 if (onData == null) onData = _nullDataHandler; | 432 if (onData == null) onData = _nullDataHandler; |
431 if (onError == null) onError = _nullErrorHandler; | 433 if (onError == null) onError = _nullErrorHandler; |
432 if (onDone == null) onDone = _nullDoneHandler; | 434 if (onDone == null) onDone = _nullDoneHandler; |
433 cancelOnError = identical(true, cancelOnError); | 435 cancelOnError = identical(true, cancelOnError); |
434 StreamSubscription subscription = | 436 StreamSubscription subscription = |
435 _createSubscription(onData, onError, onDone, cancelOnError); | 437 _createSubscription(onData, onError, onDone, cancelOnError); |
436 _onListen(subscription); | 438 _onListen(subscription); |
437 return subscription; | 439 return subscription; |
(...skipping 21 matching lines...) Expand all Loading... |
459 final _EventGenerator _pending; | 461 final _EventGenerator _pending; |
460 /** | 462 /** |
461 * Initializes the stream to have only the events provided by a | 463 * Initializes the stream to have only the events provided by a |
462 * [_PendingEvents]. | 464 * [_PendingEvents]. |
463 * | 465 * |
464 * A new [_PendingEvents] must be generated for each listen. | 466 * A new [_PendingEvents] must be generated for each listen. |
465 */ | 467 */ |
466 _GeneratedStreamImpl(this._pending); | 468 _GeneratedStreamImpl(this._pending); |
467 | 469 |
468 StreamSubscription _createSubscription(void onData(T data), | 470 StreamSubscription _createSubscription(void onData(T data), |
469 void onError(Object error), | 471 Function onError, |
470 void onDone(), | 472 void onDone(), |
471 bool cancelOnError) { | 473 bool cancelOnError) { |
472 _BufferingStreamSubscription<T> subscription = | 474 _BufferingStreamSubscription<T> subscription = |
473 new _BufferingStreamSubscription( | 475 new _BufferingStreamSubscription( |
474 onData, onError, onDone, cancelOnError); | 476 onData, onError, onDone, cancelOnError); |
475 subscription._setPendingEvents(_pending()); | 477 subscription._setPendingEvents(_pending()); |
476 return subscription; | 478 return subscription; |
477 } | 479 } |
478 } | 480 } |
479 | 481 |
(...skipping 15 matching lines...) Expand all Loading... |
495 // Send one event per call to moveNext. | 497 // Send one event per call to moveNext. |
496 // If moveNext returns true, send the current element as data. | 498 // If moveNext returns true, send the current element as data. |
497 // If moveNext returns false, send a done event and clear the _iterator. | 499 // If moveNext returns false, send a done event and clear the _iterator. |
498 // If moveNext throws an error, send an error and clear the _iterator. | 500 // If moveNext throws an error, send an error and clear the _iterator. |
499 // After an error, no further events will be sent. | 501 // After an error, no further events will be sent. |
500 bool isDone; | 502 bool isDone; |
501 try { | 503 try { |
502 isDone = !_iterator.moveNext(); | 504 isDone = !_iterator.moveNext(); |
503 } catch (e, s) { | 505 } catch (e, s) { |
504 _iterator = null; | 506 _iterator = null; |
505 dispatch._sendError(_asyncError(e, s)); | 507 dispatch._sendError(_asyncError(e, s), s); |
506 return; | 508 return; |
507 } | 509 } |
508 if (!isDone) { | 510 if (!isDone) { |
509 dispatch._sendData(_iterator.current); | 511 dispatch._sendData(_iterator.current); |
510 } else { | 512 } else { |
511 _iterator = null; | 513 _iterator = null; |
512 dispatch._sendDone(); | 514 dispatch._sendDone(); |
513 } | 515 } |
514 } | 516 } |
515 | 517 |
516 void clear() { | 518 void clear() { |
517 if (isScheduled) cancelSchedule(); | 519 if (isScheduled) cancelSchedule(); |
518 _iterator = null; | 520 _iterator = null; |
519 } | 521 } |
520 } | 522 } |
521 | 523 |
522 | 524 |
523 // Internal helpers. | 525 // Internal helpers. |
524 | 526 |
525 // Types of the different handlers on a stream. Types used to type fields. | 527 // Types of the different handlers on a stream. Types used to type fields. |
526 typedef void _DataHandler<T>(T value); | 528 typedef void _DataHandler<T>(T value); |
527 typedef void _ErrorHandler(error); | |
528 typedef void _DoneHandler(); | 529 typedef void _DoneHandler(); |
529 | 530 |
530 | 531 |
531 /** Default data handler, does nothing. */ | 532 /** Default data handler, does nothing. */ |
532 void _nullDataHandler(var value) {} | 533 void _nullDataHandler(var value) {} |
533 | 534 |
534 /** Default error handler, reports the error to the current zone's handler. */ | 535 /** Default error handler, reports the error to the current zone's handler. */ |
535 void _nullErrorHandler(error) { | 536 void _nullErrorHandler(error, [StackTrace stackTrace]) { |
536 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); | 537 Zone.current.handleUncaughtError(error, stackTrace); |
537 } | 538 } |
538 | 539 |
539 /** Default done handler, does nothing. */ | 540 /** Default done handler, does nothing. */ |
540 void _nullDoneHandler() {} | 541 void _nullDoneHandler() {} |
541 | 542 |
542 | 543 |
543 /** A delayed event on a buffering stream subscription. */ | 544 /** A delayed event on a buffering stream subscription. */ |
544 abstract class _DelayedEvent { | 545 abstract class _DelayedEvent { |
545 /** Added as a linked list on the [StreamController]. */ | 546 /** Added as a linked list on the [StreamController]. */ |
546 _DelayedEvent next; | 547 _DelayedEvent next; |
547 /** Execute the delayed event on the [StreamController]. */ | 548 /** Execute the delayed event on the [StreamController]. */ |
548 void perform(_EventDispatch dispatch); | 549 void perform(_EventDispatch dispatch); |
549 } | 550 } |
550 | 551 |
551 /** A delayed data event. */ | 552 /** A delayed data event. */ |
552 class _DelayedData<T> extends _DelayedEvent { | 553 class _DelayedData<T> extends _DelayedEvent { |
553 final T value; | 554 final T value; |
554 _DelayedData(this.value); | 555 _DelayedData(this.value); |
555 void perform(_EventDispatch<T> dispatch) { | 556 void perform(_EventDispatch<T> dispatch) { |
556 dispatch._sendData(value); | 557 dispatch._sendData(value); |
557 } | 558 } |
558 } | 559 } |
559 | 560 |
560 /** A delayed error event. */ | 561 /** A delayed error event. */ |
561 class _DelayedError extends _DelayedEvent { | 562 class _DelayedError extends _DelayedEvent { |
562 final error; | 563 final error; |
563 _DelayedError(this.error); | 564 final StackTrace stackTrace; |
| 565 |
| 566 _DelayedError(this.error, this.stackTrace); |
564 void perform(_EventDispatch dispatch) { | 567 void perform(_EventDispatch dispatch) { |
565 dispatch._sendError(error); | 568 dispatch._sendError(error, stackTrace); |
566 } | 569 } |
567 } | 570 } |
568 | 571 |
569 /** A delayed done event. */ | 572 /** A delayed done event. */ |
570 class _DelayedDone implements _DelayedEvent { | 573 class _DelayedDone implements _DelayedEvent { |
571 const _DelayedDone(); | 574 const _DelayedDone(); |
572 void perform(_EventDispatch dispatch) { | 575 void perform(_EventDispatch dispatch) { |
573 dispatch._sendDone(); | 576 dispatch._sendDone(); |
574 } | 577 } |
575 | 578 |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
697 | 700 |
698 typedef void _broadcastCallback(StreamSubscription subscription); | 701 typedef void _broadcastCallback(StreamSubscription subscription); |
699 | 702 |
700 /** | 703 /** |
701 * Dummy subscription that will never receive any events. | 704 * Dummy subscription that will never receive any events. |
702 */ | 705 */ |
703 class _DummyStreamSubscription<T> implements StreamSubscription<T> { | 706 class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
704 int _pauseCounter = 0; | 707 int _pauseCounter = 0; |
705 | 708 |
706 void onData(void handleData(T data)) {} | 709 void onData(void handleData(T data)) {} |
707 void onError(void handleError(Object data)) {} | 710 void onError(Function handleError) {} |
708 void onDone(void handleDone()) {} | 711 void onDone(void handleDone()) {} |
709 | 712 |
710 void pause([Future resumeSignal]) { | 713 void pause([Future resumeSignal]) { |
711 _pauseCounter++; | 714 _pauseCounter++; |
712 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); | 715 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
713 } | 716 } |
714 void resume() { | 717 void resume() { |
715 if (_pauseCounter > 0) _pauseCounter--; | 718 if (_pauseCounter > 0) _pauseCounter--; |
716 } | 719 } |
717 void cancel() {} | 720 void cancel() {} |
(...skipping 16 matching lines...) Expand all Loading... |
734 void onCancelHandler(StreamSubscription subscription)) | 737 void onCancelHandler(StreamSubscription subscription)) |
735 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), | 738 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), |
736 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), | 739 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), |
737 _zone = Zone.current { | 740 _zone = Zone.current { |
738 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | 741 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
739 } | 742 } |
740 | 743 |
741 bool get isBroadcast => true; | 744 bool get isBroadcast => true; |
742 | 745 |
743 StreamSubscription<T> listen(void onData(T data), | 746 StreamSubscription<T> listen(void onData(T data), |
744 { void onError(Object error), | 747 { Function onError, |
745 void onDone(), | 748 void onDone(), |
746 bool cancelOnError}) { | 749 bool cancelOnError}) { |
747 if (_controller == null) { | 750 if (_controller == null) { |
748 // Return a dummy subscription backed by nothing, since | 751 // Return a dummy subscription backed by nothing, since |
749 // it won't ever receive any events. | 752 // it won't ever receive any events. |
750 return new _DummyStreamSubscription<T>(); | 753 return new _DummyStreamSubscription<T>(); |
751 } | 754 } |
752 if (_subscription == null) { | 755 if (_subscription == null) { |
753 _subscription = _source.listen(_controller.add, | 756 _subscription = _source.listen(_controller.add, |
754 onError: _controller.addError, | 757 onError: _controller.addError, |
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
969 _state = _STATE_FOUND; | 972 _state = _STATE_FOUND; |
970 hasNext._complete(true); | 973 hasNext._complete(true); |
971 return; | 974 return; |
972 } | 975 } |
973 _subscription.pause(); | 976 _subscription.pause(); |
974 assert(_futureOrPrefetch == null); | 977 assert(_futureOrPrefetch == null); |
975 _futureOrPrefetch = data; | 978 _futureOrPrefetch = data; |
976 _state = _STATE_EXTRA_DATA; | 979 _state = _STATE_EXTRA_DATA; |
977 } | 980 } |
978 | 981 |
979 void _onError(Object error) { | 982 void _onError(Object error, [StackTrace stackTrace]) { |
980 if (_state == _STATE_MOVING) { | 983 if (_state == _STATE_MOVING) { |
981 _Future<bool> hasNext = _futureOrPrefetch; | 984 _Future<bool> hasNext = _futureOrPrefetch; |
982 // We have cancelOnError: true, so the subscription is canceled. | 985 // We have cancelOnError: true, so the subscription is canceled. |
983 _clear(); | 986 _clear(); |
984 hasNext._completeError(error); | 987 hasNext._completeError(error, stackTrace); |
985 return; | 988 return; |
986 } | 989 } |
987 _subscription.pause(); | 990 _subscription.pause(); |
988 assert(_futureOrPrefetch == null); | 991 assert(_futureOrPrefetch == null); |
989 _futureOrPrefetch = error; | 992 _futureOrPrefetch = error; |
990 _state = _STATE_EXTRA_ERROR; | 993 _state = _STATE_EXTRA_ERROR; |
991 } | 994 } |
992 | 995 |
993 void _onDone() { | 996 void _onDone() { |
994 if (_state == _STATE_MOVING) { | 997 if (_state == _STATE_MOVING) { |
995 _Future<bool> hasNext = _futureOrPrefetch; | 998 _Future<bool> hasNext = _futureOrPrefetch; |
996 _clear(); | 999 _clear(); |
997 hasNext._complete(false); | 1000 hasNext._complete(false); |
998 return; | 1001 return; |
999 } | 1002 } |
1000 _subscription.pause(); | 1003 _subscription.pause(); |
1001 _futureOrPrefetch = null; | 1004 _futureOrPrefetch = null; |
1002 _state = _STATE_EXTRA_DONE; | 1005 _state = _STATE_EXTRA_DONE; |
1003 } | 1006 } |
1004 } | 1007 } |
OLD | NEW |