| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error); | 10 void _addError(Object error, StackTrace stackTrace); |
| 11 void _close(); | 11 void _close(); |
| 12 } | 12 } |
| 13 | 13 |
| 14 /** | 14 /** |
| 15 * Abstract and private interface for a place to send events. | 15 * Abstract and private interface for a place to send events. |
| 16 * | 16 * |
| 17 * Used by event buffering to finally dispatch the pending event, where | 17 * Used by event buffering to finally dispatch the pending event, where |
| 18 * [_EventSink] is where the event first enters the stream subscription, | 18 * [_EventSink] is where the event first enters the stream subscription, |
| 19 * and may yet be buffered. | 19 * and may yet be buffered. |
| 20 */ | 20 */ |
| 21 abstract class _EventDispatch<T> { | 21 abstract class _EventDispatch<T> { |
| 22 void _sendData(T data); | 22 void _sendData(T data); |
| 23 void _sendError(Object error); | 23 void _sendError(Object error, StackTrace stackTrace); |
| 24 void _sendDone(); | 24 void _sendDone(); |
| 25 } | 25 } |
| 26 | 26 |
| 27 /** | 27 /** |
| 28 * Default implementation of stream subscription of buffering events. | 28 * Default implementation of stream subscription of buffering events. |
| 29 * | 29 * |
| 30 * The only public methods are those of [StreamSubscription], so instances of | 30 * The only public methods are those of [StreamSubscription], so instances of |
| 31 * [_BufferingStreamSubscription] can be returned directly as a | 31 * [_BufferingStreamSubscription] can be returned directly as a |
| 32 * [StreamSubscription] without exposing internal functionality. | 32 * [StreamSubscription] without exposing internal functionality. |
| 33 * | 33 * |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 71 * when `cancelOnError` is true. | 71 * when `cancelOnError` is true. |
| 72 */ | 72 */ |
| 73 static const int _STATE_CANCELED = 8; | 73 static const int _STATE_CANCELED = 8; |
| 74 static const int _STATE_IN_CALLBACK = 16; | 74 static const int _STATE_IN_CALLBACK = 16; |
| 75 static const int _STATE_HAS_PENDING = 32; | 75 static const int _STATE_HAS_PENDING = 32; |
| 76 static const int _STATE_PAUSE_COUNT = 64; | 76 static const int _STATE_PAUSE_COUNT = 64; |
| 77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | 77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
| 78 | 78 |
| 79 /* Event handlers provided in constructor. */ | 79 /* Event handlers provided in constructor. */ |
| 80 _DataHandler<T> _onData; | 80 _DataHandler<T> _onData; |
| 81 _ErrorHandler _onError; | 81 Function _onError; |
| 82 _DoneHandler _onDone; | 82 _DoneHandler _onDone; |
| 83 final Zone _zone = Zone.current; | 83 final Zone _zone = Zone.current; |
| 84 | 84 |
| 85 /** Bit vector based on state-constants above. */ | 85 /** Bit vector based on state-constants above. */ |
| 86 int _state; | 86 int _state; |
| 87 | 87 |
| 88 /** | 88 /** |
| 89 * Queue of pending events. | 89 * Queue of pending events. |
| 90 * | 90 * |
| 91 * Is created when necessary, or set in constructor for preconfigured events. | 91 * Is created when necessary, or set in constructor for preconfigured events. |
| 92 */ | 92 */ |
| 93 _PendingEvents _pending; | 93 _PendingEvents _pending; |
| 94 | 94 |
| 95 _BufferingStreamSubscription(void onData(T data), | 95 _BufferingStreamSubscription(void onData(T data), |
| 96 void onError(error), | 96 Function onError, |
| 97 void onDone(), | 97 void onDone(), |
| 98 bool cancelOnError) | 98 bool cancelOnError) |
| 99 : _onData = Zone.current.registerUnaryCallback(onData), | 99 : _onData = Zone.current.registerUnaryCallback(onData), |
| 100 _onError = Zone.current.registerUnaryCallback(onError), | 100 _onError = _registerErrorHandler(onError, Zone.current), |
| 101 _onDone = Zone.current.registerCallback(onDone), | 101 _onDone = Zone.current.registerCallback(onDone), |
| 102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
| 103 assert(_onData != null); | 103 assert(_onData != null); |
| 104 assert(_onError != null); | 104 assert(_onError != null); |
| 105 assert(_onDone != null); | 105 assert(_onDone != null); |
| 106 } | 106 } |
| 107 | 107 |
| 108 /** | 108 /** |
| 109 * Sets the subscription's pending events object. | 109 * Sets the subscription's pending events object. |
| 110 * | 110 * |
| (...skipping 20 matching lines...) Expand all Loading... |
| 131 assert(_isCanceled); | 131 assert(_isCanceled); |
| 132 _PendingEvents events = _pending; | 132 _PendingEvents events = _pending; |
| 133 _pending = null; | 133 _pending = null; |
| 134 return events; | 134 return events; |
| 135 } | 135 } |
| 136 | 136 |
| 137 // StreamSubscription interface. | 137 // StreamSubscription interface. |
| 138 | 138 |
| 139 void onData(void handleData(T event)) { | 139 void onData(void handleData(T event)) { |
| 140 if (handleData == null) handleData = _nullDataHandler; | 140 if (handleData == null) handleData = _nullDataHandler; |
| 141 _onData = handleData; | 141 _onData = Zone.current.registerUnaryCallback(handleData); |
| 142 } | 142 } |
| 143 | 143 |
| 144 void onError(void handleError(error)) { | 144 void onError(Function handleError) { |
| 145 if (handleError == null) handleError = _nullErrorHandler; | 145 if (handleError == null) handleError = _nullErrorHandler; |
| 146 _onError = handleError; | 146 _onError = _registerErrorHandler(handleError, Zone.current); |
| 147 } | 147 } |
| 148 | 148 |
| 149 void onDone(void handleDone()) { | 149 void onDone(void handleDone()) { |
| 150 if (handleDone == null) handleDone = _nullDoneHandler; | 150 if (handleDone == null) handleDone = _nullDoneHandler; |
| 151 _onDone = handleDone; | 151 _onDone = Zone.current.registerCallback(handleDone); |
| 152 } | 152 } |
| 153 | 153 |
| 154 void pause([Future resumeSignal]) { | 154 void pause([Future resumeSignal]) { |
| 155 if (_isCanceled) return; | 155 if (_isCanceled) return; |
| 156 bool wasPaused = _isPaused; | 156 bool wasPaused = _isPaused; |
| 157 bool wasInputPaused = _isInputPaused; | 157 bool wasInputPaused = _isInputPaused; |
| 158 // Increment pause count and mark input paused (if it isn't already). | 158 // Increment pause count and mark input paused (if it isn't already). |
| 159 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 159 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
| 160 if (resumeSignal != null) resumeSignal.whenComplete(resume); | 160 if (resumeSignal != null) resumeSignal.whenComplete(resume); |
| 161 if (!wasPaused && _pending != null) _pending.cancelSchedule(); | 161 if (!wasPaused && _pending != null) _pending.cancelSchedule(); |
| (...skipping 27 matching lines...) Expand all Loading... |
| 189 _pending = null; | 189 _pending = null; |
| 190 _state &= ~_STATE_IN_CALLBACK; | 190 _state &= ~_STATE_IN_CALLBACK; |
| 191 } | 191 } |
| 192 } | 192 } |
| 193 | 193 |
| 194 Future asFuture([var futureValue]) { | 194 Future asFuture([var futureValue]) { |
| 195 _Future<T> result = new _Future<T>(); | 195 _Future<T> result = new _Future<T>(); |
| 196 | 196 |
| 197 // Overwrite the onDone and onError handlers. | 197 // Overwrite the onDone and onError handlers. |
| 198 _onDone = () { result._complete(futureValue); }; | 198 _onDone = () { result._complete(futureValue); }; |
| 199 _onError = (error) { | 199 _onError = (error, stackTrace) { |
| 200 cancel(); | 200 cancel(); |
| 201 result._completeError(error); | 201 result._completeError(error, stackTrace); |
| 202 }; | 202 }; |
| 203 | 203 |
| 204 return result; | 204 return result; |
| 205 } | 205 } |
| 206 | 206 |
| 207 // State management. | 207 // State management. |
| 208 | 208 |
| 209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | 209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
| 210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | 210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
| 211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 252 void _add(T data) { | 252 void _add(T data) { |
| 253 assert(!_isClosed); | 253 assert(!_isClosed); |
| 254 if (_isCanceled) return; | 254 if (_isCanceled) return; |
| 255 if (_canFire) { | 255 if (_canFire) { |
| 256 _sendData(data); | 256 _sendData(data); |
| 257 } else { | 257 } else { |
| 258 _addPending(new _DelayedData(data)); | 258 _addPending(new _DelayedData(data)); |
| 259 } | 259 } |
| 260 } | 260 } |
| 261 | 261 |
| 262 void _addError(Object error) { | 262 void _addError(Object error, StackTrace stackTrace) { |
| 263 if (_isCanceled) return; | 263 if (_isCanceled) return; |
| 264 if (_canFire) { | 264 if (_canFire) { |
| 265 _sendError(error); // Reports cancel after sending. | 265 _sendError(error, stackTrace); // Reports cancel after sending. |
| 266 } else { | 266 } else { |
| 267 _addPending(new _DelayedError(error)); | 267 _addPending(new _DelayedError(error, stackTrace)); |
| 268 } | 268 } |
| 269 } | 269 } |
| 270 | 270 |
| 271 void _close() { | 271 void _close() { |
| 272 assert(!_isClosed); | 272 assert(!_isClosed); |
| 273 if (_isCanceled) return; | 273 if (_isCanceled) return; |
| 274 _state |= _STATE_CLOSED; | 274 _state |= _STATE_CLOSED; |
| 275 if (_canFire) { | 275 if (_canFire) { |
| 276 _sendDone(); | 276 _sendDone(); |
| 277 } else { | 277 } else { |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 321 assert(!_isCanceled); | 321 assert(!_isCanceled); |
| 322 assert(!_isPaused); | 322 assert(!_isPaused); |
| 323 assert(!_inCallback); | 323 assert(!_inCallback); |
| 324 bool wasInputPaused = _isInputPaused; | 324 bool wasInputPaused = _isInputPaused; |
| 325 _state |= _STATE_IN_CALLBACK; | 325 _state |= _STATE_IN_CALLBACK; |
| 326 _zone.runUnaryGuarded(_onData, data); | 326 _zone.runUnaryGuarded(_onData, data); |
| 327 _state &= ~_STATE_IN_CALLBACK; | 327 _state &= ~_STATE_IN_CALLBACK; |
| 328 _checkState(wasInputPaused); | 328 _checkState(wasInputPaused); |
| 329 } | 329 } |
| 330 | 330 |
| 331 void _sendError(var error) { | 331 void _sendError(var error, StackTrace stackTrace) { |
| 332 assert(!_isCanceled); | 332 assert(!_isCanceled); |
| 333 assert(!_isPaused); | 333 assert(!_isPaused); |
| 334 assert(!_inCallback); | 334 assert(!_inCallback); |
| 335 bool wasInputPaused = _isInputPaused; | 335 bool wasInputPaused = _isInputPaused; |
| 336 _state |= _STATE_IN_CALLBACK; | 336 _state |= _STATE_IN_CALLBACK; |
| 337 if (!_zone.inSameErrorZone(Zone.current)) { | 337 if (!_zone.inSameErrorZone(Zone.current)) { |
| 338 // Errors are not allowed to traverse zone boundaries. | 338 // Errors are not allowed to traverse zone boundaries. |
| 339 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); | 339 Zone.current.handleUncaughtError(error, stackTrace); |
| 340 } else if (_onError is ZoneBinaryCallback) { |
| 341 _zone.runBinaryGuarded(_onError, error, stackTrace); |
| 340 } else { | 342 } else { |
| 341 _zone.runUnaryGuarded(_onError, error); | 343 _zone.runUnaryGuarded(_onError, error); |
| 342 } | 344 } |
| 343 _state &= ~_STATE_IN_CALLBACK; | 345 _state &= ~_STATE_IN_CALLBACK; |
| 344 if (_cancelOnError) { | 346 if (_cancelOnError) { |
| 345 _cancel(); | 347 _cancel(); |
| 346 } | 348 } |
| 347 _checkState(wasInputPaused); | 349 _checkState(wasInputPaused); |
| 348 } | 350 } |
| 349 | 351 |
| (...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 417 } | 419 } |
| 418 | 420 |
| 419 // ------------------------------------------------------------------- | 421 // ------------------------------------------------------------------- |
| 420 // Common base class for single and multi-subscription streams. | 422 // Common base class for single and multi-subscription streams. |
| 421 // ------------------------------------------------------------------- | 423 // ------------------------------------------------------------------- |
| 422 abstract class _StreamImpl<T> extends Stream<T> { | 424 abstract class _StreamImpl<T> extends Stream<T> { |
| 423 // ------------------------------------------------------------------ | 425 // ------------------------------------------------------------------ |
| 424 // Stream interface. | 426 // Stream interface. |
| 425 | 427 |
| 426 StreamSubscription<T> listen(void onData(T data), | 428 StreamSubscription<T> listen(void onData(T data), |
| 427 { void onError(error), | 429 { Function onError, |
| 428 void onDone(), | 430 void onDone(), |
| 429 bool cancelOnError }) { | 431 bool cancelOnError }) { |
| 430 if (onData == null) onData = _nullDataHandler; | 432 if (onData == null) onData = _nullDataHandler; |
| 431 if (onError == null) onError = _nullErrorHandler; | 433 if (onError == null) onError = _nullErrorHandler; |
| 432 if (onDone == null) onDone = _nullDoneHandler; | 434 if (onDone == null) onDone = _nullDoneHandler; |
| 433 cancelOnError = identical(true, cancelOnError); | 435 cancelOnError = identical(true, cancelOnError); |
| 434 StreamSubscription subscription = | 436 StreamSubscription subscription = |
| 435 _createSubscription(onData, onError, onDone, cancelOnError); | 437 _createSubscription(onData, onError, onDone, cancelOnError); |
| 436 _onListen(subscription); | 438 _onListen(subscription); |
| 437 return subscription; | 439 return subscription; |
| (...skipping 21 matching lines...) Expand all Loading... |
| 459 final _EventGenerator _pending; | 461 final _EventGenerator _pending; |
| 460 /** | 462 /** |
| 461 * Initializes the stream to have only the events provided by a | 463 * Initializes the stream to have only the events provided by a |
| 462 * [_PendingEvents]. | 464 * [_PendingEvents]. |
| 463 * | 465 * |
| 464 * A new [_PendingEvents] must be generated for each listen. | 466 * A new [_PendingEvents] must be generated for each listen. |
| 465 */ | 467 */ |
| 466 _GeneratedStreamImpl(this._pending); | 468 _GeneratedStreamImpl(this._pending); |
| 467 | 469 |
| 468 StreamSubscription _createSubscription(void onData(T data), | 470 StreamSubscription _createSubscription(void onData(T data), |
| 469 void onError(Object error), | 471 Function onError, |
| 470 void onDone(), | 472 void onDone(), |
| 471 bool cancelOnError) { | 473 bool cancelOnError) { |
| 472 _BufferingStreamSubscription<T> subscription = | 474 _BufferingStreamSubscription<T> subscription = |
| 473 new _BufferingStreamSubscription( | 475 new _BufferingStreamSubscription( |
| 474 onData, onError, onDone, cancelOnError); | 476 onData, onError, onDone, cancelOnError); |
| 475 subscription._setPendingEvents(_pending()); | 477 subscription._setPendingEvents(_pending()); |
| 476 return subscription; | 478 return subscription; |
| 477 } | 479 } |
| 478 } | 480 } |
| 479 | 481 |
| (...skipping 15 matching lines...) Expand all Loading... |
| 495 // Send one event per call to moveNext. | 497 // Send one event per call to moveNext. |
| 496 // If moveNext returns true, send the current element as data. | 498 // If moveNext returns true, send the current element as data. |
| 497 // If moveNext returns false, send a done event and clear the _iterator. | 499 // If moveNext returns false, send a done event and clear the _iterator. |
| 498 // If moveNext throws an error, send an error and clear the _iterator. | 500 // If moveNext throws an error, send an error and clear the _iterator. |
| 499 // After an error, no further events will be sent. | 501 // After an error, no further events will be sent. |
| 500 bool isDone; | 502 bool isDone; |
| 501 try { | 503 try { |
| 502 isDone = !_iterator.moveNext(); | 504 isDone = !_iterator.moveNext(); |
| 503 } catch (e, s) { | 505 } catch (e, s) { |
| 504 _iterator = null; | 506 _iterator = null; |
| 505 dispatch._sendError(_asyncError(e, s)); | 507 dispatch._sendError(_asyncError(e, s), s); |
| 506 return; | 508 return; |
| 507 } | 509 } |
| 508 if (!isDone) { | 510 if (!isDone) { |
| 509 dispatch._sendData(_iterator.current); | 511 dispatch._sendData(_iterator.current); |
| 510 } else { | 512 } else { |
| 511 _iterator = null; | 513 _iterator = null; |
| 512 dispatch._sendDone(); | 514 dispatch._sendDone(); |
| 513 } | 515 } |
| 514 } | 516 } |
| 515 | 517 |
| 516 void clear() { | 518 void clear() { |
| 517 if (isScheduled) cancelSchedule(); | 519 if (isScheduled) cancelSchedule(); |
| 518 _iterator = null; | 520 _iterator = null; |
| 519 } | 521 } |
| 520 } | 522 } |
| 521 | 523 |
| 522 | 524 |
| 523 // Internal helpers. | 525 // Internal helpers. |
| 524 | 526 |
| 525 // Types of the different handlers on a stream. Types used to type fields. | 527 // Types of the different handlers on a stream. Types used to type fields. |
| 526 typedef void _DataHandler<T>(T value); | 528 typedef void _DataHandler<T>(T value); |
| 527 typedef void _ErrorHandler(error); | |
| 528 typedef void _DoneHandler(); | 529 typedef void _DoneHandler(); |
| 529 | 530 |
| 530 | 531 |
| 531 /** Default data handler, does nothing. */ | 532 /** Default data handler, does nothing. */ |
| 532 void _nullDataHandler(var value) {} | 533 void _nullDataHandler(var value) {} |
| 533 | 534 |
| 534 /** Default error handler, reports the error to the current zone's handler. */ | 535 /** Default error handler, reports the error to the current zone's handler. */ |
| 535 void _nullErrorHandler(error) { | 536 void _nullErrorHandler(error, [StackTrace stackTrace]) { |
| 536 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); | 537 Zone.current.handleUncaughtError(error, stackTrace); |
| 537 } | 538 } |
| 538 | 539 |
| 539 /** Default done handler, does nothing. */ | 540 /** Default done handler, does nothing. */ |
| 540 void _nullDoneHandler() {} | 541 void _nullDoneHandler() {} |
| 541 | 542 |
| 542 | 543 |
| 543 /** A delayed event on a buffering stream subscription. */ | 544 /** A delayed event on a buffering stream subscription. */ |
| 544 abstract class _DelayedEvent { | 545 abstract class _DelayedEvent { |
| 545 /** Added as a linked list on the [StreamController]. */ | 546 /** Added as a linked list on the [StreamController]. */ |
| 546 _DelayedEvent next; | 547 _DelayedEvent next; |
| 547 /** Execute the delayed event on the [StreamController]. */ | 548 /** Execute the delayed event on the [StreamController]. */ |
| 548 void perform(_EventDispatch dispatch); | 549 void perform(_EventDispatch dispatch); |
| 549 } | 550 } |
| 550 | 551 |
| 551 /** A delayed data event. */ | 552 /** A delayed data event. */ |
| 552 class _DelayedData<T> extends _DelayedEvent { | 553 class _DelayedData<T> extends _DelayedEvent { |
| 553 final T value; | 554 final T value; |
| 554 _DelayedData(this.value); | 555 _DelayedData(this.value); |
| 555 void perform(_EventDispatch<T> dispatch) { | 556 void perform(_EventDispatch<T> dispatch) { |
| 556 dispatch._sendData(value); | 557 dispatch._sendData(value); |
| 557 } | 558 } |
| 558 } | 559 } |
| 559 | 560 |
| 560 /** A delayed error event. */ | 561 /** A delayed error event. */ |
| 561 class _DelayedError extends _DelayedEvent { | 562 class _DelayedError extends _DelayedEvent { |
| 562 final error; | 563 final error; |
| 563 _DelayedError(this.error); | 564 final StackTrace stackTrace; |
| 565 |
| 566 _DelayedError(this.error, this.stackTrace); |
| 564 void perform(_EventDispatch dispatch) { | 567 void perform(_EventDispatch dispatch) { |
| 565 dispatch._sendError(error); | 568 dispatch._sendError(error, stackTrace); |
| 566 } | 569 } |
| 567 } | 570 } |
| 568 | 571 |
| 569 /** A delayed done event. */ | 572 /** A delayed done event. */ |
| 570 class _DelayedDone implements _DelayedEvent { | 573 class _DelayedDone implements _DelayedEvent { |
| 571 const _DelayedDone(); | 574 const _DelayedDone(); |
| 572 void perform(_EventDispatch dispatch) { | 575 void perform(_EventDispatch dispatch) { |
| 573 dispatch._sendDone(); | 576 dispatch._sendDone(); |
| 574 } | 577 } |
| 575 | 578 |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 697 | 700 |
| 698 typedef void _broadcastCallback(StreamSubscription subscription); | 701 typedef void _broadcastCallback(StreamSubscription subscription); |
| 699 | 702 |
| 700 /** | 703 /** |
| 701 * Dummy subscription that will never receive any events. | 704 * Dummy subscription that will never receive any events. |
| 702 */ | 705 */ |
| 703 class _DummyStreamSubscription<T> implements StreamSubscription<T> { | 706 class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
| 704 int _pauseCounter = 0; | 707 int _pauseCounter = 0; |
| 705 | 708 |
| 706 void onData(void handleData(T data)) {} | 709 void onData(void handleData(T data)) {} |
| 707 void onError(void handleError(Object data)) {} | 710 void onError(Function handleError) {} |
| 708 void onDone(void handleDone()) {} | 711 void onDone(void handleDone()) {} |
| 709 | 712 |
| 710 void pause([Future resumeSignal]) { | 713 void pause([Future resumeSignal]) { |
| 711 _pauseCounter++; | 714 _pauseCounter++; |
| 712 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); | 715 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
| 713 } | 716 } |
| 714 void resume() { | 717 void resume() { |
| 715 if (_pauseCounter > 0) _pauseCounter--; | 718 if (_pauseCounter > 0) _pauseCounter--; |
| 716 } | 719 } |
| 717 void cancel() {} | 720 void cancel() {} |
| (...skipping 16 matching lines...) Expand all Loading... |
| 734 void onCancelHandler(StreamSubscription subscription)) | 737 void onCancelHandler(StreamSubscription subscription)) |
| 735 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), | 738 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), |
| 736 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), | 739 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), |
| 737 _zone = Zone.current { | 740 _zone = Zone.current { |
| 738 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | 741 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
| 739 } | 742 } |
| 740 | 743 |
| 741 bool get isBroadcast => true; | 744 bool get isBroadcast => true; |
| 742 | 745 |
| 743 StreamSubscription<T> listen(void onData(T data), | 746 StreamSubscription<T> listen(void onData(T data), |
| 744 { void onError(Object error), | 747 { Function onError, |
| 745 void onDone(), | 748 void onDone(), |
| 746 bool cancelOnError}) { | 749 bool cancelOnError}) { |
| 747 if (_controller == null) { | 750 if (_controller == null) { |
| 748 // Return a dummy subscription backed by nothing, since | 751 // Return a dummy subscription backed by nothing, since |
| 749 // it won't ever receive any events. | 752 // it won't ever receive any events. |
| 750 return new _DummyStreamSubscription<T>(); | 753 return new _DummyStreamSubscription<T>(); |
| 751 } | 754 } |
| 752 if (_subscription == null) { | 755 if (_subscription == null) { |
| 753 _subscription = _source.listen(_controller.add, | 756 _subscription = _source.listen(_controller.add, |
| 754 onError: _controller.addError, | 757 onError: _controller.addError, |
| (...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 969 _state = _STATE_FOUND; | 972 _state = _STATE_FOUND; |
| 970 hasNext._complete(true); | 973 hasNext._complete(true); |
| 971 return; | 974 return; |
| 972 } | 975 } |
| 973 _subscription.pause(); | 976 _subscription.pause(); |
| 974 assert(_futureOrPrefetch == null); | 977 assert(_futureOrPrefetch == null); |
| 975 _futureOrPrefetch = data; | 978 _futureOrPrefetch = data; |
| 976 _state = _STATE_EXTRA_DATA; | 979 _state = _STATE_EXTRA_DATA; |
| 977 } | 980 } |
| 978 | 981 |
| 979 void _onError(Object error) { | 982 void _onError(Object error, [StackTrace stackTrace]) { |
| 980 if (_state == _STATE_MOVING) { | 983 if (_state == _STATE_MOVING) { |
| 981 _Future<bool> hasNext = _futureOrPrefetch; | 984 _Future<bool> hasNext = _futureOrPrefetch; |
| 982 // We have cancelOnError: true, so the subscription is canceled. | 985 // We have cancelOnError: true, so the subscription is canceled. |
| 983 _clear(); | 986 _clear(); |
| 984 hasNext._completeError(error); | 987 hasNext._completeError(error, stackTrace); |
| 985 return; | 988 return; |
| 986 } | 989 } |
| 987 _subscription.pause(); | 990 _subscription.pause(); |
| 988 assert(_futureOrPrefetch == null); | 991 assert(_futureOrPrefetch == null); |
| 989 _futureOrPrefetch = error; | 992 _futureOrPrefetch = error; |
| 990 _state = _STATE_EXTRA_ERROR; | 993 _state = _STATE_EXTRA_ERROR; |
| 991 } | 994 } |
| 992 | 995 |
| 993 void _onDone() { | 996 void _onDone() { |
| 994 if (_state == _STATE_MOVING) { | 997 if (_state == _STATE_MOVING) { |
| 995 _Future<bool> hasNext = _futureOrPrefetch; | 998 _Future<bool> hasNext = _futureOrPrefetch; |
| 996 _clear(); | 999 _clear(); |
| 997 hasNext._complete(false); | 1000 hasNext._complete(false); |
| 998 return; | 1001 return; |
| 999 } | 1002 } |
| 1000 _subscription.pause(); | 1003 _subscription.pause(); |
| 1001 _futureOrPrefetch = null; | 1004 _futureOrPrefetch = null; |
| 1002 _state = _STATE_EXTRA_DONE; | 1005 _state = _STATE_EXTRA_DONE; |
| 1003 } | 1006 } |
| 1004 } | 1007 } |
| OLD | NEW |