OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error); | 10 void _addError(Object error, StackTrace stackTrace); |
11 void _close(); | 11 void _close(); |
12 } | 12 } |
13 | 13 |
14 /** | 14 /** |
15 * Abstract and private interface for a place to send events. | 15 * Abstract and private interface for a place to send events. |
16 * | 16 * |
17 * Used by event buffering to finally dispatch the pending event, where | 17 * Used by event buffering to finally dispatch the pending event, where |
18 * [_EventSink] is where the event first enters the stream subscription, | 18 * [_EventSink] is where the event first enters the stream subscription, |
19 * and may yet be buffered. | 19 * and may yet be buffered. |
20 */ | 20 */ |
21 abstract class _EventDispatch<T> { | 21 abstract class _EventDispatch<T> { |
22 void _sendData(T data); | 22 void _sendData(T data); |
23 void _sendError(Object error); | 23 void _sendError(Object error, StackTrace stackTrace); |
24 void _sendDone(); | 24 void _sendDone(); |
25 } | 25 } |
26 | 26 |
27 /** | 27 /** |
28 * Default implementation of stream subscription of buffering events. | 28 * Default implementation of stream subscription of buffering events. |
29 * | 29 * |
30 * The only public methods are those of [StreamSubscription], so instances of | 30 * The only public methods are those of [StreamSubscription], so instances of |
31 * [_BufferingStreamSubscription] can be returned directly as a | 31 * [_BufferingStreamSubscription] can be returned directly as a |
32 * [StreamSubscription] without exposing internal functionality. | 32 * [StreamSubscription] without exposing internal functionality. |
33 * | 33 * |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
71 * when `cancelOnError` is true. | 71 * when `cancelOnError` is true. |
72 */ | 72 */ |
73 static const int _STATE_CANCELED = 8; | 73 static const int _STATE_CANCELED = 8; |
74 static const int _STATE_IN_CALLBACK = 16; | 74 static const int _STATE_IN_CALLBACK = 16; |
75 static const int _STATE_HAS_PENDING = 32; | 75 static const int _STATE_HAS_PENDING = 32; |
76 static const int _STATE_PAUSE_COUNT = 64; | 76 static const int _STATE_PAUSE_COUNT = 64; |
77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | 77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
78 | 78 |
79 /* Event handlers provided in constructor. */ | 79 /* Event handlers provided in constructor. */ |
80 _DataHandler<T> _onData; | 80 _DataHandler<T> _onData; |
81 _ErrorHandler _onError; | 81 Function _onError; |
82 _DoneHandler _onDone; | 82 _DoneHandler _onDone; |
83 final Zone _zone = Zone.current; | 83 final Zone _zone = Zone.current; |
84 | 84 |
85 /** Bit vector based on state-constants above. */ | 85 /** Bit vector based on state-constants above. */ |
86 int _state; | 86 int _state; |
87 | 87 |
88 /** | 88 /** |
89 * Queue of pending events. | 89 * Queue of pending events. |
90 * | 90 * |
91 * Is created when necessary, or set in constructor for preconfigured events. | 91 * Is created when necessary, or set in constructor for preconfigured events. |
92 */ | 92 */ |
93 _PendingEvents _pending; | 93 _PendingEvents _pending; |
94 | 94 |
95 _BufferingStreamSubscription(void onData(T data), | 95 _BufferingStreamSubscription(void onData(T data), |
96 void onError(error), | 96 Function onError, |
97 void onDone(), | 97 void onDone(), |
98 bool cancelOnError) | 98 bool cancelOnError) |
99 : _onData = Zone.current.registerUnaryCallback(onData), | 99 : _onData = Zone.current.registerUnaryCallback(onData), |
100 _onError = Zone.current.registerUnaryCallback(onError), | 100 _onError = _registerErrorCallback(onError), |
101 _onDone = Zone.current.registerCallback(onDone), | 101 _onDone = Zone.current.registerCallback(onDone), |
102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
103 assert(_onData != null); | 103 assert(_onData != null); |
104 assert(_onError != null); | 104 assert(_onError != null); |
105 assert(_onDone != null); | 105 assert(_onDone != null); |
106 } | 106 } |
107 | 107 |
| 108 static _registerErrorCallback(Function errorCallback) { |
| 109 if (errorCallback is ZoneBinaryCallback) { |
| 110 return Zone.current.registerBinaryCallback(errorCallback); |
| 111 } else { |
| 112 return Zone.current.registerUnaryCallback(errorCallback); |
| 113 } |
| 114 } |
| 115 |
108 /** | 116 /** |
109 * Sets the subscription's pending events object. | 117 * Sets the subscription's pending events object. |
110 * | 118 * |
111 * This can only be done once. The pending events object is used for the | 119 * This can only be done once. The pending events object is used for the |
112 * rest of the subscription's life cycle. | 120 * rest of the subscription's life cycle. |
113 */ | 121 */ |
114 void _setPendingEvents(_PendingEvents pendingEvents) { | 122 void _setPendingEvents(_PendingEvents pendingEvents) { |
115 assert(_pending == null); | 123 assert(_pending == null); |
116 if (pendingEvents == null) return; | 124 if (pendingEvents == null) return; |
117 _pending = pendingEvents; | 125 _pending = pendingEvents; |
(...skipping 13 matching lines...) Expand all Loading... |
131 assert(_isCanceled); | 139 assert(_isCanceled); |
132 _PendingEvents events = _pending; | 140 _PendingEvents events = _pending; |
133 _pending = null; | 141 _pending = null; |
134 return events; | 142 return events; |
135 } | 143 } |
136 | 144 |
137 // StreamSubscription interface. | 145 // StreamSubscription interface. |
138 | 146 |
139 void onData(void handleData(T event)) { | 147 void onData(void handleData(T event)) { |
140 if (handleData == null) handleData = _nullDataHandler; | 148 if (handleData == null) handleData = _nullDataHandler; |
141 _onData = handleData; | 149 _onData = Zone.current.registerUnaryCallback(handleData); |
142 } | 150 } |
143 | 151 |
144 void onError(void handleError(error)) { | 152 void onError(Function handleError) { |
145 if (handleError == null) handleError = _nullErrorHandler; | 153 if (handleError == null) handleError = _nullErrorHandler; |
146 _onError = handleError; | 154 _onError = _registerErrorCallback(handleError); |
147 } | 155 } |
148 | 156 |
149 void onDone(void handleDone()) { | 157 void onDone(void handleDone()) { |
150 if (handleDone == null) handleDone = _nullDoneHandler; | 158 if (handleDone == null) handleDone = _nullDoneHandler; |
151 _onDone = handleDone; | 159 _onDone = Zone.current.registerCallback(handleDone); |
152 } | 160 } |
153 | 161 |
154 void pause([Future resumeSignal]) { | 162 void pause([Future resumeSignal]) { |
155 if (_isCanceled) return; | 163 if (_isCanceled) return; |
156 bool wasPaused = _isPaused; | 164 bool wasPaused = _isPaused; |
157 bool wasInputPaused = _isInputPaused; | 165 bool wasInputPaused = _isInputPaused; |
158 // Increment pause count and mark input paused (if it isn't already). | 166 // Increment pause count and mark input paused (if it isn't already). |
159 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 167 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
160 if (resumeSignal != null) resumeSignal.whenComplete(resume); | 168 if (resumeSignal != null) resumeSignal.whenComplete(resume); |
161 if (!wasPaused && _pending != null) _pending.cancelSchedule(); | 169 if (!wasPaused && _pending != null) _pending.cancelSchedule(); |
(...skipping 27 matching lines...) Expand all Loading... |
189 _pending = null; | 197 _pending = null; |
190 _state &= ~_STATE_IN_CALLBACK; | 198 _state &= ~_STATE_IN_CALLBACK; |
191 } | 199 } |
192 } | 200 } |
193 | 201 |
194 Future asFuture([var futureValue]) { | 202 Future asFuture([var futureValue]) { |
195 _Future<T> result = new _Future<T>(); | 203 _Future<T> result = new _Future<T>(); |
196 | 204 |
197 // Overwrite the onDone and onError handlers. | 205 // Overwrite the onDone and onError handlers. |
198 _onDone = () { result._complete(futureValue); }; | 206 _onDone = () { result._complete(futureValue); }; |
199 _onError = (error) { | 207 _onError = (error, stackTrace) { |
200 cancel(); | 208 cancel(); |
201 result._completeError(error); | 209 result._completeError(error, stackTrace); |
202 }; | 210 }; |
203 | 211 |
204 return result; | 212 return result; |
205 } | 213 } |
206 | 214 |
207 // State management. | 215 // State management. |
208 | 216 |
209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | 217 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | 218 bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 219 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
252 void _add(T data) { | 260 void _add(T data) { |
253 assert(!_isClosed); | 261 assert(!_isClosed); |
254 if (_isCanceled) return; | 262 if (_isCanceled) return; |
255 if (_canFire) { | 263 if (_canFire) { |
256 _sendData(data); | 264 _sendData(data); |
257 } else { | 265 } else { |
258 _addPending(new _DelayedData(data)); | 266 _addPending(new _DelayedData(data)); |
259 } | 267 } |
260 } | 268 } |
261 | 269 |
262 void _addError(Object error) { | 270 void _addError(Object error, StackTrace stackTrace) { |
263 if (_isCanceled) return; | 271 if (_isCanceled) return; |
264 if (_canFire) { | 272 if (_canFire) { |
265 _sendError(error); // Reports cancel after sending. | 273 _sendError(error, stackTrace); // Reports cancel after sending. |
266 } else { | 274 } else { |
267 _addPending(new _DelayedError(error)); | 275 _addPending(new _DelayedError(error, stackTrace)); |
268 } | 276 } |
269 } | 277 } |
270 | 278 |
271 void _close() { | 279 void _close() { |
272 assert(!_isClosed); | 280 assert(!_isClosed); |
273 if (_isCanceled) return; | 281 if (_isCanceled) return; |
274 _state |= _STATE_CLOSED; | 282 _state |= _STATE_CLOSED; |
275 if (_canFire) { | 283 if (_canFire) { |
276 _sendDone(); | 284 _sendDone(); |
277 } else { | 285 } else { |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
321 assert(!_isCanceled); | 329 assert(!_isCanceled); |
322 assert(!_isPaused); | 330 assert(!_isPaused); |
323 assert(!_inCallback); | 331 assert(!_inCallback); |
324 bool wasInputPaused = _isInputPaused; | 332 bool wasInputPaused = _isInputPaused; |
325 _state |= _STATE_IN_CALLBACK; | 333 _state |= _STATE_IN_CALLBACK; |
326 _zone.runUnaryGuarded(_onData, data); | 334 _zone.runUnaryGuarded(_onData, data); |
327 _state &= ~_STATE_IN_CALLBACK; | 335 _state &= ~_STATE_IN_CALLBACK; |
328 _checkState(wasInputPaused); | 336 _checkState(wasInputPaused); |
329 } | 337 } |
330 | 338 |
331 void _sendError(var error) { | 339 void _sendError(var error, StackTrace stackTrace) { |
332 assert(!_isCanceled); | 340 assert(!_isCanceled); |
333 assert(!_isPaused); | 341 assert(!_isPaused); |
334 assert(!_inCallback); | 342 assert(!_inCallback); |
335 bool wasInputPaused = _isInputPaused; | 343 bool wasInputPaused = _isInputPaused; |
336 _state |= _STATE_IN_CALLBACK; | 344 _state |= _STATE_IN_CALLBACK; |
337 if (!_zone.inSameErrorZone(Zone.current)) { | 345 if (!_zone.inSameErrorZone(Zone.current)) { |
338 // Errors are not allowed to traverse zone boundaries. | 346 // Errors are not allowed to traverse zone boundaries. |
339 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); | 347 Zone.current.handleUncaughtError(error, stackTrace); |
| 348 } else if (_onError is ZoneBinaryCallback) { |
| 349 _zone.runBinaryGuarded(_onError, error, stackTrace); |
340 } else { | 350 } else { |
341 _zone.runUnaryGuarded(_onError, error); | 351 _zone.runUnaryGuarded(_onError, error); |
342 } | 352 } |
343 _state &= ~_STATE_IN_CALLBACK; | 353 _state &= ~_STATE_IN_CALLBACK; |
344 if (_cancelOnError) { | 354 if (_cancelOnError) { |
345 _cancel(); | 355 _cancel(); |
346 } | 356 } |
347 _checkState(wasInputPaused); | 357 _checkState(wasInputPaused); |
348 } | 358 } |
349 | 359 |
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
417 } | 427 } |
418 | 428 |
419 // ------------------------------------------------------------------- | 429 // ------------------------------------------------------------------- |
420 // Common base class for single and multi-subscription streams. | 430 // Common base class for single and multi-subscription streams. |
421 // ------------------------------------------------------------------- | 431 // ------------------------------------------------------------------- |
422 abstract class _StreamImpl<T> extends Stream<T> { | 432 abstract class _StreamImpl<T> extends Stream<T> { |
423 // ------------------------------------------------------------------ | 433 // ------------------------------------------------------------------ |
424 // Stream interface. | 434 // Stream interface. |
425 | 435 |
426 StreamSubscription<T> listen(void onData(T data), | 436 StreamSubscription<T> listen(void onData(T data), |
427 { void onError(error), | 437 { Function onError, |
428 void onDone(), | 438 void onDone(), |
429 bool cancelOnError }) { | 439 bool cancelOnError }) { |
430 if (onData == null) onData = _nullDataHandler; | 440 if (onData == null) onData = _nullDataHandler; |
431 if (onError == null) onError = _nullErrorHandler; | 441 if (onError == null) onError = _nullErrorHandler; |
432 if (onDone == null) onDone = _nullDoneHandler; | 442 if (onDone == null) onDone = _nullDoneHandler; |
433 cancelOnError = identical(true, cancelOnError); | 443 cancelOnError = identical(true, cancelOnError); |
434 StreamSubscription subscription = | 444 StreamSubscription subscription = |
435 _createSubscription(onData, onError, onDone, cancelOnError); | 445 _createSubscription(onData, onError, onDone, cancelOnError); |
436 _onListen(subscription); | 446 _onListen(subscription); |
437 return subscription; | 447 return subscription; |
(...skipping 21 matching lines...) Expand all Loading... |
459 final _EventGenerator _pending; | 469 final _EventGenerator _pending; |
460 /** | 470 /** |
461 * Initializes the stream to have only the events provided by a | 471 * Initializes the stream to have only the events provided by a |
462 * [_PendingEvents]. | 472 * [_PendingEvents]. |
463 * | 473 * |
464 * A new [_PendingEvents] must be generated for each listen. | 474 * A new [_PendingEvents] must be generated for each listen. |
465 */ | 475 */ |
466 _GeneratedStreamImpl(this._pending); | 476 _GeneratedStreamImpl(this._pending); |
467 | 477 |
468 StreamSubscription _createSubscription(void onData(T data), | 478 StreamSubscription _createSubscription(void onData(T data), |
469 void onError(Object error), | 479 Function onError, |
470 void onDone(), | 480 void onDone(), |
471 bool cancelOnError) { | 481 bool cancelOnError) { |
472 _BufferingStreamSubscription<T> subscription = | 482 _BufferingStreamSubscription<T> subscription = |
473 new _BufferingStreamSubscription( | 483 new _BufferingStreamSubscription( |
474 onData, onError, onDone, cancelOnError); | 484 onData, onError, onDone, cancelOnError); |
475 subscription._setPendingEvents(_pending()); | 485 subscription._setPendingEvents(_pending()); |
476 return subscription; | 486 return subscription; |
477 } | 487 } |
478 } | 488 } |
479 | 489 |
(...skipping 15 matching lines...) Expand all Loading... |
495 // Send one event per call to moveNext. | 505 // Send one event per call to moveNext. |
496 // If moveNext returns true, send the current element as data. | 506 // If moveNext returns true, send the current element as data. |
497 // If moveNext returns false, send a done event and clear the _iterator. | 507 // If moveNext returns false, send a done event and clear the _iterator. |
498 // If moveNext throws an error, send an error and clear the _iterator. | 508 // If moveNext throws an error, send an error and clear the _iterator. |
499 // After an error, no further events will be sent. | 509 // After an error, no further events will be sent. |
500 bool isDone; | 510 bool isDone; |
501 try { | 511 try { |
502 isDone = !_iterator.moveNext(); | 512 isDone = !_iterator.moveNext(); |
503 } catch (e, s) { | 513 } catch (e, s) { |
504 _iterator = null; | 514 _iterator = null; |
505 dispatch._sendError(_asyncError(e, s)); | 515 dispatch._sendError(_asyncError(e, s), s); |
506 return; | 516 return; |
507 } | 517 } |
508 if (!isDone) { | 518 if (!isDone) { |
509 dispatch._sendData(_iterator.current); | 519 dispatch._sendData(_iterator.current); |
510 } else { | 520 } else { |
511 _iterator = null; | 521 _iterator = null; |
512 dispatch._sendDone(); | 522 dispatch._sendDone(); |
513 } | 523 } |
514 } | 524 } |
515 | 525 |
516 void clear() { | 526 void clear() { |
517 if (isScheduled) cancelSchedule(); | 527 if (isScheduled) cancelSchedule(); |
518 _iterator = null; | 528 _iterator = null; |
519 } | 529 } |
520 } | 530 } |
521 | 531 |
522 | 532 |
523 // Internal helpers. | 533 // Internal helpers. |
524 | 534 |
525 // Types of the different handlers on a stream. Types used to type fields. | 535 // Types of the different handlers on a stream. Types used to type fields. |
526 typedef void _DataHandler<T>(T value); | 536 typedef void _DataHandler<T>(T value); |
527 typedef void _ErrorHandler(error); | |
528 typedef void _DoneHandler(); | 537 typedef void _DoneHandler(); |
529 | 538 |
530 | 539 |
531 /** Default data handler, does nothing. */ | 540 /** Default data handler, does nothing. */ |
532 void _nullDataHandler(var value) {} | 541 void _nullDataHandler(var value) {} |
533 | 542 |
534 /** Default error handler, reports the error to the current zone's handler. */ | 543 /** Default error handler, reports the error to the current zone's handler. */ |
535 void _nullErrorHandler(error) { | 544 void _nullErrorHandler(error, [StackTrace stackTrace]) { |
536 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); | 545 Zone.current.handleUncaughtError(error, stackTrace); |
537 } | 546 } |
538 | 547 |
539 /** Default done handler, does nothing. */ | 548 /** Default done handler, does nothing. */ |
540 void _nullDoneHandler() {} | 549 void _nullDoneHandler() {} |
541 | 550 |
542 | 551 |
543 /** A delayed event on a buffering stream subscription. */ | 552 /** A delayed event on a buffering stream subscription. */ |
544 abstract class _DelayedEvent { | 553 abstract class _DelayedEvent { |
545 /** Added as a linked list on the [StreamController]. */ | 554 /** Added as a linked list on the [StreamController]. */ |
546 _DelayedEvent next; | 555 _DelayedEvent next; |
547 /** Execute the delayed event on the [StreamController]. */ | 556 /** Execute the delayed event on the [StreamController]. */ |
548 void perform(_EventDispatch dispatch); | 557 void perform(_EventDispatch dispatch); |
549 } | 558 } |
550 | 559 |
551 /** A delayed data event. */ | 560 /** A delayed data event. */ |
552 class _DelayedData<T> extends _DelayedEvent { | 561 class _DelayedData<T> extends _DelayedEvent { |
553 final T value; | 562 final T value; |
554 _DelayedData(this.value); | 563 _DelayedData(this.value); |
555 void perform(_EventDispatch<T> dispatch) { | 564 void perform(_EventDispatch<T> dispatch) { |
556 dispatch._sendData(value); | 565 dispatch._sendData(value); |
557 } | 566 } |
558 } | 567 } |
559 | 568 |
560 /** A delayed error event. */ | 569 /** A delayed error event. */ |
561 class _DelayedError extends _DelayedEvent { | 570 class _DelayedError extends _DelayedEvent { |
562 final error; | 571 final error; |
563 _DelayedError(this.error); | 572 final StackTrace stackTrace; |
| 573 |
| 574 _DelayedError(this.error, this.stackTrace); |
564 void perform(_EventDispatch dispatch) { | 575 void perform(_EventDispatch dispatch) { |
565 dispatch._sendError(error); | 576 dispatch._sendError(error, stackTrace); |
566 } | 577 } |
567 } | 578 } |
568 | 579 |
569 /** A delayed done event. */ | 580 /** A delayed done event. */ |
570 class _DelayedDone implements _DelayedEvent { | 581 class _DelayedDone implements _DelayedEvent { |
571 const _DelayedDone(); | 582 const _DelayedDone(); |
572 void perform(_EventDispatch dispatch) { | 583 void perform(_EventDispatch dispatch) { |
573 dispatch._sendDone(); | 584 dispatch._sendDone(); |
574 } | 585 } |
575 | 586 |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
697 | 708 |
698 typedef void _broadcastCallback(StreamSubscription subscription); | 709 typedef void _broadcastCallback(StreamSubscription subscription); |
699 | 710 |
700 /** | 711 /** |
701 * Dummy subscription that will never receive any events. | 712 * Dummy subscription that will never receive any events. |
702 */ | 713 */ |
703 class _DummyStreamSubscription<T> implements StreamSubscription<T> { | 714 class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
704 int _pauseCounter = 0; | 715 int _pauseCounter = 0; |
705 | 716 |
706 void onData(void handleData(T data)) {} | 717 void onData(void handleData(T data)) {} |
707 void onError(void handleError(Object data)) {} | 718 void onError(Function handleError) {} |
708 void onDone(void handleDone()) {} | 719 void onDone(void handleDone()) {} |
709 | 720 |
710 void pause([Future resumeSignal]) { | 721 void pause([Future resumeSignal]) { |
711 _pauseCounter++; | 722 _pauseCounter++; |
712 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); | 723 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
713 } | 724 } |
714 void resume() { | 725 void resume() { |
715 if (_pauseCounter > 0) _pauseCounter--; | 726 if (_pauseCounter > 0) _pauseCounter--; |
716 } | 727 } |
717 void cancel() {} | 728 void cancel() {} |
(...skipping 16 matching lines...) Expand all Loading... |
734 void onCancelHandler(StreamSubscription subscription)) | 745 void onCancelHandler(StreamSubscription subscription)) |
735 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), | 746 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), |
736 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), | 747 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), |
737 _zone = Zone.current { | 748 _zone = Zone.current { |
738 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | 749 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
739 } | 750 } |
740 | 751 |
741 bool get isBroadcast => true; | 752 bool get isBroadcast => true; |
742 | 753 |
743 StreamSubscription<T> listen(void onData(T data), | 754 StreamSubscription<T> listen(void onData(T data), |
744 { void onError(Object error), | 755 { Function onError, |
745 void onDone(), | 756 void onDone(), |
746 bool cancelOnError}) { | 757 bool cancelOnError}) { |
747 if (_controller == null) { | 758 if (_controller == null) { |
748 // Return a dummy subscription backed by nothing, since | 759 // Return a dummy subscription backed by nothing, since |
749 // it won't ever receive any events. | 760 // it won't ever receive any events. |
750 return new _DummyStreamSubscription<T>(); | 761 return new _DummyStreamSubscription<T>(); |
751 } | 762 } |
752 if (_subscription == null) { | 763 if (_subscription == null) { |
753 _subscription = _source.listen(_controller.add, | 764 _subscription = _source.listen(_controller.add, |
754 onError: _controller.addError, | 765 onError: _controller.addError, |
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
969 _state = _STATE_FOUND; | 980 _state = _STATE_FOUND; |
970 hasNext._complete(true); | 981 hasNext._complete(true); |
971 return; | 982 return; |
972 } | 983 } |
973 _subscription.pause(); | 984 _subscription.pause(); |
974 assert(_futureOrPrefetch == null); | 985 assert(_futureOrPrefetch == null); |
975 _futureOrPrefetch = data; | 986 _futureOrPrefetch = data; |
976 _state = _STATE_EXTRA_DATA; | 987 _state = _STATE_EXTRA_DATA; |
977 } | 988 } |
978 | 989 |
979 void _onError(Object error) { | 990 void _onError(Object error, [StackTrace stackTrace]) { |
980 if (_state == _STATE_MOVING) { | 991 if (_state == _STATE_MOVING) { |
981 _Future<bool> hasNext = _futureOrPrefetch; | 992 _Future<bool> hasNext = _futureOrPrefetch; |
982 // We have cancelOnError: true, so the subscription is canceled. | 993 // We have cancelOnError: true, so the subscription is canceled. |
983 _clear(); | 994 _clear(); |
984 hasNext._completeError(error); | 995 hasNext._completeError(error, stackTrace); |
985 return; | 996 return; |
986 } | 997 } |
987 _subscription.pause(); | 998 _subscription.pause(); |
988 assert(_futureOrPrefetch == null); | 999 assert(_futureOrPrefetch == null); |
989 _futureOrPrefetch = error; | 1000 _futureOrPrefetch = error; |
990 _state = _STATE_EXTRA_ERROR; | 1001 _state = _STATE_EXTRA_ERROR; |
991 } | 1002 } |
992 | 1003 |
993 void _onDone() { | 1004 void _onDone() { |
994 if (_state == _STATE_MOVING) { | 1005 if (_state == _STATE_MOVING) { |
995 _Future<bool> hasNext = _futureOrPrefetch; | 1006 _Future<bool> hasNext = _futureOrPrefetch; |
996 _clear(); | 1007 _clear(); |
997 hasNext._complete(false); | 1008 hasNext._complete(false); |
998 return; | 1009 return; |
999 } | 1010 } |
1000 _subscription.pause(); | 1011 _subscription.pause(); |
1001 _futureOrPrefetch = null; | 1012 _futureOrPrefetch = null; |
1002 _state = _STATE_EXTRA_DONE; | 1013 _state = _STATE_EXTRA_DONE; |
1003 } | 1014 } |
1004 } | 1015 } |
OLD | NEW |