| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 84 * | 84 * |
| 85 * The [onListen] callback is called when the stream | 85 * The [onListen] callback is called when the stream |
| 86 * receives its listener and [onCancel] when the listener ends | 86 * receives its listener and [onCancel] when the listener ends |
| 87 * its subscription. If [onCancel] needs to perform an asynchronous operation, | 87 * its subscription. If [onCancel] needs to perform an asynchronous operation, |
| 88 * [onCancel] should return a future that completes when the cancel operation | 88 * [onCancel] should return a future that completes when the cancel operation |
| 89 * is done. | 89 * is done. |
| 90 * | 90 * |
| 91 * If the stream is canceled before the controller needs new data the | 91 * If the stream is canceled before the controller needs new data the |
| 92 * [onResume] call might not be executed. | 92 * [onResume] call might not be executed. |
| 93 */ | 93 */ |
| 94 factory StreamController({void onListen(), | 94 factory StreamController( |
| 95 void onPause(), | 95 {void onListen(), |
| 96 void onResume(), | 96 void onPause(), |
| 97 onCancel(), | 97 void onResume(), |
| 98 bool sync: false}) { | 98 onCancel(), |
| 99 bool sync: false}) { |
| 99 return sync | 100 return sync |
| 100 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 101 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| 101 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 102 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
| 102 } | 103 } |
| 103 | 104 |
| 104 /** | 105 /** |
| 105 * A controller where [stream] can be listened to more than once. | 106 * A controller where [stream] can be listened to more than once. |
| 106 * | 107 * |
| 107 * The [Stream] returned by [stream] is a broadcast stream. | 108 * The [Stream] returned by [stream] is a broadcast stream. |
| 108 * It can be listened to more than once. | 109 * It can be listened to more than once. |
| 109 * | 110 * |
| 110 * A Stream should be inert until a subscriber starts listening on it (using | 111 * A Stream should be inert until a subscriber starts listening on it (using |
| 111 * the [onListen] callback to start producing events). Streams should not | 112 * the [onListen] callback to start producing events). Streams should not |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 145 * A listener must be subscribed both when the event is initiated | 146 * A listener must be subscribed both when the event is initiated |
| 146 * (that is, when [add] is called) | 147 * (that is, when [add] is called) |
| 147 * and when the event is later delivered, | 148 * and when the event is later delivered, |
| 148 * in order to receive the event. | 149 * in order to receive the event. |
| 149 * | 150 * |
| 150 * The [onListen] callback is called when the first listener is subscribed, | 151 * The [onListen] callback is called when the first listener is subscribed, |
| 151 * and the [onCancel] is called when there are no longer any active listeners. | 152 * and the [onCancel] is called when there are no longer any active listeners. |
| 152 * If a listener is added again later, after the [onCancel] was called, | 153 * If a listener is added again later, after the [onCancel] was called, |
| 153 * the [onListen] will be called again. | 154 * the [onListen] will be called again. |
| 154 */ | 155 */ |
| 155 factory StreamController.broadcast({void onListen(), | 156 factory StreamController.broadcast( |
| 156 void onCancel(), | 157 {void onListen(), void onCancel(), bool sync: false}) { |
| 157 bool sync: false}) { | |
| 158 return sync | 158 return sync |
| 159 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 159 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
| 160 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | 160 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
| 161 } | 161 } |
| 162 | 162 |
| 163 /** | 163 /** |
| 164 * The callback which is called when the stream is listened to. | 164 * The callback which is called when the stream is listened to. |
| 165 * | 165 * |
| 166 * May be set to `null`, in which case no callback will happen. | 166 * May be set to `null`, in which case no callback will happen. |
| 167 */ | 167 */ |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 255 * returned future. | 255 * returned future. |
| 256 * | 256 * |
| 257 * If [cancelOnError] is true, only the first error on [source] is | 257 * If [cancelOnError] is true, only the first error on [source] is |
| 258 * forwarded to the controller's stream, and the `addStream` ends | 258 * forwarded to the controller's stream, and the `addStream` ends |
| 259 * after this. If [cancelOnError] is false, all errors are forwarded | 259 * after this. If [cancelOnError] is false, all errors are forwarded |
| 260 * and only a done event will end the `addStream`. | 260 * and only a done event will end the `addStream`. |
| 261 */ | 261 */ |
| 262 Future addStream(Stream<T> source, {bool cancelOnError: true}); | 262 Future addStream(Stream<T> source, {bool cancelOnError: true}); |
| 263 } | 263 } |
| 264 | 264 |
| 265 | |
| 266 /** | 265 /** |
| 267 * A stream controller that delivers its events synchronously. | 266 * A stream controller that delivers its events synchronously. |
| 268 * | 267 * |
| 269 * A synchronous stream controller is intended for cases where | 268 * A synchronous stream controller is intended for cases where |
| 270 * an already asynchronous event triggers an event on a stream. | 269 * an already asynchronous event triggers an event on a stream. |
| 271 * | 270 * |
| 272 * Instead of adding the event to the stream in a later microtask, | 271 * Instead of adding the event to the stream in a later microtask, |
| 273 * causing extra latency, the event is instead fired immediately by the | 272 * causing extra latency, the event is instead fired immediately by the |
| 274 * synchronous stream controller, as if the stream event was | 273 * synchronous stream controller, as if the stream event was |
| 275 * the current event or microtask. | 274 * the current event or microtask. |
| (...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 355 * Closes the controller's stream. | 354 * Closes the controller's stream. |
| 356 * | 355 * |
| 357 * As [StreamController.close], but must not be called while an event is | 356 * As [StreamController.close], but must not be called while an event is |
| 358 * being added by [add], [addError] or [close]. | 357 * being added by [add], [addError] or [close]. |
| 359 */ | 358 */ |
| 360 Future close(); | 359 Future close(); |
| 361 } | 360 } |
| 362 | 361 |
| 363 abstract class _StreamControllerLifecycle<T> { | 362 abstract class _StreamControllerLifecycle<T> { |
| 364 StreamSubscription<T> _subscribe( | 363 StreamSubscription<T> _subscribe( |
| 365 void onData(T data), | 364 void onData(T data), Function onError, void onDone(), bool cancelOnError); |
| 366 Function onError, | |
| 367 void onDone(), | |
| 368 bool cancelOnError); | |
| 369 void _recordPause(StreamSubscription<T> subscription) {} | 365 void _recordPause(StreamSubscription<T> subscription) {} |
| 370 void _recordResume(StreamSubscription<T> subscription) {} | 366 void _recordResume(StreamSubscription<T> subscription) {} |
| 371 Future _recordCancel(StreamSubscription<T> subscription) => null; | 367 Future _recordCancel(StreamSubscription<T> subscription) => null; |
| 372 } | 368 } |
| 373 | 369 |
| 374 /** | 370 /** |
| 375 * Default implementation of [StreamController]. | 371 * Default implementation of [StreamController]. |
| 376 * | 372 * |
| 377 * Controls a stream that only supports a single controller. | 373 * Controls a stream that only supports a single controller. |
| 378 */ | 374 */ |
| 379 abstract class _StreamController<T> implements StreamController<T>, | 375 abstract class _StreamController<T> |
| 380 _StreamControllerLifecycle<T>, | 376 implements |
| 381 _EventSink<T>, | 377 StreamController<T>, |
| 382 _EventDispatch<T> { | 378 _StreamControllerLifecycle<T>, |
| 379 _EventSink<T>, |
| 380 _EventDispatch<T> { |
| 383 // The states are bit-flags. More than one can be set at a time. | 381 // The states are bit-flags. More than one can be set at a time. |
| 384 // | 382 // |
| 385 // The "subscription state" goes through the states: | 383 // The "subscription state" goes through the states: |
| 386 // initial -> subscribed -> canceled. | 384 // initial -> subscribed -> canceled. |
| 387 // These are mutually exclusive. | 385 // These are mutually exclusive. |
| 388 // The "closed" state records whether the [close] method has been called | 386 // The "closed" state records whether the [close] method has been called |
| 389 // on the controller. This can be done at any time. If done before | 387 // on the controller. This can be done at any time. If done before |
| 390 // subscription, the done event is queued. If done after cancel, the done | 388 // subscription, the done event is queued. If done after cancel, the done |
| 391 // event is ignored (just as any other event after a cancel). | 389 // event is ignored (just as any other event after a cancel). |
| 392 | 390 |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 443 // TODO(lrn): Could this be stored in the varData field too, if it's not | 441 // TODO(lrn): Could this be stored in the varData field too, if it's not |
| 444 // accessed until the call to "close"? Then we need to special case if it's | 442 // accessed until the call to "close"? Then we need to special case if it's |
| 445 // accessed earlier, or if close is called before subscribing. | 443 // accessed earlier, or if close is called before subscribing. |
| 446 _Future _doneFuture; | 444 _Future _doneFuture; |
| 447 | 445 |
| 448 ControllerCallback onListen; | 446 ControllerCallback onListen; |
| 449 ControllerCallback onPause; | 447 ControllerCallback onPause; |
| 450 ControllerCallback onResume; | 448 ControllerCallback onResume; |
| 451 ControllerCancelCallback onCancel; | 449 ControllerCancelCallback onCancel; |
| 452 | 450 |
| 453 _StreamController(this.onListen, | 451 _StreamController(this.onListen, this.onPause, this.onResume, this.onCancel); |
| 454 this.onPause, | |
| 455 this.onResume, | |
| 456 this.onCancel); | |
| 457 | 452 |
| 458 // Return a new stream every time. The streams are equal, but not identical. | 453 // Return a new stream every time. The streams are equal, but not identical. |
| 459 Stream<T> get stream => new _ControllerStream<T>(this); | 454 Stream<T> get stream => new _ControllerStream<T>(this); |
| 460 | 455 |
| 461 /** | 456 /** |
| 462 * Returns a view of this object that only exposes the [StreamSink] interface. | 457 * Returns a view of this object that only exposes the [StreamSink] interface. |
| 463 */ | 458 */ |
| 464 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | 459 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |
| 465 | 460 |
| 466 /** | 461 /** |
| 467 * Whether a listener has existed and been canceled. | 462 * Whether a listener has existed and been canceled. |
| 468 * | 463 * |
| 469 * After this, adding more events will be ignored. | 464 * After this, adding more events will be ignored. |
| 470 */ | 465 */ |
| 471 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 466 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| 472 | 467 |
| 473 /** Whether there is an active listener. */ | 468 /** Whether there is an active listener. */ |
| 474 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; | 469 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; |
| 475 | 470 |
| 476 /** Whether there has not been a listener yet. */ | 471 /** Whether there has not been a listener yet. */ |
| 477 bool get _isInitialState => | 472 bool get _isInitialState => |
| 478 (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; | 473 (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; |
| 479 | 474 |
| 480 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 475 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| 481 | 476 |
| 482 bool get isPaused => hasListener ? _subscription._isInputPaused | 477 bool get isPaused => |
| 483 : !_isCanceled; | 478 hasListener ? _subscription._isInputPaused : !_isCanceled; |
| 484 | 479 |
| 485 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; | 480 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; |
| 486 | 481 |
| 487 /** New events may not be added after close, or during addStream. */ | 482 /** New events may not be added after close, or during addStream. */ |
| 488 bool get _mayAddEvent => (_state < _STATE_CLOSED); | 483 bool get _mayAddEvent => (_state < _STATE_CLOSED); |
| 489 | 484 |
| 490 // Returns the pending events. | 485 // Returns the pending events. |
| 491 // Pending events are events added before a subscription exists. | 486 // Pending events are events added before a subscription exists. |
| 492 // They are added to the subscription when it is created. | 487 // They are added to the subscription when it is created. |
| 493 // Pending events, if any, are kept in the _varData field until the | 488 // Pending events, if any, are kept in the _varData field until the |
| 494 // stream is listened to. | 489 // stream is listened to. |
| 495 // While adding a stream, pending events are moved into the | 490 // While adding a stream, pending events are moved into the |
| 496 // state object to allow the state object to use the _varData field. | 491 // state object to allow the state object to use the _varData field. |
| 497 _PendingEvents<T> get _pendingEvents { | 492 _PendingEvents<T> get _pendingEvents { |
| 498 assert(_isInitialState); | 493 assert(_isInitialState); |
| 499 if (!_isAddingStream) { | 494 if (!_isAddingStream) { |
| 500 return _varData as Object /*=_PendingEvents<T>*/; | 495 return _varData as Object/*=_PendingEvents<T>*/; |
| 501 } | 496 } |
| 502 _StreamControllerAddStreamState<T> state = | 497 _StreamControllerAddStreamState<T> state = |
| 503 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 498 _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
| 504 return state.varData as Object /*=_PendingEvents<T>*/; | 499 return state.varData as Object/*=_PendingEvents<T>*/; |
| 505 } | 500 } |
| 506 | 501 |
| 507 // Returns the pending events, and creates the object if necessary. | 502 // Returns the pending events, and creates the object if necessary. |
| 508 _StreamImplEvents<T> _ensurePendingEvents() { | 503 _StreamImplEvents<T> _ensurePendingEvents() { |
| 509 assert(_isInitialState); | 504 assert(_isInitialState); |
| 510 if (!_isAddingStream) { | 505 if (!_isAddingStream) { |
| 511 if (_varData == null) _varData = new _StreamImplEvents<T>(); | 506 if (_varData == null) _varData = new _StreamImplEvents<T>(); |
| 512 return _varData as Object /*=_StreamImplEvents<T>*/; | 507 return _varData as Object/*=_StreamImplEvents<T>*/; |
| 513 } | 508 } |
| 514 _StreamControllerAddStreamState<T> state = | 509 _StreamControllerAddStreamState<T> state = |
| 515 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 510 _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
| 516 if (state.varData == null) state.varData = new _StreamImplEvents<T>(); | 511 if (state.varData == null) state.varData = new _StreamImplEvents<T>(); |
| 517 return state.varData as Object /*=_StreamImplEvents<T>*/; | 512 return state.varData as Object/*=_StreamImplEvents<T>*/; |
| 518 } | 513 } |
| 519 | 514 |
| 520 // Get the current subscription. | 515 // Get the current subscription. |
| 521 // If we are adding a stream, the subscription is moved into the state | 516 // If we are adding a stream, the subscription is moved into the state |
| 522 // object to allow the state object to use the _varData field. | 517 // object to allow the state object to use the _varData field. |
| 523 _ControllerSubscription<T> get _subscription { | 518 _ControllerSubscription<T> get _subscription { |
| 524 assert(hasListener); | 519 assert(hasListener); |
| 525 if (_isAddingStream) { | 520 if (_isAddingStream) { |
| 526 _StreamControllerAddStreamState<T> addState = | 521 _StreamControllerAddStreamState<T> addState = |
| 527 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 522 _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
| 528 return addState.varData as Object /*=_ControllerSubscription<T>*/; | 523 return addState.varData as Object/*=_ControllerSubscription<T>*/; |
| 529 } | 524 } |
| 530 return _varData as Object /*=_ControllerSubscription<T>*/; | 525 return _varData as Object/*=_ControllerSubscription<T>*/; |
| 531 } | 526 } |
| 532 | 527 |
| 533 /** | 528 /** |
| 534 * Creates an error describing why an event cannot be added. | 529 * Creates an error describing why an event cannot be added. |
| 535 * | 530 * |
| 536 * The reason, and therefore the error message, depends on the current state. | 531 * The reason, and therefore the error message, depends on the current state. |
| 537 */ | 532 */ |
| 538 Error _badEventState() { | 533 Error _badEventState() { |
| 539 if (isClosed) { | 534 if (isClosed) { |
| 540 return new StateError("Cannot add event after closing"); | 535 return new StateError("Cannot add event after closing"); |
| 541 } | 536 } |
| 542 assert(_isAddingStream); | 537 assert(_isAddingStream); |
| 543 return new StateError("Cannot add event while adding a stream"); | 538 return new StateError("Cannot add event while adding a stream"); |
| 544 } | 539 } |
| 545 | 540 |
| 546 // StreamSink interface. | 541 // StreamSink interface. |
| 547 Future addStream(Stream<T> source, {bool cancelOnError: true}) { | 542 Future addStream(Stream<T> source, {bool cancelOnError: true}) { |
| 548 if (!_mayAddEvent) throw _badEventState(); | 543 if (!_mayAddEvent) throw _badEventState(); |
| 549 if (_isCanceled) return new _Future.immediate(null); | 544 if (_isCanceled) return new _Future.immediate(null); |
| 550 _StreamControllerAddStreamState<T> addState = | 545 _StreamControllerAddStreamState<T> addState = |
| 551 new _StreamControllerAddStreamState<T>(this, | 546 new _StreamControllerAddStreamState<T>( |
| 552 _varData, | 547 this, _varData, source, cancelOnError); |
| 553 source, | |
| 554 cancelOnError); | |
| 555 _varData = addState; | 548 _varData = addState; |
| 556 _state |= _STATE_ADDSTREAM; | 549 _state |= _STATE_ADDSTREAM; |
| 557 return addState.addStreamFuture; | 550 return addState.addStreamFuture; |
| 558 } | 551 } |
| 559 | 552 |
| 560 /** | 553 /** |
| 561 * Returns a future that is completed when the stream is done | 554 * Returns a future that is completed when the stream is done |
| 562 * processing events. | 555 * processing events. |
| 563 * | 556 * |
| 564 * This happens either when the done event has been sent, or if the | 557 * This happens either when the done event has been sent, or if the |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 643 _sendError(error, stackTrace); | 636 _sendError(error, stackTrace); |
| 644 } else if (_isInitialState) { | 637 } else if (_isInitialState) { |
| 645 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); | 638 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); |
| 646 } | 639 } |
| 647 } | 640 } |
| 648 | 641 |
| 649 void _close() { | 642 void _close() { |
| 650 // End of addStream stream. | 643 // End of addStream stream. |
| 651 assert(_isAddingStream); | 644 assert(_isAddingStream); |
| 652 _StreamControllerAddStreamState<T> addState = | 645 _StreamControllerAddStreamState<T> addState = |
| 653 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 646 _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
| 654 _varData = addState.varData; | 647 _varData = addState.varData; |
| 655 _state &= ~_STATE_ADDSTREAM; | 648 _state &= ~_STATE_ADDSTREAM; |
| 656 addState.complete(); | 649 addState.complete(); |
| 657 } | 650 } |
| 658 | 651 |
| 659 // _StreamControllerLifeCycle interface | 652 // _StreamControllerLifeCycle interface |
| 660 | 653 |
| 661 StreamSubscription<T> _subscribe( | 654 StreamSubscription<T> _subscribe(void onData(T data), Function onError, |
| 662 void onData(T data), | 655 void onDone(), bool cancelOnError) { |
| 663 Function onError, | |
| 664 void onDone(), | |
| 665 bool cancelOnError) { | |
| 666 if (!_isInitialState) { | 656 if (!_isInitialState) { |
| 667 throw new StateError("Stream has already been listened to."); | 657 throw new StateError("Stream has already been listened to."); |
| 668 } | 658 } |
| 669 _ControllerSubscription<T> subscription = | 659 _ControllerSubscription<T> subscription = new _ControllerSubscription<T>( |
| 670 new _ControllerSubscription<T>(this, onData, onError, onDone, | 660 this, onData, onError, onDone, cancelOnError); |
| 671 cancelOnError); | |
| 672 | 661 |
| 673 _PendingEvents<T> pendingEvents = _pendingEvents; | 662 _PendingEvents<T> pendingEvents = _pendingEvents; |
| 674 _state |= _STATE_SUBSCRIBED; | 663 _state |= _STATE_SUBSCRIBED; |
| 675 if (_isAddingStream) { | 664 if (_isAddingStream) { |
| 676 _StreamControllerAddStreamState<T> addState = | 665 _StreamControllerAddStreamState<T> addState = |
| 677 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 666 _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
| 678 addState.varData = subscription; | 667 addState.varData = subscription; |
| 679 addState.resume(); | 668 addState.resume(); |
| 680 } else { | 669 } else { |
| 681 _varData = subscription; | 670 _varData = subscription; |
| 682 } | 671 } |
| 683 subscription._setPendingEvents(pendingEvents); | 672 subscription._setPendingEvents(pendingEvents); |
| 684 subscription._guardCallback(() { | 673 subscription._guardCallback(() { |
| 685 _runGuarded(onListen); | 674 _runGuarded(onListen); |
| 686 }); | 675 }); |
| 687 | 676 |
| 688 return subscription; | 677 return subscription; |
| 689 } | 678 } |
| 690 | 679 |
| 691 Future _recordCancel(StreamSubscription<T> subscription) { | 680 Future _recordCancel(StreamSubscription<T> subscription) { |
| 692 // When we cancel, we first cancel any stream being added, | 681 // When we cancel, we first cancel any stream being added, |
| 693 // Then we call `onCancel`, and finally the _doneFuture is completed. | 682 // Then we call `onCancel`, and finally the _doneFuture is completed. |
| 694 // If either of addStream's cancel or `onCancel` returns a future, | 683 // If either of addStream's cancel or `onCancel` returns a future, |
| 695 // we wait for it before continuing. | 684 // we wait for it before continuing. |
| 696 // Any error during this process ends up in the returned future. | 685 // Any error during this process ends up in the returned future. |
| 697 // If more errors happen, we act as if it happens inside nested try/finallys | 686 // If more errors happen, we act as if it happens inside nested try/finallys |
| 698 // or whenComplete calls, and only the last error ends up in the | 687 // or whenComplete calls, and only the last error ends up in the |
| 699 // returned future. | 688 // returned future. |
| 700 Future result; | 689 Future result; |
| 701 if (_isAddingStream) { | 690 if (_isAddingStream) { |
| 702 _StreamControllerAddStreamState<T> addState = | 691 _StreamControllerAddStreamState<T> addState = |
| 703 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 692 _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
| 704 result = addState.cancel(); | 693 result = addState.cancel(); |
| 705 } | 694 } |
| 706 _varData = null; | 695 _varData = null; |
| 707 _state = | 696 _state = |
| 708 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | 697 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
| 709 | 698 |
| 710 if (onCancel != null) { | 699 if (onCancel != null) { |
| 711 if (result == null) { | 700 if (result == null) { |
| 712 // Only introduce a future if one is needed. | 701 // Only introduce a future if one is needed. |
| 713 // If _onCancel returns null, no future is needed. | 702 // If _onCancel returns null, no future is needed. |
| (...skipping 22 matching lines...) Expand all Loading... |
| 736 } else { | 725 } else { |
| 737 complete(); | 726 complete(); |
| 738 } | 727 } |
| 739 | 728 |
| 740 return result; | 729 return result; |
| 741 } | 730 } |
| 742 | 731 |
| 743 void _recordPause(StreamSubscription<T> subscription) { | 732 void _recordPause(StreamSubscription<T> subscription) { |
| 744 if (_isAddingStream) { | 733 if (_isAddingStream) { |
| 745 _StreamControllerAddStreamState<T> addState = | 734 _StreamControllerAddStreamState<T> addState = |
| 746 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 735 _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
| 747 addState.pause(); | 736 addState.pause(); |
| 748 } | 737 } |
| 749 _runGuarded(onPause); | 738 _runGuarded(onPause); |
| 750 } | 739 } |
| 751 | 740 |
| 752 void _recordResume(StreamSubscription<T> subscription) { | 741 void _recordResume(StreamSubscription<T> subscription) { |
| 753 if (_isAddingStream) { | 742 if (_isAddingStream) { |
| 754 _StreamControllerAddStreamState<T> addState = | 743 _StreamControllerAddStreamState<T> addState = |
| 755 _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 744 _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
| 756 addState.resume(); | 745 addState.resume(); |
| 757 } | 746 } |
| 758 _runGuarded(onResume); | 747 _runGuarded(onResume); |
| 759 } | 748 } |
| 760 } | 749 } |
| 761 | 750 |
| 762 abstract class _SyncStreamControllerDispatch<T> | 751 abstract class _SyncStreamControllerDispatch<T> |
| 763 implements _StreamController<T>, SynchronousStreamController<T> { | 752 implements _StreamController<T>, SynchronousStreamController<T> { |
| 764 int get _state; | 753 int get _state; |
| 765 void set _state(int state); | 754 void set _state(int state); |
| (...skipping 23 matching lines...) Expand all Loading... |
| 789 | 778 |
| 790 void _sendDone() { | 779 void _sendDone() { |
| 791 _subscription._addPending(const _DelayedDone()); | 780 _subscription._addPending(const _DelayedDone()); |
| 792 } | 781 } |
| 793 } | 782 } |
| 794 | 783 |
| 795 // TODO(lrn): Use common superclass for callback-controllers when VM supports | 784 // TODO(lrn): Use common superclass for callback-controllers when VM supports |
| 796 // constructors in mixin superclasses. | 785 // constructors in mixin superclasses. |
| 797 | 786 |
| 798 class _AsyncStreamController<T> = _StreamController<T> | 787 class _AsyncStreamController<T> = _StreamController<T> |
| 799 with _AsyncStreamControllerDispatch<T>; | 788 with _AsyncStreamControllerDispatch<T>; |
| 800 | 789 |
| 801 class _SyncStreamController<T> = _StreamController<T> | 790 class _SyncStreamController<T> = _StreamController<T> |
| 802 with _SyncStreamControllerDispatch<T>; | 791 with _SyncStreamControllerDispatch<T>; |
| 803 | 792 |
| 804 typedef _NotificationHandler(); | 793 typedef _NotificationHandler(); |
| 805 | 794 |
| 806 Future _runGuarded(_NotificationHandler notificationHandler) { | 795 Future _runGuarded(_NotificationHandler notificationHandler) { |
| 807 if (notificationHandler == null) return null; | 796 if (notificationHandler == null) return null; |
| 808 try { | 797 try { |
| 809 var result = notificationHandler(); | 798 var result = notificationHandler(); |
| 810 if (result is Future) return result; | 799 if (result is Future) return result; |
| 811 return null; | 800 return null; |
| 812 } catch (e, s) { | 801 } catch (e, s) { |
| 813 Zone.current.handleUncaughtError(e, s); | 802 Zone.current.handleUncaughtError(e, s); |
| 814 } | 803 } |
| 815 } | 804 } |
| 816 | 805 |
| 817 class _ControllerStream<T> extends _StreamImpl<T> { | 806 class _ControllerStream<T> extends _StreamImpl<T> { |
| 818 _StreamControllerLifecycle<T> _controller; | 807 _StreamControllerLifecycle<T> _controller; |
| 819 | 808 |
| 820 _ControllerStream(this._controller); | 809 _ControllerStream(this._controller); |
| 821 | 810 |
| 822 StreamSubscription<T> _createSubscription( | 811 StreamSubscription<T> _createSubscription(void onData(T data), |
| 823 void onData(T data), | 812 Function onError, void onDone(), bool cancelOnError) => |
| 824 Function onError, | 813 _controller._subscribe(onData, onError, onDone, cancelOnError); |
| 825 void onDone(), | |
| 826 bool cancelOnError) => | |
| 827 _controller._subscribe(onData, onError, onDone, cancelOnError); | |
| 828 | 814 |
| 829 // Override == and hashCode so that new streams returned by the same | 815 // Override == and hashCode so that new streams returned by the same |
| 830 // controller are considered equal. The controller returns a new stream | 816 // controller are considered equal. The controller returns a new stream |
| 831 // each time it's queried, but doesn't have to cache the result. | 817 // each time it's queried, but doesn't have to cache the result. |
| 832 | 818 |
| 833 int get hashCode => _controller.hashCode ^ 0x35323532; | 819 int get hashCode => _controller.hashCode ^ 0x35323532; |
| 834 | 820 |
| 835 bool operator==(Object other) { | 821 bool operator ==(Object other) { |
| 836 if (identical(this, other)) return true; | 822 if (identical(this, other)) return true; |
| 837 if (other is! _ControllerStream) return false; | 823 if (other is! _ControllerStream) return false; |
| 838 _ControllerStream otherStream = other; | 824 _ControllerStream otherStream = other; |
| 839 return identical(otherStream._controller, this._controller); | 825 return identical(otherStream._controller, this._controller); |
| 840 } | 826 } |
| 841 } | 827 } |
| 842 | 828 |
| 843 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 829 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| 844 final _StreamControllerLifecycle<T> _controller; | 830 final _StreamControllerLifecycle<T> _controller; |
| 845 | 831 |
| 846 _ControllerSubscription(this._controller, void onData(T data), | 832 _ControllerSubscription(this._controller, void onData(T data), |
| 847 Function onError, void onDone(), bool cancelOnError) | 833 Function onError, void onDone(), bool cancelOnError) |
| 848 : super(onData, onError, onDone, cancelOnError); | 834 : super(onData, onError, onDone, cancelOnError); |
| 849 | 835 |
| 850 Future _onCancel() { | 836 Future _onCancel() { |
| 851 return _controller._recordCancel(this); | 837 return _controller._recordCancel(this); |
| 852 } | 838 } |
| 853 | 839 |
| 854 void _onPause() { | 840 void _onPause() { |
| 855 _controller._recordPause(this); | 841 _controller._recordPause(this); |
| 856 } | 842 } |
| 857 | 843 |
| 858 void _onResume() { | 844 void _onResume() { |
| 859 _controller._recordResume(this); | 845 _controller._recordResume(this); |
| 860 } | 846 } |
| 861 } | 847 } |
| 862 | 848 |
| 863 | |
| 864 /** A class that exposes only the [StreamSink] interface of an object. */ | 849 /** A class that exposes only the [StreamSink] interface of an object. */ |
| 865 class _StreamSinkWrapper<T> implements StreamSink<T> { | 850 class _StreamSinkWrapper<T> implements StreamSink<T> { |
| 866 final StreamController _target; | 851 final StreamController _target; |
| 867 _StreamSinkWrapper(this._target); | 852 _StreamSinkWrapper(this._target); |
| 868 void add(T data) { _target.add(data); } | 853 void add(T data) { |
| 854 _target.add(data); |
| 855 } |
| 856 |
| 869 void addError(Object error, [StackTrace stackTrace]) { | 857 void addError(Object error, [StackTrace stackTrace]) { |
| 870 _target.addError(error, stackTrace); | 858 _target.addError(error, stackTrace); |
| 871 } | 859 } |
| 860 |
| 872 Future close() => _target.close(); | 861 Future close() => _target.close(); |
| 873 Future addStream(Stream<T> source, {bool cancelOnError: true}) => | 862 Future addStream(Stream<T> source, {bool cancelOnError: true}) => |
| 874 _target.addStream(source, cancelOnError: cancelOnError); | 863 _target.addStream(source, cancelOnError: cancelOnError); |
| 875 Future get done => _target.done; | 864 Future get done => _target.done; |
| 876 } | 865 } |
| 877 | 866 |
| 878 /** | 867 /** |
| 879 * Object containing the state used to handle [StreamController.addStream]. | 868 * Object containing the state used to handle [StreamController.addStream]. |
| 880 */ | 869 */ |
| 881 class _AddStreamState<T> { | 870 class _AddStreamState<T> { |
| 882 // [_Future] returned by call to addStream. | 871 // [_Future] returned by call to addStream. |
| 883 final _Future addStreamFuture; | 872 final _Future addStreamFuture; |
| 884 | 873 |
| 885 // Subscription on stream argument to addStream. | 874 // Subscription on stream argument to addStream. |
| 886 final StreamSubscription addSubscription; | 875 final StreamSubscription addSubscription; |
| 887 | 876 |
| 888 _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError) | 877 _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError) |
| 889 : addStreamFuture = new _Future(), | 878 : addStreamFuture = new _Future(), |
| 890 addSubscription = source.listen(controller._add, | 879 addSubscription = source.listen(controller._add, |
| 891 onError: cancelOnError | 880 onError: cancelOnError |
| 892 ? makeErrorHandler(controller) | 881 ? makeErrorHandler(controller) |
| 893 : controller._addError, | 882 : controller._addError, |
| 894 onDone: controller._close, | 883 onDone: controller._close, |
| 895 cancelOnError: cancelOnError); | 884 cancelOnError: cancelOnError); |
| 896 | 885 |
| 897 static makeErrorHandler(_EventSink controller) => | 886 static makeErrorHandler(_EventSink controller) => (e, StackTrace s) { |
| 898 (e, StackTrace s) { | |
| 899 controller._addError(e, s); | 887 controller._addError(e, s); |
| 900 controller._close(); | 888 controller._close(); |
| 901 }; | 889 }; |
| 902 | 890 |
| 903 void pause() { | 891 void pause() { |
| 904 addSubscription.pause(); | 892 addSubscription.pause(); |
| 905 } | 893 } |
| 906 | 894 |
| 907 void resume() { | 895 void resume() { |
| 908 addSubscription.resume(); | 896 addSubscription.resume(); |
| 909 } | 897 } |
| 910 | 898 |
| 911 /** | 899 /** |
| 912 * Stop adding the stream. | 900 * Stop adding the stream. |
| 913 * | 901 * |
| 914 * Complete the future returned by `StreamController.addStream` when | 902 * Complete the future returned by `StreamController.addStream` when |
| 915 * the cancel is complete. | 903 * the cancel is complete. |
| 916 * | 904 * |
| 917 * Return a future if the cancel takes time, otherwise return `null`. | 905 * Return a future if the cancel takes time, otherwise return `null`. |
| 918 */ | 906 */ |
| 919 Future cancel() { | 907 Future cancel() { |
| 920 var cancel = addSubscription.cancel(); | 908 var cancel = addSubscription.cancel(); |
| 921 if (cancel == null) { | 909 if (cancel == null) { |
| 922 addStreamFuture._asyncComplete(null); | 910 addStreamFuture._asyncComplete(null); |
| 923 return null; | 911 return null; |
| 924 } | 912 } |
| 925 return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); }); | 913 return cancel.whenComplete(() { |
| 914 addStreamFuture._asyncComplete(null); |
| 915 }); |
| 926 } | 916 } |
| 927 | 917 |
| 928 void complete() { | 918 void complete() { |
| 929 addStreamFuture._asyncComplete(null); | 919 addStreamFuture._asyncComplete(null); |
| 930 } | 920 } |
| 931 } | 921 } |
| 932 | 922 |
| 933 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { | 923 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { |
| 934 // The subscription or pending data of a _StreamController. | 924 // The subscription or pending data of a _StreamController. |
| 935 // Stored here because we reuse the `_varData` field in the _StreamController | 925 // Stored here because we reuse the `_varData` field in the _StreamController |
| 936 // to store this state object. | 926 // to store this state object. |
| 937 var varData; | 927 var varData; |
| 938 | 928 |
| 939 _StreamControllerAddStreamState(_StreamController<T> controller, | 929 _StreamControllerAddStreamState(_StreamController<T> controller, this.varData, |
| 940 this.varData, | 930 Stream source, bool cancelOnError) |
| 941 Stream source, | |
| 942 bool cancelOnError) | |
| 943 : super(controller, source, cancelOnError) { | 931 : super(controller, source, cancelOnError) { |
| 944 if (controller.isPaused) { | 932 if (controller.isPaused) { |
| 945 addSubscription.pause(); | 933 addSubscription.pause(); |
| 946 } | 934 } |
| 947 } | 935 } |
| 948 } | 936 } |
| OLD | NEW |