OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
441 * A multiplex controller is never paused. | 441 * A multiplex controller is never paused. |
442 * | 442 * |
443 * Each receiving stream may be paused individually, and they handle their | 443 * Each receiving stream may be paused individually, and they handle their |
444 * own buffering. | 444 * own buffering. |
445 */ | 445 */ |
446 bool get isPaused => false; | 446 bool get isPaused => false; |
447 | 447 |
448 /** Whether there are currently a subscriber on the [Stream]. */ | 448 /** Whether there are currently a subscriber on the [Stream]. */ |
449 bool get hasListener => !_isEmpty; | 449 bool get hasListener => !_isEmpty; |
450 | 450 |
| 451 /** Whether an event is being fired (sent to some, but not all, listeners). */ |
| 452 bool get _isFiring => (_state & _STATE_FIRING) != 0; |
| 453 |
451 // Linked list helpers | 454 // Linked list helpers |
452 | 455 |
453 bool get _isEmpty => identical(_next, this); | 456 bool get _isEmpty => identical(_next, this); |
454 | 457 |
455 /** Adds subscription to linked list of active listeners. */ | 458 /** Adds subscription to linked list of active listeners. */ |
456 void _addListener(_MultiplexSubscription<T> subscription) { | 459 void _addListener(_MultiplexSubscription<T> subscription) { |
457 _MultiplexSubscriptionLink previous = _previous; | 460 _MultiplexSubscriptionLink previous = _previous; |
458 previous._next = subscription; | 461 previous._next = subscription; |
459 _previous = subscription._previous; | 462 _previous = subscription._previous; |
460 subscription._previous._next = this; | 463 subscription._previous._next = this; |
(...skipping 20 matching lines...) Expand all Loading... |
481 } | 484 } |
482 | 485 |
483 void _recordCancel(_MultiplexSubscription<T> subscription) { | 486 void _recordCancel(_MultiplexSubscription<T> subscription) { |
484 if (subscription._isFiring) { | 487 if (subscription._isFiring) { |
485 subscription._setRemoveAfterFiring(); | 488 subscription._setRemoveAfterFiring(); |
486 } else { | 489 } else { |
487 _removeListener(subscription); | 490 _removeListener(subscription); |
488 // If we are currently firing an event, the empty-check is performed at | 491 // If we are currently firing an event, the empty-check is performed at |
489 // the end of the listener loop instead of here. | 492 // the end of the listener loop instead of here. |
490 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { | 493 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { |
491 _runGuarded(_onCancel); | 494 _callOnCancel(); |
492 } | 495 } |
493 } | 496 } |
494 } | 497 } |
495 | 498 |
496 void _recordPause(StreamSubscription<T> subscription) {} | 499 void _recordPause(StreamSubscription<T> subscription) {} |
497 void _recordResume(StreamSubscription<T> subscription) {} | 500 void _recordResume(StreamSubscription<T> subscription) {} |
498 | 501 |
499 // EventSink interface. | 502 // EventSink interface. |
500 | 503 |
501 void add(T data) { | 504 void add(T data) { |
502 assert(!isClosed); | 505 if (isClosed) { |
| 506 throw new StateError("Cannot add new events after calling close()"); |
| 507 } |
| 508 _sendData(data); |
| 509 } |
| 510 |
| 511 void addError(Object error, [Object stackTrace]) { |
| 512 if (isClosed) { |
| 513 throw new StateError("Cannot add new events after calling close()"); |
| 514 } |
| 515 if (stackTrace != null) _attachStackTrace(error, stackTrace); |
| 516 _sendError(error); |
| 517 } |
| 518 |
| 519 void close() { |
| 520 if (isClosed) { |
| 521 throw new StateError("Cannot add new events after calling close()"); |
| 522 } |
| 523 _state |= _STATE_CLOSED; |
| 524 _sendDone(); |
| 525 } |
| 526 |
| 527 // EventDispatch interface. |
| 528 |
| 529 void _sendData(T data) { |
503 if (_isEmpty) return; | 530 if (_isEmpty) return; |
504 _forEachListener((_BufferingStreamSubscription<T> subscription) { | 531 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
505 subscription._add(data); | 532 subscription._add(data); |
506 }); | 533 }); |
507 } | 534 } |
508 | 535 |
509 void addError(Object error, [Object stackTrace]) { | 536 void _sendError(Object error) { |
510 assert(!isClosed); | |
511 if (_isEmpty) return; | 537 if (_isEmpty) return; |
512 _forEachListener((_BufferingStreamSubscription<T> subscription) { | 538 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
513 subscription._addError(error); | 539 subscription._addError(error); |
514 }); | 540 }); |
515 } | 541 } |
516 | 542 |
517 void close() { | 543 void _sendDone() { |
518 assert(!isClosed); | |
519 _state |= _STATE_CLOSED; | |
520 if (_isEmpty) return; | 544 if (_isEmpty) return; |
521 _forEachListener((_MultiplexSubscription<T> subscription) { | 545 _forEachListener((_MultiplexSubscription<T> subscription) { |
522 subscription._close(); | 546 subscription._close(); |
523 subscription._eventState |= | 547 subscription._eventState |= |
524 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; | 548 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; |
525 }); | 549 }); |
526 } | 550 } |
527 | 551 |
528 void _forEachListener( | 552 void _forEachListener( |
529 void action(_BufferingStreamSubscription<T> subscription)) { | 553 void action(_BufferingStreamSubscription<T> subscription)) { |
530 if ((_state & _STATE_FIRING) != 0) { | 554 if (_isFiring) { |
531 throw new StateError( | 555 throw new StateError( |
532 "Cannot fire new event. Controller is already firing an event"); | 556 "Cannot fire new event. Controller is already firing an event"); |
533 } | 557 } |
534 if (_isEmpty) return; | 558 if (_isEmpty) return; |
535 | 559 |
536 // Get event id of this event. | 560 // Get event id of this event. |
537 int id = (_state & _STATE_EVENT_ID); | 561 int id = (_state & _STATE_EVENT_ID); |
538 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | 562 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] |
539 // callbacks while firing, and we prevent reentrancy of this function. | 563 // callbacks while firing, and we prevent reentrancy of this function. |
540 // | 564 // |
(...skipping 13 matching lines...) Expand all Loading... |
554 _removeListener(subscription); | 578 _removeListener(subscription); |
555 } | 579 } |
556 subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING; | 580 subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING; |
557 } else { | 581 } else { |
558 link = subscription._next; | 582 link = subscription._next; |
559 } | 583 } |
560 } | 584 } |
561 _state &= ~_STATE_FIRING; | 585 _state &= ~_STATE_FIRING; |
562 | 586 |
563 if (_isEmpty) { | 587 if (_isEmpty) { |
564 _runGuarded(_onCancel); | 588 _callOnCancel(); |
565 } | 589 } |
566 } | 590 } |
| 591 |
| 592 void _callOnCancel() { |
| 593 _runGuarded(_onCancel); |
| 594 } |
567 } | 595 } |
| 596 |
| 597 class _BufferingMultiplexStreamController<T> |
| 598 extends _MultiplexStreamController<T> |
| 599 implements _EventDispatch<T> { |
| 600 _StreamImplEvents _pending; |
| 601 |
| 602 _BufferingMultiplexStreamController(void onListen(), void onCancel()) |
| 603 : super(onListen, onCancel); |
| 604 |
| 605 bool get _hasPending => _pending != null && ! _pending.isEmpty; |
| 606 |
| 607 void _addPendingEvent(_DelayedEvent event) { |
| 608 if (_pending == null) { |
| 609 _pending = new _StreamImplEvents(); |
| 610 } |
| 611 _pending.add(event); |
| 612 } |
| 613 |
| 614 void add(T data) { |
| 615 if (_isFiring) { |
| 616 _addPendingEvent(new _DelayedData<T>(data)); |
| 617 return; |
| 618 } |
| 619 super.add(data); |
| 620 while (_hasPending) { |
| 621 _pending.handleNext(this); |
| 622 } |
| 623 } |
| 624 |
| 625 void addError(Object error, [StackTrace stackTrace]) { |
| 626 if (_isFiring) { |
| 627 _addPendingEvent(new _DelayedError(error)); |
| 628 return; |
| 629 } |
| 630 super.addError(error, stackTrace); |
| 631 while (_hasPending) { |
| 632 _pending.handleNext(this); |
| 633 } |
| 634 } |
| 635 |
| 636 void close() { |
| 637 if (_isFiring) { |
| 638 _addPendingEvent(const _DelayedDone()); |
| 639 _state |= _STATE_CLOSED; |
| 640 return; |
| 641 } |
| 642 super.close(); |
| 643 assert(!_hasPending); |
| 644 } |
| 645 |
| 646 void _callOnCancel() { |
| 647 if (_hasPending) { |
| 648 _pending.clear(); |
| 649 _pending = null; |
| 650 } |
| 651 super._callOnCancel(); |
| 652 |
| 653 } |
| 654 } |
OLD | NEW |