| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error); | 10 void _addError(Object error, StackTrace stackTrace); |
| 11 void _close(); | 11 void _close(); |
| 12 } | 12 } |
| 13 | 13 |
| 14 /** | 14 /** |
| 15 * Abstract and private interface for a place to send events. | 15 * Abstract and private interface for a place to send events. |
| 16 * | 16 * |
| 17 * Used by event buffering to finally dispatch the pending event, where | 17 * Used by event buffering to finally dispatch the pending event, where |
| 18 * [_EventSink] is where the event first enters the stream subscription, | 18 * [_EventSink] is where the event first enters the stream subscription, |
| 19 * and may yet be buffered. | 19 * and may yet be buffered. |
| 20 */ | 20 */ |
| 21 abstract class _EventDispatch<T> { | 21 abstract class _EventDispatch<T> { |
| 22 void _sendData(T data); | 22 void _sendData(T data); |
| 23 void _sendError(Object error); | 23 void _sendError(Object error, StackTrace stackTrace); |
| 24 void _sendDone(); | 24 void _sendDone(); |
| 25 } | 25 } |
| 26 | 26 |
| 27 /** | 27 /** |
| 28 * Default implementation of stream subscription of buffering events. | 28 * Default implementation of stream subscription of buffering events. |
| 29 * | 29 * |
| 30 * The only public methods are those of [StreamSubscription], so instances of | 30 * The only public methods are those of [StreamSubscription], so instances of |
| 31 * [_BufferingStreamSubscription] can be returned directly as a | 31 * [_BufferingStreamSubscription] can be returned directly as a |
| 32 * [StreamSubscription] without exposing internal functionality. | 32 * [StreamSubscription] without exposing internal functionality. |
| 33 * | 33 * |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 71 * when `cancelOnError` is true. | 71 * when `cancelOnError` is true. |
| 72 */ | 72 */ |
| 73 static const int _STATE_CANCELED = 8; | 73 static const int _STATE_CANCELED = 8; |
| 74 static const int _STATE_IN_CALLBACK = 16; | 74 static const int _STATE_IN_CALLBACK = 16; |
| 75 static const int _STATE_HAS_PENDING = 32; | 75 static const int _STATE_HAS_PENDING = 32; |
| 76 static const int _STATE_PAUSE_COUNT = 64; | 76 static const int _STATE_PAUSE_COUNT = 64; |
| 77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | 77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
| 78 | 78 |
| 79 /* Event handlers provided in constructor. */ | 79 /* Event handlers provided in constructor. */ |
| 80 _DataHandler<T> _onData; | 80 _DataHandler<T> _onData; |
| 81 _ErrorHandler _onError; | 81 Function _onError; |
| 82 _DoneHandler _onDone; | 82 _DoneHandler _onDone; |
| 83 final Zone _zone = Zone.current; | 83 final Zone _zone = Zone.current; |
| 84 | 84 |
| 85 /** Bit vector based on state-constants above. */ | 85 /** Bit vector based on state-constants above. */ |
| 86 int _state; | 86 int _state; |
| 87 | 87 |
| 88 /** | 88 /** |
| 89 * Queue of pending events. | 89 * Queue of pending events. |
| 90 * | 90 * |
| 91 * Is created when necessary, or set in constructor for preconfigured events. | 91 * Is created when necessary, or set in constructor for preconfigured events. |
| 92 */ | 92 */ |
| 93 _PendingEvents _pending; | 93 _PendingEvents _pending; |
| 94 | 94 |
| 95 _BufferingStreamSubscription(void onData(T data), | 95 _BufferingStreamSubscription(void onData(T data), |
| 96 void onError(error), | 96 Function onError, |
| 97 void onDone(), | 97 void onDone(), |
| 98 bool cancelOnError) | 98 bool cancelOnError) |
| 99 : _onData = Zone.current.registerUnaryCallback(onData), | 99 : _onData = Zone.current.registerUnaryCallback(onData), |
| 100 _onError = Zone.current.registerUnaryCallback(onError), | 100 _onError = _registerErrorCallback(onError), |
| 101 _onDone = Zone.current.registerCallback(onDone), | 101 _onDone = Zone.current.registerCallback(onDone), |
| 102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
| 103 assert(_onData != null); | 103 assert(_onData != null); |
| 104 assert(_onError != null); | 104 assert(_onError != null); |
| 105 assert(_onDone != null); | 105 assert(_onDone != null); |
| 106 } | 106 } |
| 107 | 107 |
| 108 static _registerErrorCallback(Function errorCallback) { |
| 109 if (errorCallback is ZoneBinaryCallback) { |
| 110 return Zone.current.registerBinaryCallback(errorCallback); |
| 111 } else { |
| 112 return Zone.current.registerUnaryCallback(errorCallback); |
| 113 } |
| 114 } |
| 115 |
| 108 /** | 116 /** |
| 109 * Sets the subscription's pending events object. | 117 * Sets the subscription's pending events object. |
| 110 * | 118 * |
| 111 * This can only be done once. The pending events object is used for the | 119 * This can only be done once. The pending events object is used for the |
| 112 * rest of the subscription's life cycle. | 120 * rest of the subscription's life cycle. |
| 113 */ | 121 */ |
| 114 void _setPendingEvents(_PendingEvents pendingEvents) { | 122 void _setPendingEvents(_PendingEvents pendingEvents) { |
| 115 assert(_pending == null); | 123 assert(_pending == null); |
| 116 if (pendingEvents == null) return; | 124 if (pendingEvents == null) return; |
| 117 _pending = pendingEvents; | 125 _pending = pendingEvents; |
| (...skipping 13 matching lines...) Expand all Loading... |
| 131 assert(_isCanceled); | 139 assert(_isCanceled); |
| 132 _PendingEvents events = _pending; | 140 _PendingEvents events = _pending; |
| 133 _pending = null; | 141 _pending = null; |
| 134 return events; | 142 return events; |
| 135 } | 143 } |
| 136 | 144 |
| 137 // StreamSubscription interface. | 145 // StreamSubscription interface. |
| 138 | 146 |
| 139 void onData(void handleData(T event)) { | 147 void onData(void handleData(T event)) { |
| 140 if (handleData == null) handleData = _nullDataHandler; | 148 if (handleData == null) handleData = _nullDataHandler; |
| 141 _onData = handleData; | 149 _onData = Zone.current.registerUnaryCallback(handleData); |
| 142 } | 150 } |
| 143 | 151 |
| 144 void onError(void handleError(error)) { | 152 void onError(Function handleError) { |
| 145 if (handleError == null) handleError = _nullErrorHandler; | 153 if (handleError == null) handleError = _nullErrorHandler; |
| 146 _onError = handleError; | 154 _onError = _registerErrorCallback(handleError); |
| 147 } | 155 } |
| 148 | 156 |
| 149 void onDone(void handleDone()) { | 157 void onDone(void handleDone()) { |
| 150 if (handleDone == null) handleDone = _nullDoneHandler; | 158 if (handleDone == null) handleDone = _nullDoneHandler; |
| 151 _onDone = handleDone; | 159 _onDone = Zone.current.registerCallback(handleDone); |
| 152 } | 160 } |
| 153 | 161 |
| 154 void pause([Future resumeSignal]) { | 162 void pause([Future resumeSignal]) { |
| 155 if (_isCanceled) return; | 163 if (_isCanceled) return; |
| 156 bool wasPaused = _isPaused; | 164 bool wasPaused = _isPaused; |
| 157 bool wasInputPaused = _isInputPaused; | 165 bool wasInputPaused = _isInputPaused; |
| 158 // Increment pause count and mark input paused (if it isn't already). | 166 // Increment pause count and mark input paused (if it isn't already). |
| 159 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 167 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
| 160 if (resumeSignal != null) resumeSignal.whenComplete(resume); | 168 if (resumeSignal != null) resumeSignal.whenComplete(resume); |
| 161 if (!wasPaused && _pending != null) _pending.cancelSchedule(); | 169 if (!wasPaused && _pending != null) _pending.cancelSchedule(); |
| (...skipping 27 matching lines...) Expand all Loading... |
| 189 _pending = null; | 197 _pending = null; |
| 190 _state &= ~_STATE_IN_CALLBACK; | 198 _state &= ~_STATE_IN_CALLBACK; |
| 191 } | 199 } |
| 192 } | 200 } |
| 193 | 201 |
| 194 Future asFuture([var futureValue]) { | 202 Future asFuture([var futureValue]) { |
| 195 _Future<T> result = new _Future<T>(); | 203 _Future<T> result = new _Future<T>(); |
| 196 | 204 |
| 197 // Overwrite the onDone and onError handlers. | 205 // Overwrite the onDone and onError handlers. |
| 198 _onDone = () { result._complete(futureValue); }; | 206 _onDone = () { result._complete(futureValue); }; |
| 199 _onError = (error) { | 207 _onError = (error, stackTrace) { |
| 200 cancel(); | 208 cancel(); |
| 201 result._completeError(error); | 209 result._completeError(error, stackTrace); |
| 202 }; | 210 }; |
| 203 | 211 |
| 204 return result; | 212 return result; |
| 205 } | 213 } |
| 206 | 214 |
| 207 // State management. | 215 // State management. |
| 208 | 216 |
| 209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | 217 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
| 210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | 218 bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
| 211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 219 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 252 void _add(T data) { | 260 void _add(T data) { |
| 253 assert(!_isClosed); | 261 assert(!_isClosed); |
| 254 if (_isCanceled) return; | 262 if (_isCanceled) return; |
| 255 if (_canFire) { | 263 if (_canFire) { |
| 256 _sendData(data); | 264 _sendData(data); |
| 257 } else { | 265 } else { |
| 258 _addPending(new _DelayedData(data)); | 266 _addPending(new _DelayedData(data)); |
| 259 } | 267 } |
| 260 } | 268 } |
| 261 | 269 |
| 262 void _addError(Object error) { | 270 void _addError(Object error, StackTrace stackTrace) { |
| 263 if (_isCanceled) return; | 271 if (_isCanceled) return; |
| 264 if (_canFire) { | 272 if (_canFire) { |
| 265 _sendError(error); // Reports cancel after sending. | 273 _sendError(error, stackTrace); // Reports cancel after sending. |
| 266 } else { | 274 } else { |
| 267 _addPending(new _DelayedError(error)); | 275 _addPending(new _DelayedError(error, stackTrace)); |
| 268 } | 276 } |
| 269 } | 277 } |
| 270 | 278 |
| 271 void _close() { | 279 void _close() { |
| 272 assert(!_isClosed); | 280 assert(!_isClosed); |
| 273 if (_isCanceled) return; | 281 if (_isCanceled) return; |
| 274 _state |= _STATE_CLOSED; | 282 _state |= _STATE_CLOSED; |
| 275 if (_canFire) { | 283 if (_canFire) { |
| 276 _sendDone(); | 284 _sendDone(); |
| 277 } else { | 285 } else { |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 321 assert(!_isCanceled); | 329 assert(!_isCanceled); |
| 322 assert(!_isPaused); | 330 assert(!_isPaused); |
| 323 assert(!_inCallback); | 331 assert(!_inCallback); |
| 324 bool wasInputPaused = _isInputPaused; | 332 bool wasInputPaused = _isInputPaused; |
| 325 _state |= _STATE_IN_CALLBACK; | 333 _state |= _STATE_IN_CALLBACK; |
| 326 _zone.runUnaryGuarded(_onData, data); | 334 _zone.runUnaryGuarded(_onData, data); |
| 327 _state &= ~_STATE_IN_CALLBACK; | 335 _state &= ~_STATE_IN_CALLBACK; |
| 328 _checkState(wasInputPaused); | 336 _checkState(wasInputPaused); |
| 329 } | 337 } |
| 330 | 338 |
| 331 void _sendError(var error) { | 339 void _sendError(var error, StackTrace stackTrace) { |
| 332 assert(!_isCanceled); | 340 assert(!_isCanceled); |
| 333 assert(!_isPaused); | 341 assert(!_isPaused); |
| 334 assert(!_inCallback); | 342 assert(!_inCallback); |
| 335 bool wasInputPaused = _isInputPaused; | 343 bool wasInputPaused = _isInputPaused; |
| 336 _state |= _STATE_IN_CALLBACK; | 344 _state |= _STATE_IN_CALLBACK; |
| 337 if (!_zone.inSameErrorZone(Zone.current)) { | 345 if (!_zone.inSameErrorZone(Zone.current)) { |
| 338 // Errors are not allowed to traverse zone boundaries. | 346 // Errors are not allowed to traverse zone boundaries. |
| 339 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); | 347 Zone.current.handleUncaughtError(error, stackTrace); |
| 348 } else if (_onError is ZoneBinaryCallback) { |
| 349 _zone.runBinaryGuarded(_onError, error, stackTrace); |
| 340 } else { | 350 } else { |
| 341 _zone.runUnaryGuarded(_onError, error); | 351 _zone.runUnaryGuarded(_onError, error); |
| 342 } | 352 } |
| 343 _state &= ~_STATE_IN_CALLBACK; | 353 _state &= ~_STATE_IN_CALLBACK; |
| 344 if (_cancelOnError) { | 354 if (_cancelOnError) { |
| 345 _cancel(); | 355 _cancel(); |
| 346 } | 356 } |
| 347 _checkState(wasInputPaused); | 357 _checkState(wasInputPaused); |
| 348 } | 358 } |
| 349 | 359 |
| (...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 417 } | 427 } |
| 418 | 428 |
| 419 // ------------------------------------------------------------------- | 429 // ------------------------------------------------------------------- |
| 420 // Common base class for single and multi-subscription streams. | 430 // Common base class for single and multi-subscription streams. |
| 421 // ------------------------------------------------------------------- | 431 // ------------------------------------------------------------------- |
| 422 abstract class _StreamImpl<T> extends Stream<T> { | 432 abstract class _StreamImpl<T> extends Stream<T> { |
| 423 // ------------------------------------------------------------------ | 433 // ------------------------------------------------------------------ |
| 424 // Stream interface. | 434 // Stream interface. |
| 425 | 435 |
| 426 StreamSubscription<T> listen(void onData(T data), | 436 StreamSubscription<T> listen(void onData(T data), |
| 427 { void onError(error), | 437 { Function onError, |
| 428 void onDone(), | 438 void onDone(), |
| 429 bool cancelOnError }) { | 439 bool cancelOnError }) { |
| 430 if (onData == null) onData = _nullDataHandler; | 440 if (onData == null) onData = _nullDataHandler; |
| 431 if (onError == null) onError = _nullErrorHandler; | 441 if (onError == null) onError = _nullErrorHandler; |
| 432 if (onDone == null) onDone = _nullDoneHandler; | 442 if (onDone == null) onDone = _nullDoneHandler; |
| 433 cancelOnError = identical(true, cancelOnError); | 443 cancelOnError = identical(true, cancelOnError); |
| 434 StreamSubscription subscription = | 444 StreamSubscription subscription = |
| 435 _createSubscription(onData, onError, onDone, cancelOnError); | 445 _createSubscription(onData, onError, onDone, cancelOnError); |
| 436 _onListen(subscription); | 446 _onListen(subscription); |
| 437 return subscription; | 447 return subscription; |
| (...skipping 21 matching lines...) Expand all Loading... |
| 459 final _EventGenerator _pending; | 469 final _EventGenerator _pending; |
| 460 /** | 470 /** |
| 461 * Initializes the stream to have only the events provided by a | 471 * Initializes the stream to have only the events provided by a |
| 462 * [_PendingEvents]. | 472 * [_PendingEvents]. |
| 463 * | 473 * |
| 464 * A new [_PendingEvents] must be generated for each listen. | 474 * A new [_PendingEvents] must be generated for each listen. |
| 465 */ | 475 */ |
| 466 _GeneratedStreamImpl(this._pending); | 476 _GeneratedStreamImpl(this._pending); |
| 467 | 477 |
| 468 StreamSubscription _createSubscription(void onData(T data), | 478 StreamSubscription _createSubscription(void onData(T data), |
| 469 void onError(Object error), | 479 Function onError, |
| 470 void onDone(), | 480 void onDone(), |
| 471 bool cancelOnError) { | 481 bool cancelOnError) { |
| 472 _BufferingStreamSubscription<T> subscription = | 482 _BufferingStreamSubscription<T> subscription = |
| 473 new _BufferingStreamSubscription( | 483 new _BufferingStreamSubscription( |
| 474 onData, onError, onDone, cancelOnError); | 484 onData, onError, onDone, cancelOnError); |
| 475 subscription._setPendingEvents(_pending()); | 485 subscription._setPendingEvents(_pending()); |
| 476 return subscription; | 486 return subscription; |
| 477 } | 487 } |
| 478 } | 488 } |
| 479 | 489 |
| (...skipping 15 matching lines...) Expand all Loading... |
| 495 // Send one event per call to moveNext. | 505 // Send one event per call to moveNext. |
| 496 // If moveNext returns true, send the current element as data. | 506 // If moveNext returns true, send the current element as data. |
| 497 // If moveNext returns false, send a done event and clear the _iterator. | 507 // If moveNext returns false, send a done event and clear the _iterator. |
| 498 // If moveNext throws an error, send an error and clear the _iterator. | 508 // If moveNext throws an error, send an error and clear the _iterator. |
| 499 // After an error, no further events will be sent. | 509 // After an error, no further events will be sent. |
| 500 bool isDone; | 510 bool isDone; |
| 501 try { | 511 try { |
| 502 isDone = !_iterator.moveNext(); | 512 isDone = !_iterator.moveNext(); |
| 503 } catch (e, s) { | 513 } catch (e, s) { |
| 504 _iterator = null; | 514 _iterator = null; |
| 505 dispatch._sendError(_asyncError(e, s)); | 515 dispatch._sendError(_asyncError(e, s), s); |
| 506 return; | 516 return; |
| 507 } | 517 } |
| 508 if (!isDone) { | 518 if (!isDone) { |
| 509 dispatch._sendData(_iterator.current); | 519 dispatch._sendData(_iterator.current); |
| 510 } else { | 520 } else { |
| 511 _iterator = null; | 521 _iterator = null; |
| 512 dispatch._sendDone(); | 522 dispatch._sendDone(); |
| 513 } | 523 } |
| 514 } | 524 } |
| 515 | 525 |
| 516 void clear() { | 526 void clear() { |
| 517 if (isScheduled) cancelSchedule(); | 527 if (isScheduled) cancelSchedule(); |
| 518 _iterator = null; | 528 _iterator = null; |
| 519 } | 529 } |
| 520 } | 530 } |
| 521 | 531 |
| 522 | 532 |
| 523 // Internal helpers. | 533 // Internal helpers. |
| 524 | 534 |
| 525 // Types of the different handlers on a stream. Types used to type fields. | 535 // Types of the different handlers on a stream. Types used to type fields. |
| 526 typedef void _DataHandler<T>(T value); | 536 typedef void _DataHandler<T>(T value); |
| 527 typedef void _ErrorHandler(error); | |
| 528 typedef void _DoneHandler(); | 537 typedef void _DoneHandler(); |
| 529 | 538 |
| 530 | 539 |
| 531 /** Default data handler, does nothing. */ | 540 /** Default data handler, does nothing. */ |
| 532 void _nullDataHandler(var value) {} | 541 void _nullDataHandler(var value) {} |
| 533 | 542 |
| 534 /** Default error handler, reports the error to the current zone's handler. */ | 543 /** Default error handler, reports the error to the current zone's handler. */ |
| 535 void _nullErrorHandler(error) { | 544 void _nullErrorHandler(error, [StackTrace stackTrace]) { |
| 536 Zone.current.handleUncaughtError(error, getAttachedStackTrace(error)); | 545 Zone.current.handleUncaughtError(error, stackTrace); |
| 537 } | 546 } |
| 538 | 547 |
| 539 /** Default done handler, does nothing. */ | 548 /** Default done handler, does nothing. */ |
| 540 void _nullDoneHandler() {} | 549 void _nullDoneHandler() {} |
| 541 | 550 |
| 542 | 551 |
| 543 /** A delayed event on a buffering stream subscription. */ | 552 /** A delayed event on a buffering stream subscription. */ |
| 544 abstract class _DelayedEvent { | 553 abstract class _DelayedEvent { |
| 545 /** Added as a linked list on the [StreamController]. */ | 554 /** Added as a linked list on the [StreamController]. */ |
| 546 _DelayedEvent next; | 555 _DelayedEvent next; |
| 547 /** Execute the delayed event on the [StreamController]. */ | 556 /** Execute the delayed event on the [StreamController]. */ |
| 548 void perform(_EventDispatch dispatch); | 557 void perform(_EventDispatch dispatch); |
| 549 } | 558 } |
| 550 | 559 |
| 551 /** A delayed data event. */ | 560 /** A delayed data event. */ |
| 552 class _DelayedData<T> extends _DelayedEvent { | 561 class _DelayedData<T> extends _DelayedEvent { |
| 553 final T value; | 562 final T value; |
| 554 _DelayedData(this.value); | 563 _DelayedData(this.value); |
| 555 void perform(_EventDispatch<T> dispatch) { | 564 void perform(_EventDispatch<T> dispatch) { |
| 556 dispatch._sendData(value); | 565 dispatch._sendData(value); |
| 557 } | 566 } |
| 558 } | 567 } |
| 559 | 568 |
| 560 /** A delayed error event. */ | 569 /** A delayed error event. */ |
| 561 class _DelayedError extends _DelayedEvent { | 570 class _DelayedError extends _DelayedEvent { |
| 562 final error; | 571 final error; |
| 563 _DelayedError(this.error); | 572 final StackTrace stackTrace; |
| 573 |
| 574 _DelayedError(this.error, this.stackTrace); |
| 564 void perform(_EventDispatch dispatch) { | 575 void perform(_EventDispatch dispatch) { |
| 565 dispatch._sendError(error); | 576 dispatch._sendError(error, stackTrace); |
| 566 } | 577 } |
| 567 } | 578 } |
| 568 | 579 |
| 569 /** A delayed done event. */ | 580 /** A delayed done event. */ |
| 570 class _DelayedDone implements _DelayedEvent { | 581 class _DelayedDone implements _DelayedEvent { |
| 571 const _DelayedDone(); | 582 const _DelayedDone(); |
| 572 void perform(_EventDispatch dispatch) { | 583 void perform(_EventDispatch dispatch) { |
| 573 dispatch._sendDone(); | 584 dispatch._sendDone(); |
| 574 } | 585 } |
| 575 | 586 |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 697 | 708 |
| 698 typedef void _broadcastCallback(StreamSubscription subscription); | 709 typedef void _broadcastCallback(StreamSubscription subscription); |
| 699 | 710 |
| 700 /** | 711 /** |
| 701 * Dummy subscription that will never receive any events. | 712 * Dummy subscription that will never receive any events. |
| 702 */ | 713 */ |
| 703 class _DummyStreamSubscription<T> implements StreamSubscription<T> { | 714 class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
| 704 int _pauseCounter = 0; | 715 int _pauseCounter = 0; |
| 705 | 716 |
| 706 void onData(void handleData(T data)) {} | 717 void onData(void handleData(T data)) {} |
| 707 void onError(void handleError(Object data)) {} | 718 void onError(Function handleError) {} |
| 708 void onDone(void handleDone()) {} | 719 void onDone(void handleDone()) {} |
| 709 | 720 |
| 710 void pause([Future resumeSignal]) { | 721 void pause([Future resumeSignal]) { |
| 711 _pauseCounter++; | 722 _pauseCounter++; |
| 712 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); | 723 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
| 713 } | 724 } |
| 714 void resume() { | 725 void resume() { |
| 715 if (_pauseCounter > 0) _pauseCounter--; | 726 if (_pauseCounter > 0) _pauseCounter--; |
| 716 } | 727 } |
| 717 void cancel() {} | 728 void cancel() {} |
| (...skipping 16 matching lines...) Expand all Loading... |
| 734 void onCancelHandler(StreamSubscription subscription)) | 745 void onCancelHandler(StreamSubscription subscription)) |
| 735 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), | 746 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), |
| 736 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), | 747 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), |
| 737 _zone = Zone.current { | 748 _zone = Zone.current { |
| 738 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | 749 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
| 739 } | 750 } |
| 740 | 751 |
| 741 bool get isBroadcast => true; | 752 bool get isBroadcast => true; |
| 742 | 753 |
| 743 StreamSubscription<T> listen(void onData(T data), | 754 StreamSubscription<T> listen(void onData(T data), |
| 744 { void onError(Object error), | 755 { Function onError, |
| 745 void onDone(), | 756 void onDone(), |
| 746 bool cancelOnError}) { | 757 bool cancelOnError}) { |
| 747 if (_controller == null) { | 758 if (_controller == null) { |
| 748 // Return a dummy subscription backed by nothing, since | 759 // Return a dummy subscription backed by nothing, since |
| 749 // it won't ever receive any events. | 760 // it won't ever receive any events. |
| 750 return new _DummyStreamSubscription<T>(); | 761 return new _DummyStreamSubscription<T>(); |
| 751 } | 762 } |
| 752 if (_subscription == null) { | 763 if (_subscription == null) { |
| 753 _subscription = _source.listen(_controller.add, | 764 _subscription = _source.listen(_controller.add, |
| 754 onError: _controller.addError, | 765 onError: _controller.addError, |
| (...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 969 _state = _STATE_FOUND; | 980 _state = _STATE_FOUND; |
| 970 hasNext._complete(true); | 981 hasNext._complete(true); |
| 971 return; | 982 return; |
| 972 } | 983 } |
| 973 _subscription.pause(); | 984 _subscription.pause(); |
| 974 assert(_futureOrPrefetch == null); | 985 assert(_futureOrPrefetch == null); |
| 975 _futureOrPrefetch = data; | 986 _futureOrPrefetch = data; |
| 976 _state = _STATE_EXTRA_DATA; | 987 _state = _STATE_EXTRA_DATA; |
| 977 } | 988 } |
| 978 | 989 |
| 979 void _onError(Object error) { | 990 void _onError(Object error, [StackTrace stackTrace]) { |
| 980 if (_state == _STATE_MOVING) { | 991 if (_state == _STATE_MOVING) { |
| 981 _Future<bool> hasNext = _futureOrPrefetch; | 992 _Future<bool> hasNext = _futureOrPrefetch; |
| 982 // We have cancelOnError: true, so the subscription is canceled. | 993 // We have cancelOnError: true, so the subscription is canceled. |
| 983 _clear(); | 994 _clear(); |
| 984 hasNext._completeError(error); | 995 hasNext._completeError(error, stackTrace); |
| 985 return; | 996 return; |
| 986 } | 997 } |
| 987 _subscription.pause(); | 998 _subscription.pause(); |
| 988 assert(_futureOrPrefetch == null); | 999 assert(_futureOrPrefetch == null); |
| 989 _futureOrPrefetch = error; | 1000 _futureOrPrefetch = error; |
| 990 _state = _STATE_EXTRA_ERROR; | 1001 _state = _STATE_EXTRA_ERROR; |
| 991 } | 1002 } |
| 992 | 1003 |
| 993 void _onDone() { | 1004 void _onDone() { |
| 994 if (_state == _STATE_MOVING) { | 1005 if (_state == _STATE_MOVING) { |
| 995 _Future<bool> hasNext = _futureOrPrefetch; | 1006 _Future<bool> hasNext = _futureOrPrefetch; |
| 996 _clear(); | 1007 _clear(); |
| 997 hasNext._complete(false); | 1008 hasNext._complete(false); |
| 998 return; | 1009 return; |
| 999 } | 1010 } |
| 1000 _subscription.pause(); | 1011 _subscription.pause(); |
| 1001 _futureOrPrefetch = null; | 1012 _futureOrPrefetch = null; |
| 1002 _state = _STATE_EXTRA_DONE; | 1013 _state = _STATE_EXTRA_DONE; |
| 1003 } | 1014 } |
| 1004 } | 1015 } |
| OLD | NEW |