Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 441 * A multiplex controller is never paused. | 441 * A multiplex controller is never paused. |
| 442 * | 442 * |
| 443 * Each receiving stream may be paused individually, and they handle their | 443 * Each receiving stream may be paused individually, and they handle their |
| 444 * own buffering. | 444 * own buffering. |
| 445 */ | 445 */ |
| 446 bool get isPaused => false; | 446 bool get isPaused => false; |
| 447 | 447 |
| 448 /** Whether there are currently a subscriber on the [Stream]. */ | 448 /** Whether there are currently a subscriber on the [Stream]. */ |
| 449 bool get hasListener => !_isEmpty; | 449 bool get hasListener => !_isEmpty; |
| 450 | 450 |
| 451 /** Whether an event is being fired (sent to some, but not all, listeners). */ | |
| 452 bool get _isFiring => (_state & _STATE_FIRING) != 0; | |
| 453 | |
| 451 // Linked list helpers | 454 // Linked list helpers |
| 452 | 455 |
| 453 bool get _isEmpty => identical(_next, this); | 456 bool get _isEmpty => identical(_next, this); |
| 454 | 457 |
| 455 /** Adds subscription to linked list of active listeners. */ | 458 /** Adds subscription to linked list of active listeners. */ |
| 456 void _addListener(_MultiplexSubscription<T> subscription) { | 459 void _addListener(_MultiplexSubscription<T> subscription) { |
| 457 _MultiplexSubscriptionLink previous = _previous; | 460 _MultiplexSubscriptionLink previous = _previous; |
| 458 previous._next = subscription; | 461 previous._next = subscription; |
| 459 _previous = subscription._previous; | 462 _previous = subscription._previous; |
| 460 subscription._previous._next = this; | 463 subscription._previous._next = this; |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 493 } | 496 } |
| 494 } | 497 } |
| 495 | 498 |
| 496 void _recordPause(StreamSubscription<T> subscription) {} | 499 void _recordPause(StreamSubscription<T> subscription) {} |
| 497 void _recordResume(StreamSubscription<T> subscription) {} | 500 void _recordResume(StreamSubscription<T> subscription) {} |
| 498 | 501 |
| 499 // EventSink interface. | 502 // EventSink interface. |
| 500 | 503 |
| 501 void add(T data) { | 504 void add(T data) { |
| 502 assert(!isClosed); | 505 assert(!isClosed); |
| 506 _sendData(data); | |
| 507 } | |
| 508 | |
| 509 void addError(Object error, [Object stackTrace]) { | |
| 510 assert(!isClosed); | |
| 511 // TODO(lrn): Handle stacktrace. | |
|
floitsch
2013/05/29 09:39:58
That should be less than 5 lines of code.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
Done.
| |
| 512 _sendError(error); | |
| 513 } | |
| 514 | |
| 515 void close() { | |
| 516 assert(!isClosed); | |
| 517 _state |= _STATE_CLOSED; | |
| 518 _sendDone(); | |
| 519 } | |
| 520 | |
| 521 // EventDispatch interface. | |
| 522 | |
| 523 void _sendData(T data) { | |
| 503 if (_isEmpty) return; | 524 if (_isEmpty) return; |
| 504 _forEachListener((_BufferingStreamSubscription<T> subscription) { | 525 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 505 subscription._add(data); | 526 subscription._add(data); |
| 506 }); | 527 }); |
| 507 } | 528 } |
| 508 | 529 |
| 509 void addError(Object error, [Object stackTrace]) { | 530 void _sendError(Object error) { |
| 510 assert(!isClosed); | |
| 511 if (_isEmpty) return; | 531 if (_isEmpty) return; |
| 512 _forEachListener((_BufferingStreamSubscription<T> subscription) { | 532 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| 513 subscription._addError(error); | 533 subscription._addError(error); |
| 514 }); | 534 }); |
| 515 } | 535 } |
| 516 | 536 |
| 517 void close() { | 537 void _sendDone() { |
| 518 assert(!isClosed); | |
| 519 _state |= _STATE_CLOSED; | |
| 520 if (_isEmpty) return; | 538 if (_isEmpty) return; |
| 521 _forEachListener((_MultiplexSubscription<T> subscription) { | 539 _forEachListener((_MultiplexSubscription<T> subscription) { |
| 522 subscription._close(); | 540 subscription._close(); |
| 523 subscription._eventState |= | 541 subscription._eventState |= |
| 524 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; | 542 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; |
| 525 }); | 543 }); |
| 526 } | 544 } |
| 527 | 545 |
| 528 void _forEachListener( | 546 void _forEachListener( |
| 529 void action(_BufferingStreamSubscription<T> subscription)) { | 547 void action(_BufferingStreamSubscription<T> subscription)) { |
| 530 if ((_state & _STATE_FIRING) != 0) { | 548 if (_isFiring) { |
| 531 throw new StateError( | 549 throw new StateError( |
| 532 "Cannot fire new event. Controller is already firing an event"); | 550 "Cannot fire new event. Controller is already firing an event"); |
| 533 } | 551 } |
| 534 if (_isEmpty) return; | 552 if (_isEmpty) return; |
| 535 | 553 |
| 536 // Get event id of this event. | 554 // Get event id of this event. |
| 537 int id = (_state & _STATE_EVENT_ID); | 555 int id = (_state & _STATE_EVENT_ID); |
| 538 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | 556 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] |
| 539 // callbacks while firing, and we prevent reentrancy of this function. | 557 // callbacks while firing, and we prevent reentrancy of this function. |
| 540 // | 558 // |
| (...skipping 17 matching lines...) Expand all Loading... | |
| 558 link = subscription._next; | 576 link = subscription._next; |
| 559 } | 577 } |
| 560 } | 578 } |
| 561 _state &= ~_STATE_FIRING; | 579 _state &= ~_STATE_FIRING; |
| 562 | 580 |
| 563 if (_isEmpty) { | 581 if (_isEmpty) { |
| 564 _runGuarded(_onCancel); | 582 _runGuarded(_onCancel); |
| 565 } | 583 } |
| 566 } | 584 } |
| 567 } | 585 } |
| 586 | |
| 587 class _BufferingMultiplexStreamController<T> | |
| 588 extends _MultiplexStreamController<T> | |
| 589 implements _EventDispatch<T> { | |
| 590 _StreamImplEvents _pending; | |
| 591 | |
| 592 _BufferingMultiplexStreamController(void onListen(), void onCancel()) | |
| 593 : super(onListen, onCancel); | |
| 594 | |
| 595 bool get _hasPending => _pending != null && ! _pending.isEmpty; | |
| 596 | |
| 597 void _addPendingEvent(_DelayedEvent event) { | |
| 598 if (_pending == null) { | |
| 599 _pending = new _StreamImplEvents(); | |
| 600 } | |
| 601 _pending.add(event); | |
| 602 } | |
| 603 | |
| 604 void add(T data) { | |
| 605 if (_isFiring || _hasPending) { | |
|
floitsch
2013/05/29 09:39:58
add comment when _hasPending but not _isFiring.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
It's only during the onCancel callback after remov
| |
| 606 _addPendingEvent(new _DelayedData<T>(data)); | |
| 607 return; | |
| 608 } | |
| 609 super.add(data); | |
| 610 while (_hasPending) { | |
| 611 _pending.handleNext(this); | |
| 612 } | |
| 613 } | |
| 614 | |
| 615 void addError(Object error, [StackTrace stackTrace]) { | |
| 616 if (_isFiring || _hasPending) { | |
| 617 _addPendingEvent(new _DelayedError(error)); | |
| 618 return; | |
| 619 } | |
| 620 super.addError(error, stackTrace); | |
| 621 while (_hasPending) { | |
| 622 _pending.handleNext(this); | |
| 623 } | |
| 624 } | |
| 625 | |
| 626 void close() { | |
| 627 if (_isFiring || _hasPending) { | |
| 628 _addPendingEvent(const _DelayedDone()); | |
| 629 _state |= _STATE_CLOSED; | |
| 630 return; | |
| 631 } | |
| 632 super.close(); | |
| 633 assert(!_hasPending); | |
| 634 } | |
| 635 } | |
| OLD | NEW |