| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
| (...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 99 | 99 |
| 100 // TODO(floitsch): reuse another field | 100 // TODO(floitsch): reuse another field |
| 101 /** The future [_onCancel] may return. */ | 101 /** The future [_onCancel] may return. */ |
| 102 Future _cancelFuture; | 102 Future _cancelFuture; |
| 103 | 103 |
| 104 /** | 104 /** |
| 105 * Queue of pending events. | 105 * Queue of pending events. |
| 106 * | 106 * |
| 107 * Is created when necessary, or set in constructor for preconfigured events. | 107 * Is created when necessary, or set in constructor for preconfigured events. |
| 108 */ | 108 */ |
| 109 _PendingEvents _pending; | 109 _PendingEvents<T> _pending; |
| 110 | 110 |
| 111 _BufferingStreamSubscription(void onData(T data), | 111 _BufferingStreamSubscription(void onData(T data), |
| 112 Function onError, | 112 Function onError, |
| 113 void onDone(), | 113 void onDone(), |
| 114 bool cancelOnError) | 114 bool cancelOnError) |
| 115 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 115 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
| 116 this.onData(onData); | 116 this.onData(onData); |
| 117 this.onError(onError); | 117 this.onError(onError); |
| 118 this.onDone(onDone); | 118 this.onDone(onDone); |
| 119 } | 119 } |
| 120 | 120 |
| 121 /** | 121 /** |
| 122 * Sets the subscription's pending events object. | 122 * Sets the subscription's pending events object. |
| 123 * | 123 * |
| 124 * This can only be done once. The pending events object is used for the | 124 * This can only be done once. The pending events object is used for the |
| 125 * rest of the subscription's life cycle. | 125 * rest of the subscription's life cycle. |
| 126 */ | 126 */ |
| 127 void _setPendingEvents(_PendingEvents pendingEvents) { | 127 void _setPendingEvents(_PendingEvents<T> pendingEvents) { |
| 128 assert(_pending == null); | 128 assert(_pending == null); |
| 129 if (pendingEvents == null) return; | 129 if (pendingEvents == null) return; |
| 130 _pending = pendingEvents; | 130 _pending = pendingEvents; |
| 131 if (!pendingEvents.isEmpty) { | 131 if (!pendingEvents.isEmpty) { |
| 132 _state |= _STATE_HAS_PENDING; | 132 _state |= _STATE_HAS_PENDING; |
| 133 _pending.schedule(this); | 133 _pending.schedule(this); |
| 134 } | 134 } |
| 135 } | 135 } |
| 136 | 136 |
| 137 /** | |
| 138 * Extracts the pending events from a canceled stream. | |
| 139 * | |
| 140 * This can only be done during the [_onCancel] method call. After that, | |
| 141 * any remaining pending events will be cleared. | |
| 142 */ | |
| 143 _PendingEvents _extractPending() { | |
| 144 assert(_isCanceled); | |
| 145 _PendingEvents events = _pending; | |
| 146 _pending = null; | |
| 147 return events; | |
| 148 } | |
| 149 | |
| 150 // StreamSubscription interface. | 137 // StreamSubscription interface. |
| 151 | 138 |
| 152 void onData(void handleData(T event)) { | 139 void onData(void handleData(T event)) { |
| 153 if (handleData == null) handleData = _nullDataHandler; | 140 if (handleData == null) handleData = _nullDataHandler; |
| 154 _onData = _zone.registerUnaryCallback(handleData); | 141 // TODO(floitsch): the return type should be 'void', and the type |
| 142 // should be inferred. |
| 143 _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData); |
| 155 } | 144 } |
| 156 | 145 |
| 157 void onError(Function handleError) { | 146 void onError(Function handleError) { |
| 158 if (handleError == null) handleError = _nullErrorHandler; | 147 if (handleError == null) handleError = _nullErrorHandler; |
| 159 _onError = _registerErrorHandler(handleError, _zone); | 148 _onError = _registerErrorHandler/*<T>*/(handleError, _zone); |
| 160 } | 149 } |
| 161 | 150 |
| 162 void onDone(void handleDone()) { | 151 void onDone(void handleDone()) { |
| 163 if (handleDone == null) handleDone = _nullDoneHandler; | 152 if (handleDone == null) handleDone = _nullDoneHandler; |
| 164 _onDone = _zone.registerCallback(handleDone); | 153 _onDone = _zone.registerCallback(handleDone); |
| 165 } | 154 } |
| 166 | 155 |
| 167 void pause([Future resumeSignal]) { | 156 void pause([Future resumeSignal]) { |
| 168 if (_isCanceled) return; | 157 if (_isCanceled) return; |
| 169 bool wasPaused = _isPaused; | 158 bool wasPaused = _isPaused; |
| (...skipping 25 matching lines...) Expand all Loading... |
| 195 Future cancel() { | 184 Future cancel() { |
| 196 // The user doesn't want to receive any further events. If there is an | 185 // The user doesn't want to receive any further events. If there is an |
| 197 // error or done event pending (waiting for the cancel to be done) discard | 186 // error or done event pending (waiting for the cancel to be done) discard |
| 198 // that event. | 187 // that event. |
| 199 _state &= ~_STATE_WAIT_FOR_CANCEL; | 188 _state &= ~_STATE_WAIT_FOR_CANCEL; |
| 200 if (_isCanceled) return _cancelFuture; | 189 if (_isCanceled) return _cancelFuture; |
| 201 _cancel(); | 190 _cancel(); |
| 202 return _cancelFuture; | 191 return _cancelFuture; |
| 203 } | 192 } |
| 204 | 193 |
| 205 Future asFuture([var futureValue]) { | 194 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
| 206 _Future<T> result = new _Future<T>(); | 195 _Future/*<E>*/ result = new _Future/*<E>*/(); |
| 207 | 196 |
| 208 // Overwrite the onDone and onError handlers. | 197 // Overwrite the onDone and onError handlers. |
| 209 _onDone = () { result._complete(futureValue); }; | 198 _onDone = () { result._complete(futureValue); }; |
| 210 _onError = (error, stackTrace) { | 199 _onError = (error, stackTrace) { |
| 211 cancel(); | 200 cancel(); |
| 212 result._completeError(error, stackTrace); | 201 result._completeError(error, stackTrace); |
| 213 }; | 202 }; |
| 214 | 203 |
| 215 return result; | 204 return result; |
| 216 } | 205 } |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 262 } | 251 } |
| 263 | 252 |
| 264 // _EventSink interface. | 253 // _EventSink interface. |
| 265 | 254 |
| 266 void _add(T data) { | 255 void _add(T data) { |
| 267 assert(!_isClosed); | 256 assert(!_isClosed); |
| 268 if (_isCanceled) return; | 257 if (_isCanceled) return; |
| 269 if (_canFire) { | 258 if (_canFire) { |
| 270 _sendData(data); | 259 _sendData(data); |
| 271 } else { | 260 } else { |
| 272 _addPending(new _DelayedData(data)); | 261 _addPending(new _DelayedData<dynamic /*=T*/>(data)); |
| 273 } | 262 } |
| 274 } | 263 } |
| 275 | 264 |
| 276 void _addError(Object error, StackTrace stackTrace) { | 265 void _addError(Object error, StackTrace stackTrace) { |
| 277 if (_isCanceled) return; | 266 if (_isCanceled) return; |
| 278 if (_canFire) { | 267 if (_canFire) { |
| 279 _sendError(error, stackTrace); // Reports cancel after sending. | 268 _sendError(error, stackTrace); // Reports cancel after sending. |
| 280 } else { | 269 } else { |
| 281 _addPending(new _DelayedError(error, stackTrace)); | 270 _addPending(new _DelayedError(error, stackTrace)); |
| 282 } | 271 } |
| (...skipping 29 matching lines...) Expand all Loading... |
| 312 | 301 |
| 313 // Handle pending events. | 302 // Handle pending events. |
| 314 | 303 |
| 315 /** | 304 /** |
| 316 * Add a pending event. | 305 * Add a pending event. |
| 317 * | 306 * |
| 318 * If the subscription is not paused, this also schedules a firing | 307 * If the subscription is not paused, this also schedules a firing |
| 319 * of pending events later (if necessary). | 308 * of pending events later (if necessary). |
| 320 */ | 309 */ |
| 321 void _addPending(_DelayedEvent event) { | 310 void _addPending(_DelayedEvent event) { |
| 322 _StreamImplEvents pending = _pending; | 311 _StreamImplEvents<T> pending = _pending; |
| 323 if (_pending == null) pending = _pending = new _StreamImplEvents(); | 312 if (_pending == null) { |
| 313 pending = _pending = new _StreamImplEvents<dynamic /*=T*/>(); |
| 314 } |
| 324 pending.add(event); | 315 pending.add(event); |
| 325 if (!_hasPending) { | 316 if (!_hasPending) { |
| 326 _state |= _STATE_HAS_PENDING; | 317 _state |= _STATE_HAS_PENDING; |
| 327 if (!_isPaused) { | 318 if (!_isPaused) { |
| 328 _pending.schedule(this); | 319 _pending.schedule(this); |
| 329 } | 320 } |
| 330 } | 321 } |
| 331 } | 322 } |
| 332 | 323 |
| 333 /* _EventDispatch interface. */ | 324 /* _EventDispatch interface. */ |
| 334 | 325 |
| 335 void _sendData(T data) { | 326 void _sendData(T data) { |
| 336 assert(!_isCanceled); | 327 assert(!_isCanceled); |
| 337 assert(!_isPaused); | 328 assert(!_isPaused); |
| 338 assert(!_inCallback); | 329 assert(!_inCallback); |
| 339 bool wasInputPaused = _isInputPaused; | 330 bool wasInputPaused = _isInputPaused; |
| 340 _state |= _STATE_IN_CALLBACK; | 331 _state |= _STATE_IN_CALLBACK; |
| 341 _zone.runUnaryGuarded(_onData, data); | 332 _zone.runUnaryGuarded(_onData, data); |
| 342 _state &= ~_STATE_IN_CALLBACK; | 333 _state &= ~_STATE_IN_CALLBACK; |
| 343 _checkState(wasInputPaused); | 334 _checkState(wasInputPaused); |
| 344 } | 335 } |
| 345 | 336 |
| 346 void _sendError(Object error, StackTrace stackTrace) { | 337 void _sendError(var error, StackTrace stackTrace) { |
| 347 assert(!_isCanceled); | 338 assert(!_isCanceled); |
| 348 assert(!_isPaused); | 339 assert(!_isPaused); |
| 349 assert(!_inCallback); | 340 assert(!_inCallback); |
| 350 bool wasInputPaused = _isInputPaused; | 341 bool wasInputPaused = _isInputPaused; |
| 351 | 342 |
| 352 void sendError() { | 343 void sendError() { |
| 353 // If the subscription has been canceled while waiting for the cancel | 344 // If the subscription has been canceled while waiting for the cancel |
| 354 // future to finish we must not report the error. | 345 // future to finish we must not report the error. |
| 355 if (_isCanceled && !_waitsForCancel) return; | 346 if (_isCanceled && !_waitsForCancel) return; |
| 356 _state |= _STATE_IN_CALLBACK; | 347 _state |= _STATE_IN_CALLBACK; |
| 357 if (_onError is ZoneBinaryCallback) { | 348 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { |
| 358 _zone.runBinaryGuarded(_onError, error, stackTrace); | 349 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError |
| 350 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; |
| 351 _zone.runBinaryGuarded(errorCallback, error, stackTrace); |
| 359 } else { | 352 } else { |
| 360 _zone.runUnaryGuarded(_onError, error); | 353 _zone.runUnaryGuarded/*<dynamic, dynamic>*/( |
| 354 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); |
| 361 } | 355 } |
| 362 _state &= ~_STATE_IN_CALLBACK; | 356 _state &= ~_STATE_IN_CALLBACK; |
| 363 } | 357 } |
| 364 | 358 |
| 365 if (_cancelOnError) { | 359 if (_cancelOnError) { |
| 366 _state |= _STATE_WAIT_FOR_CANCEL; | 360 _state |= _STATE_WAIT_FOR_CANCEL; |
| 367 _cancel(); | 361 _cancel(); |
| 368 if (_cancelFuture is Future) { | 362 if (_cancelFuture is Future) { |
| 369 _cancelFuture.whenComplete(sendError); | 363 _cancelFuture.whenComplete(sendError); |
| 370 } else { | 364 } else { |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 463 // ------------------------------------------------------------------- | 457 // ------------------------------------------------------------------- |
| 464 abstract class _StreamImpl<T> extends Stream<T> { | 458 abstract class _StreamImpl<T> extends Stream<T> { |
| 465 // ------------------------------------------------------------------ | 459 // ------------------------------------------------------------------ |
| 466 // Stream interface. | 460 // Stream interface. |
| 467 | 461 |
| 468 StreamSubscription<T> listen(void onData(T data), | 462 StreamSubscription<T> listen(void onData(T data), |
| 469 { Function onError, | 463 { Function onError, |
| 470 void onDone(), | 464 void onDone(), |
| 471 bool cancelOnError }) { | 465 bool cancelOnError }) { |
| 472 cancelOnError = identical(true, cancelOnError); | 466 cancelOnError = identical(true, cancelOnError); |
| 473 StreamSubscription subscription = | 467 StreamSubscription<T> subscription = |
| 474 _createSubscription(onData, onError, onDone, cancelOnError); | 468 _createSubscription(onData, onError, onDone, cancelOnError); |
| 475 _onListen(subscription); | 469 _onListen(subscription); |
| 476 return subscription; | 470 return subscription; |
| 477 } | 471 } |
| 478 | 472 |
| 479 // ------------------------------------------------------------------- | 473 // ------------------------------------------------------------------- |
| 480 /** Create a subscription object. Called by [subcribe]. */ | 474 /** Create a subscription object. Called by [subcribe]. */ |
| 481 StreamSubscription<T> _createSubscription( | 475 StreamSubscription<T> _createSubscription( |
| 482 void onData(T data), | 476 void onData(T data), |
| 483 Function onError, | 477 Function onError, |
| 484 void onDone(), | 478 void onDone(), |
| 485 bool cancelOnError) { | 479 bool cancelOnError) { |
| 486 return new _BufferingStreamSubscription<T>(onData, onError, onDone, | 480 return new _BufferingStreamSubscription<T>(onData, onError, onDone, |
| 487 cancelOnError); | 481 cancelOnError); |
| 488 } | 482 } |
| 489 | 483 |
| 490 /** Hook called when the subscription has been created. */ | 484 /** Hook called when the subscription has been created. */ |
| 491 void _onListen(StreamSubscription subscription) {} | 485 void _onListen(StreamSubscription subscription) {} |
| 492 } | 486 } |
| 493 | 487 |
| 494 typedef _PendingEvents _EventGenerator(); | 488 typedef _PendingEvents<T> _EventGenerator<T>(); |
| 495 | 489 |
| 496 /** Stream that generates its own events. */ | 490 /** Stream that generates its own events. */ |
| 497 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { | 491 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
| 498 final _EventGenerator _pending; | 492 final _EventGenerator<T> _pending; |
| 499 bool _isUsed = false; | 493 bool _isUsed = false; |
| 500 /** | 494 /** |
| 501 * Initializes the stream to have only the events provided by a | 495 * Initializes the stream to have only the events provided by a |
| 502 * [_PendingEvents]. | 496 * [_PendingEvents]. |
| 503 * | 497 * |
| 504 * A new [_PendingEvents] must be generated for each listen. | 498 * A new [_PendingEvents] must be generated for each listen. |
| 505 */ | 499 */ |
| 506 _GeneratedStreamImpl(this._pending); | 500 _GeneratedStreamImpl(this._pending); |
| 507 | 501 |
| 508 StreamSubscription<T> _createSubscription( | 502 StreamSubscription<T> _createSubscription( |
| 509 void onData(T data), | 503 void onData(T data), |
| 510 Function onError, | 504 Function onError, |
| 511 void onDone(), | 505 void onDone(), |
| 512 bool cancelOnError) { | 506 bool cancelOnError) { |
| 513 if (_isUsed) throw new StateError("Stream has already been listened to."); | 507 if (_isUsed) throw new StateError("Stream has already been listened to."); |
| 514 _isUsed = true; | 508 _isUsed = true; |
| 515 return new _BufferingStreamSubscription( | 509 return new _BufferingStreamSubscription<T>( |
| 516 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); | 510 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); |
| 517 } | 511 } |
| 518 } | 512 } |
| 519 | 513 |
| 520 | 514 |
| 521 /** Pending events object that gets its events from an [Iterable]. */ | 515 /** Pending events object that gets its events from an [Iterable]. */ |
| 522 class _IterablePendingEvents<T> extends _PendingEvents { | 516 class _IterablePendingEvents<T> extends _PendingEvents<T> { |
| 523 // The iterator providing data for data events. | 517 // The iterator providing data for data events. |
| 524 // Set to null when iteration has completed. | 518 // Set to null when iteration has completed. |
| 525 Iterator<T> _iterator; | 519 Iterator<T> _iterator; |
| 526 | 520 |
| 527 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | 521 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
| 528 | 522 |
| 529 bool get isEmpty => _iterator == null; | 523 bool get isEmpty => _iterator == null; |
| 530 | 524 |
| 531 void handleNext(_EventDispatch dispatch) { | 525 void handleNext(_EventDispatch<T> dispatch) { |
| 532 if (_iterator == null) { | 526 if (_iterator == null) { |
| 533 throw new StateError("No events pending."); | 527 throw new StateError("No events pending."); |
| 534 } | 528 } |
| 535 // Send one event per call to moveNext. | 529 // Send one event per call to moveNext. |
| 536 // If moveNext returns true, send the current element as data. | 530 // If moveNext returns true, send the current element as data. |
| 537 // If moveNext returns false, send a done event and clear the _iterator. | 531 // If moveNext returns false, send a done event and clear the _iterator. |
| 538 // If moveNext throws an error, send an error and clear the _iterator. | 532 // If moveNext throws an error, send an error and clear the _iterator. |
| 539 // After an error, no further events will be sent. | 533 // After an error, no further events will be sent. |
| 540 bool isDone; | 534 bool isDone; |
| 541 try { | 535 try { |
| (...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 615 } | 609 } |
| 616 | 610 |
| 617 _DelayedEvent get next => null; | 611 _DelayedEvent get next => null; |
| 618 | 612 |
| 619 void set next(_DelayedEvent _) { | 613 void set next(_DelayedEvent _) { |
| 620 throw new StateError("No events after a done."); | 614 throw new StateError("No events after a done."); |
| 621 } | 615 } |
| 622 } | 616 } |
| 623 | 617 |
| 624 /** Superclass for provider of pending events. */ | 618 /** Superclass for provider of pending events. */ |
| 625 abstract class _PendingEvents { | 619 abstract class _PendingEvents<T> { |
| 626 // No async event has been scheduled. | 620 // No async event has been scheduled. |
| 627 static const int _STATE_UNSCHEDULED = 0; | 621 static const int _STATE_UNSCHEDULED = 0; |
| 628 // An async event has been scheduled to run a function. | 622 // An async event has been scheduled to run a function. |
| 629 static const int _STATE_SCHEDULED = 1; | 623 static const int _STATE_SCHEDULED = 1; |
| 630 // An async event has been scheduled, but it will do nothing when it runs. | 624 // An async event has been scheduled, but it will do nothing when it runs. |
| 631 // Async events can't be preempted. | 625 // Async events can't be preempted. |
| 632 static const int _STATE_CANCELED = 3; | 626 static const int _STATE_CANCELED = 3; |
| 633 | 627 |
| 634 /** | 628 /** |
| 635 * State of being scheduled. | 629 * State of being scheduled. |
| (...skipping 13 matching lines...) Expand all Loading... |
| 649 | 643 |
| 650 bool get isScheduled => _state == _STATE_SCHEDULED; | 644 bool get isScheduled => _state == _STATE_SCHEDULED; |
| 651 bool get _eventScheduled => _state >= _STATE_SCHEDULED; | 645 bool get _eventScheduled => _state >= _STATE_SCHEDULED; |
| 652 | 646 |
| 653 /** | 647 /** |
| 654 * Schedule an event to run later. | 648 * Schedule an event to run later. |
| 655 * | 649 * |
| 656 * If called more than once, it should be called with the same dispatch as | 650 * If called more than once, it should be called with the same dispatch as |
| 657 * argument each time. It may reuse an earlier argument in some cases. | 651 * argument each time. It may reuse an earlier argument in some cases. |
| 658 */ | 652 */ |
| 659 void schedule(_EventDispatch dispatch) { | 653 void schedule(_EventDispatch<T> dispatch) { |
| 660 if (isScheduled) return; | 654 if (isScheduled) return; |
| 661 assert(!isEmpty); | 655 assert(!isEmpty); |
| 662 if (_eventScheduled) { | 656 if (_eventScheduled) { |
| 663 assert(_state == _STATE_CANCELED); | 657 assert(_state == _STATE_CANCELED); |
| 664 _state = _STATE_SCHEDULED; | 658 _state = _STATE_SCHEDULED; |
| 665 return; | 659 return; |
| 666 } | 660 } |
| 667 scheduleMicrotask(() { | 661 scheduleMicrotask(() { |
| 668 int oldState = _state; | 662 int oldState = _state; |
| 669 _state = _STATE_UNSCHEDULED; | 663 _state = _STATE_UNSCHEDULED; |
| 670 if (oldState == _STATE_CANCELED) return; | 664 if (oldState == _STATE_CANCELED) return; |
| 671 handleNext(dispatch); | 665 handleNext(dispatch); |
| 672 }); | 666 }); |
| 673 _state = _STATE_SCHEDULED; | 667 _state = _STATE_SCHEDULED; |
| 674 } | 668 } |
| 675 | 669 |
| 676 void cancelSchedule() { | 670 void cancelSchedule() { |
| 677 if (isScheduled) _state = _STATE_CANCELED; | 671 if (isScheduled) _state = _STATE_CANCELED; |
| 678 } | 672 } |
| 679 | 673 |
| 680 void handleNext(_EventDispatch dispatch); | 674 void handleNext(_EventDispatch<T> dispatch); |
| 681 | 675 |
| 682 /** Throw away any pending events and cancel scheduled events. */ | 676 /** Throw away any pending events and cancel scheduled events. */ |
| 683 void clear(); | 677 void clear(); |
| 684 } | 678 } |
| 685 | 679 |
| 686 | 680 |
| 687 /** Class holding pending events for a [_StreamImpl]. */ | 681 /** Class holding pending events for a [_StreamImpl]. */ |
| 688 class _StreamImplEvents extends _PendingEvents { | 682 class _StreamImplEvents<T> extends _PendingEvents<T> { |
| 689 /// Single linked list of [_DelayedEvent] objects. | 683 /// Single linked list of [_DelayedEvent] objects. |
| 690 _DelayedEvent firstPendingEvent = null; | 684 _DelayedEvent firstPendingEvent = null; |
| 691 /// Last element in the list of pending events. New events are added after it. | 685 /// Last element in the list of pending events. New events are added after it. |
| 692 _DelayedEvent lastPendingEvent = null; | 686 _DelayedEvent lastPendingEvent = null; |
| 693 | 687 |
| 694 bool get isEmpty => lastPendingEvent == null; | 688 bool get isEmpty => lastPendingEvent == null; |
| 695 | 689 |
| 696 void add(_DelayedEvent event) { | 690 void add(_DelayedEvent event) { |
| 697 if (lastPendingEvent == null) { | 691 if (lastPendingEvent == null) { |
| 698 firstPendingEvent = lastPendingEvent = event; | 692 firstPendingEvent = lastPendingEvent = event; |
| 699 } else { | 693 } else { |
| 700 lastPendingEvent = lastPendingEvent.next = event; | 694 lastPendingEvent = lastPendingEvent.next = event; |
| 701 } | 695 } |
| 702 } | 696 } |
| 703 | 697 |
| 704 void handleNext(_EventDispatch dispatch) { | 698 void handleNext(_EventDispatch<T> dispatch) { |
| 705 assert(!isScheduled); | 699 assert(!isScheduled); |
| 706 _DelayedEvent event = firstPendingEvent; | 700 _DelayedEvent event = firstPendingEvent; |
| 707 firstPendingEvent = event.next; | 701 firstPendingEvent = event.next; |
| 708 if (firstPendingEvent == null) { | 702 if (firstPendingEvent == null) { |
| 709 lastPendingEvent = null; | 703 lastPendingEvent = null; |
| 710 } | 704 } |
| 711 event.perform(dispatch); | 705 event.perform(dispatch); |
| 712 } | 706 } |
| 713 | 707 |
| 714 void clear() { | 708 void clear() { |
| (...skipping 14 matching lines...) Expand all Loading... |
| 729 | 723 |
| 730 void _insertBefore(_BroadcastLinkedList newNext) { | 724 void _insertBefore(_BroadcastLinkedList newNext) { |
| 731 _BroadcastLinkedList newPrevious = newNext._previous; | 725 _BroadcastLinkedList newPrevious = newNext._previous; |
| 732 newPrevious._next = this; | 726 newPrevious._next = this; |
| 733 newNext._previous = _previous; | 727 newNext._previous = _previous; |
| 734 _previous._next = newNext; | 728 _previous._next = newNext; |
| 735 _previous = newPrevious; | 729 _previous = newPrevious; |
| 736 } | 730 } |
| 737 } | 731 } |
| 738 | 732 |
| 739 typedef void _broadcastCallback(StreamSubscription subscription); | 733 typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); |
| 740 | 734 |
| 741 /** | 735 /** |
| 742 * Done subscription that will send one done event as soon as possible. | 736 * Done subscription that will send one done event as soon as possible. |
| 743 */ | 737 */ |
| 744 class _DoneStreamSubscription<T> implements StreamSubscription<T> { | 738 class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
| 745 static const int _DONE_SENT = 1; | 739 static const int _DONE_SENT = 1; |
| 746 static const int _SCHEDULED = 2; | 740 static const int _SCHEDULED = 2; |
| 747 static const int _PAUSED = 4; | 741 static const int _PAUSED = 4; |
| 748 | 742 |
| 749 final Zone _zone; | 743 final Zone _zone; |
| (...skipping 27 matching lines...) Expand all Loading... |
| 777 if (isPaused) { | 771 if (isPaused) { |
| 778 _state -= _PAUSED; | 772 _state -= _PAUSED; |
| 779 if (!isPaused && !_isSent) { | 773 if (!isPaused && !_isSent) { |
| 780 _schedule(); | 774 _schedule(); |
| 781 } | 775 } |
| 782 } | 776 } |
| 783 } | 777 } |
| 784 | 778 |
| 785 Future cancel() => null; | 779 Future cancel() => null; |
| 786 | 780 |
| 787 Future asFuture([futureValue]) { | 781 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
| 788 _Future result = new _Future(); | 782 _Future/*<E>*/ result = new _Future/*<E>*/(); |
| 789 _onDone = () { result._completeWithValue(null); }; | 783 _onDone = () { result._completeWithValue(null); }; |
| 790 return result; | 784 return result; |
| 791 } | 785 } |
| 792 | 786 |
| 793 void _sendDone() { | 787 void _sendDone() { |
| 794 _state &= ~_SCHEDULED; | 788 _state &= ~_SCHEDULED; |
| 795 if (isPaused) return; | 789 if (isPaused) return; |
| 796 _state |= _DONE_SENT; | 790 _state |= _DONE_SENT; |
| 797 if (_onDone != null) _zone.runGuarded(_onDone); | 791 if (_onDone != null) _zone.runGuarded(_onDone); |
| 798 } | 792 } |
| 799 } | 793 } |
| 800 | 794 |
| 801 class _AsBroadcastStream<T> extends Stream<T> { | 795 class _AsBroadcastStream<T> extends Stream<T> { |
| 802 final Stream<T> _source; | 796 final Stream<T> _source; |
| 803 final _broadcastCallback _onListenHandler; | 797 final _BroadcastCallback<T> _onListenHandler; |
| 804 final _broadcastCallback _onCancelHandler; | 798 final _BroadcastCallback<T> _onCancelHandler; |
| 805 final Zone _zone; | 799 final Zone _zone; |
| 806 | 800 |
| 807 _AsBroadcastStreamController<T> _controller; | 801 _AsBroadcastStreamController<T> _controller; |
| 808 StreamSubscription<T> _subscription; | 802 StreamSubscription<T> _subscription; |
| 809 | 803 |
| 810 _AsBroadcastStream(this._source, | 804 _AsBroadcastStream(this._source, |
| 811 void onListenHandler(StreamSubscription subscription), | 805 void onListenHandler(StreamSubscription<T> subscription), |
| 812 void onCancelHandler(StreamSubscription subscription)) | 806 void onCancelHandler(StreamSubscription<T> subscription)) |
| 813 : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), | 807 // TODO(floitsch): the return type should be void and should be |
| 814 _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), | 808 // inferred. |
| 809 : _onListenHandler = Zone.current.registerUnaryCallback |
| 810 /*<dynamic, StreamSubscription<T>>*/(onListenHandler), |
| 811 _onCancelHandler = Zone.current.registerUnaryCallback |
| 812 /*<dynamic, StreamSubscription<T>>*/(onCancelHandler), |
| 815 _zone = Zone.current { | 813 _zone = Zone.current { |
| 816 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | 814 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
| 817 } | 815 } |
| 818 | 816 |
| 819 bool get isBroadcast => true; | 817 bool get isBroadcast => true; |
| 820 | 818 |
| 821 StreamSubscription<T> listen(void onData(T data), | 819 StreamSubscription<T> listen(void onData(T data), |
| 822 { Function onError, | 820 { Function onError, |
| 823 void onDone(), | 821 void onDone(), |
| 824 bool cancelOnError}) { | 822 bool cancelOnError}) { |
| 825 if (_controller == null || _controller.isClosed) { | 823 if (_controller == null || _controller.isClosed) { |
| 826 // Return a dummy subscription backed by nothing, since | 824 // Return a dummy subscription backed by nothing, since |
| 827 // it will only ever send one done event. | 825 // it will only ever send one done event. |
| 828 return new _DoneStreamSubscription<T>(onDone); | 826 return new _DoneStreamSubscription<T>(onDone); |
| 829 } | 827 } |
| 830 if (_subscription == null) { | 828 if (_subscription == null) { |
| 831 _subscription = _source.listen(_controller.add, | 829 _subscription = _source.listen(_controller.add, |
| 832 onError: _controller.addError, | 830 onError: _controller.addError, |
| 833 onDone: _controller.close); | 831 onDone: _controller.close); |
| 834 } | 832 } |
| 835 cancelOnError = identical(true, cancelOnError); | 833 cancelOnError = identical(true, cancelOnError); |
| 836 return _controller._subscribe(onData, onError, onDone, cancelOnError); | 834 return _controller._subscribe(onData, onError, onDone, cancelOnError); |
| 837 } | 835 } |
| 838 | 836 |
| 839 void _onCancel() { | 837 void _onCancel() { |
| 840 bool shutdown = (_controller == null) || _controller.isClosed; | 838 bool shutdown = (_controller == null) || _controller.isClosed; |
| 841 if (_onCancelHandler != null) { | 839 if (_onCancelHandler != null) { |
| 842 _zone.runUnary(_onCancelHandler, new _BroadcastSubscriptionWrapper(this)); | 840 _zone.runUnary( |
| 841 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
| 843 } | 842 } |
| 844 if (shutdown) { | 843 if (shutdown) { |
| 845 if (_subscription != null) { | 844 if (_subscription != null) { |
| 846 _subscription.cancel(); | 845 _subscription.cancel(); |
| 847 _subscription = null; | 846 _subscription = null; |
| 848 } | 847 } |
| 849 } | 848 } |
| 850 } | 849 } |
| 851 | 850 |
| 852 void _onListen() { | 851 void _onListen() { |
| 853 if (_onListenHandler != null) { | 852 if (_onListenHandler != null) { |
| 854 _zone.runUnary(_onListenHandler, new _BroadcastSubscriptionWrapper(this)); | 853 _zone.runUnary( |
| 854 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
| 855 } | 855 } |
| 856 } | 856 } |
| 857 | 857 |
| 858 // Methods called from _BroadcastSubscriptionWrapper. | 858 // Methods called from _BroadcastSubscriptionWrapper. |
| 859 void _cancelSubscription() { | 859 void _cancelSubscription() { |
| 860 if (_subscription == null) return; | 860 if (_subscription == null) return; |
| 861 // Called by [_controller] when it has no subscribers left. | 861 // Called by [_controller] when it has no subscribers left. |
| 862 StreamSubscription subscription = _subscription; | 862 StreamSubscription subscription = _subscription; |
| 863 _subscription = null; | 863 _subscription = null; |
| 864 _controller = null; // Marks the stream as no longer listenable. | 864 _controller = null; // Marks the stream as no longer listenable. |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 914 | 914 |
| 915 Future cancel() { | 915 Future cancel() { |
| 916 _stream._cancelSubscription(); | 916 _stream._cancelSubscription(); |
| 917 return null; | 917 return null; |
| 918 } | 918 } |
| 919 | 919 |
| 920 bool get isPaused { | 920 bool get isPaused { |
| 921 return _stream._isSubscriptionPaused; | 921 return _stream._isSubscriptionPaused; |
| 922 } | 922 } |
| 923 | 923 |
| 924 Future asFuture([var futureValue]) { | 924 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
| 925 throw new UnsupportedError( | 925 throw new UnsupportedError( |
| 926 "Cannot change handlers of asBroadcastStream source subscription."); | 926 "Cannot change handlers of asBroadcastStream source subscription."); |
| 927 } | 927 } |
| 928 } | 928 } |
| 929 | 929 |
| 930 | 930 |
| 931 /** | 931 /** |
| 932 * Simple implementation of [StreamIterator]. | 932 * Simple implementation of [StreamIterator]. |
| 933 */ | 933 */ |
| 934 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 934 class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| (...skipping 30 matching lines...) Expand all Loading... |
| 965 /// The current element represented by the most recent call to moveNext. | 965 /// The current element represented by the most recent call to moveNext. |
| 966 /// | 966 /// |
| 967 /// Is null between the time moveNext is called and its future completes. | 967 /// Is null between the time moveNext is called and its future completes. |
| 968 T _current = null; | 968 T _current = null; |
| 969 | 969 |
| 970 /// The future returned by the most recent call to [moveNext]. | 970 /// The future returned by the most recent call to [moveNext]. |
| 971 /// | 971 /// |
| 972 /// Also used to store the next value/error in case the stream provides an | 972 /// Also used to store the next value/error in case the stream provides an |
| 973 /// event before [moveNext] is called again. In that case, the stream will | 973 /// event before [moveNext] is called again. In that case, the stream will |
| 974 /// be paused to prevent further events. | 974 /// be paused to prevent further events. |
| 975 var _futureOrPrefetch = null; | 975 var/*Future<bool> or T*/ _futureOrPrefetch = null; |
| 976 | 976 |
| 977 /// The current state. | 977 /// The current state. |
| 978 int _state = _STATE_FOUND; | 978 int _state = _STATE_FOUND; |
| 979 | 979 |
| 980 _StreamIteratorImpl(final Stream<T> stream) { | 980 _StreamIteratorImpl(final Stream<T> stream) { |
| 981 _subscription = stream.listen(_onData, | 981 _subscription = stream.listen(_onData, |
| 982 onError: _onError, | 982 onError: _onError, |
| 983 onDone: _onDone, | 983 onDone: _onDone, |
| 984 cancelOnError: true); | 984 cancelOnError: true); |
| 985 } | 985 } |
| 986 | 986 |
| 987 T get current => _current; | 987 T get current => _current; |
| 988 | 988 |
| 989 Future<bool> moveNext() { | 989 Future<bool> moveNext() { |
| 990 if (_state == _STATE_DONE) { | 990 if (_state == _STATE_DONE) { |
| 991 return new _Future<bool>.immediate(false); | 991 return new _Future<bool>.immediate(false); |
| 992 } | 992 } |
| 993 if (_state == _STATE_MOVING) { | 993 if (_state == _STATE_MOVING) { |
| 994 throw new StateError("Already waiting for next."); | 994 throw new StateError("Already waiting for next."); |
| 995 } | 995 } |
| 996 if (_state == _STATE_FOUND) { | 996 if (_state == _STATE_FOUND) { |
| 997 _state = _STATE_MOVING; | 997 _state = _STATE_MOVING; |
| 998 _current = null; | 998 _current = null; |
| 999 _futureOrPrefetch = new _Future<bool>(); | 999 var result = new _Future<bool>(); |
| 1000 return _futureOrPrefetch; | 1000 _futureOrPrefetch = result; |
| 1001 return result; |
| 1001 } else { | 1002 } else { |
| 1002 assert(_state >= _STATE_EXTRA_DATA); | 1003 assert(_state >= _STATE_EXTRA_DATA); |
| 1003 switch (_state) { | 1004 switch (_state) { |
| 1004 case _STATE_EXTRA_DATA: | 1005 case _STATE_EXTRA_DATA: |
| 1005 _state = _STATE_FOUND; | 1006 _state = _STATE_FOUND; |
| 1006 _current = _futureOrPrefetch; | 1007 _current = _futureOrPrefetch as Object /*=T*/; |
| 1007 _futureOrPrefetch = null; | 1008 _futureOrPrefetch = null; |
| 1008 _subscription.resume(); | 1009 _subscription.resume(); |
| 1009 return new _Future<bool>.immediate(true); | 1010 return new _Future<bool>.immediate(true); |
| 1010 case _STATE_EXTRA_ERROR: | 1011 case _STATE_EXTRA_ERROR: |
| 1011 AsyncError prefetch = _futureOrPrefetch; | 1012 AsyncError prefetch = _futureOrPrefetch; |
| 1012 _clear(); | 1013 _clear(); |
| 1013 return new _Future<bool>.immediateError(prefetch.error, | 1014 return new _Future<bool>.immediateError(prefetch.error, |
| 1014 prefetch.stackTrace); | 1015 prefetch.stackTrace); |
| 1015 case _STATE_EXTRA_DONE: | 1016 case _STATE_EXTRA_DONE: |
| 1016 _clear(); | 1017 _clear(); |
| 1017 return new _Future<bool>.immediate(false); | 1018 return new _Future<bool>.immediate(false); |
| 1018 } | 1019 } |
| 1019 } | 1020 } |
| 1020 } | 1021 } |
| 1021 | 1022 |
| 1022 /** Clears up the internal state when the iterator ends. */ | 1023 /** Clears up the internal state when the iterator ends. */ |
| 1023 void _clear() { | 1024 void _clear() { |
| 1024 _subscription = null; | 1025 _subscription = null; |
| 1025 _futureOrPrefetch = null; | 1026 _futureOrPrefetch = null; |
| 1026 _current = null; | 1027 _current = null; |
| 1027 _state = _STATE_DONE; | 1028 _state = _STATE_DONE; |
| 1028 } | 1029 } |
| 1029 | 1030 |
| 1030 Future cancel() { | 1031 Future cancel() { |
| 1031 StreamSubscription subscription = _subscription; | 1032 StreamSubscription subscription = _subscription; |
| 1032 // Cherry pick of: https://codereview.chromium.org//896793002 | |
| 1033 if (subscription == null) return null; | 1033 if (subscription == null) return null; |
| 1034 if (_state == _STATE_MOVING) { | 1034 if (_state == _STATE_MOVING) { |
| 1035 _Future<bool> hasNext = _futureOrPrefetch; | 1035 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
| 1036 _clear(); | 1036 _clear(); |
| 1037 hasNext._complete(false); | 1037 hasNext._complete(false); |
| 1038 } else { | 1038 } else { |
| 1039 _clear(); | 1039 _clear(); |
| 1040 } | 1040 } |
| 1041 return subscription.cancel(); | 1041 return subscription.cancel(); |
| 1042 } | 1042 } |
| 1043 | 1043 |
| 1044 void _onData(T data) { | 1044 void _onData(T data) { |
| 1045 if (_state == _STATE_MOVING) { | 1045 if (_state == _STATE_MOVING) { |
| 1046 _current = data; | 1046 _current = data; |
| 1047 _Future<bool> hasNext = _futureOrPrefetch; | 1047 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
| 1048 _futureOrPrefetch = null; | 1048 _futureOrPrefetch = null; |
| 1049 _state = _STATE_FOUND; | 1049 _state = _STATE_FOUND; |
| 1050 hasNext._complete(true); | 1050 hasNext._complete(true); |
| 1051 return; | 1051 return; |
| 1052 } | 1052 } |
| 1053 _subscription.pause(); | 1053 _subscription.pause(); |
| 1054 assert(_futureOrPrefetch == null); | 1054 assert(_futureOrPrefetch == null); |
| 1055 _futureOrPrefetch = data; | 1055 _futureOrPrefetch = data; |
| 1056 _state = _STATE_EXTRA_DATA; | 1056 _state = _STATE_EXTRA_DATA; |
| 1057 } | 1057 } |
| 1058 | 1058 |
| 1059 void _onError(Object error, [StackTrace stackTrace]) { | 1059 void _onError(Object error, [StackTrace stackTrace]) { |
| 1060 if (_state == _STATE_MOVING) { | 1060 if (_state == _STATE_MOVING) { |
| 1061 _Future<bool> hasNext = _futureOrPrefetch; | 1061 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
| 1062 // We have cancelOnError: true, so the subscription is canceled. | 1062 // We have cancelOnError: true, so the subscription is canceled. |
| 1063 _clear(); | 1063 _clear(); |
| 1064 hasNext._completeError(error, stackTrace); | 1064 hasNext._completeError(error, stackTrace); |
| 1065 return; | 1065 return; |
| 1066 } | 1066 } |
| 1067 _subscription.pause(); | 1067 _subscription.pause(); |
| 1068 assert(_futureOrPrefetch == null); | 1068 assert(_futureOrPrefetch == null); |
| 1069 _futureOrPrefetch = new AsyncError(error, stackTrace); | 1069 _futureOrPrefetch = new AsyncError(error, stackTrace); |
| 1070 _state = _STATE_EXTRA_ERROR; | 1070 _state = _STATE_EXTRA_ERROR; |
| 1071 } | 1071 } |
| 1072 | 1072 |
| 1073 void _onDone() { | 1073 void _onDone() { |
| 1074 if (_state == _STATE_MOVING) { | 1074 if (_state == _STATE_MOVING) { |
| 1075 _Future<bool> hasNext = _futureOrPrefetch; | 1075 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
| 1076 _clear(); | 1076 _clear(); |
| 1077 hasNext._complete(false); | 1077 hasNext._complete(false); |
| 1078 return; | 1078 return; |
| 1079 } | 1079 } |
| 1080 _subscription.pause(); | 1080 _subscription.pause(); |
| 1081 _futureOrPrefetch = null; | 1081 _futureOrPrefetch = null; |
| 1082 _state = _STATE_EXTRA_DONE; | 1082 _state = _STATE_EXTRA_DONE; |
| 1083 } | 1083 } |
| 1084 } | 1084 } |
| 1085 |
| 1086 /** An empty broadcast stream, sending a done event as soon as possible. */ |
| 1087 class _EmptyStream<T> extends Stream<T> { |
| 1088 const _EmptyStream() : super._internal(); |
| 1089 bool get isBroadcast => true; |
| 1090 StreamSubscription<T> listen(void onData(T data), |
| 1091 {Function onError, |
| 1092 void onDone(), |
| 1093 bool cancelOnError}) { |
| 1094 return new _DoneStreamSubscription<T>(onDone); |
| 1095 } |
| 1096 } |
| OLD | NEW |