OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
441 * A multiplex controller is never paused. | 441 * A multiplex controller is never paused. |
442 * | 442 * |
443 * Each receiving stream may be paused individually, and they handle their | 443 * Each receiving stream may be paused individually, and they handle their |
444 * own buffering. | 444 * own buffering. |
445 */ | 445 */ |
446 bool get isPaused => false; | 446 bool get isPaused => false; |
447 | 447 |
448 /** Whether there are currently a subscriber on the [Stream]. */ | 448 /** Whether there are currently a subscriber on the [Stream]. */ |
449 bool get hasListener => !_isEmpty; | 449 bool get hasListener => !_isEmpty; |
450 | 450 |
451 /** Whether an event is being fired (sent to some, but not all, listeners). */ | |
452 bool get _isFiring => (_state & _STATE_FIRING) != 0; | |
453 | |
451 // Linked list helpers | 454 // Linked list helpers |
452 | 455 |
453 bool get _isEmpty => identical(_next, this); | 456 bool get _isEmpty => identical(_next, this); |
454 | 457 |
455 /** Adds subscription to linked list of active listeners. */ | 458 /** Adds subscription to linked list of active listeners. */ |
456 void _addListener(_MultiplexSubscription<T> subscription) { | 459 void _addListener(_MultiplexSubscription<T> subscription) { |
457 _MultiplexSubscriptionLink previous = _previous; | 460 _MultiplexSubscriptionLink previous = _previous; |
458 previous._next = subscription; | 461 previous._next = subscription; |
459 _previous = subscription._previous; | 462 _previous = subscription._previous; |
460 subscription._previous._next = this; | 463 subscription._previous._next = this; |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
493 } | 496 } |
494 } | 497 } |
495 | 498 |
496 void _recordPause(StreamSubscription<T> subscription) {} | 499 void _recordPause(StreamSubscription<T> subscription) {} |
497 void _recordResume(StreamSubscription<T> subscription) {} | 500 void _recordResume(StreamSubscription<T> subscription) {} |
498 | 501 |
499 // EventSink interface. | 502 // EventSink interface. |
500 | 503 |
501 void add(T data) { | 504 void add(T data) { |
502 assert(!isClosed); | 505 assert(!isClosed); |
506 _sendData(data); | |
507 } | |
508 | |
509 void addError(Object error, [Object stackTrace]) { | |
510 assert(!isClosed); | |
511 // TODO(lrn): Handle stacktrace. | |
floitsch
2013/05/29 09:39:58
That should be less than 5 lines of code.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
Done.
| |
512 _sendError(error); | |
513 } | |
514 | |
515 void close() { | |
516 assert(!isClosed); | |
517 _state |= _STATE_CLOSED; | |
518 _sendDone(); | |
519 } | |
520 | |
521 // EventDispatch interface. | |
522 | |
523 void _sendData(T data) { | |
503 if (_isEmpty) return; | 524 if (_isEmpty) return; |
504 _forEachListener((_BufferingStreamSubscription<T> subscription) { | 525 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
505 subscription._add(data); | 526 subscription._add(data); |
506 }); | 527 }); |
507 } | 528 } |
508 | 529 |
509 void addError(Object error, [Object stackTrace]) { | 530 void _sendError(Object error) { |
510 assert(!isClosed); | |
511 if (_isEmpty) return; | 531 if (_isEmpty) return; |
512 _forEachListener((_BufferingStreamSubscription<T> subscription) { | 532 _forEachListener((_BufferingStreamSubscription<T> subscription) { |
513 subscription._addError(error); | 533 subscription._addError(error); |
514 }); | 534 }); |
515 } | 535 } |
516 | 536 |
517 void close() { | 537 void _sendDone() { |
518 assert(!isClosed); | |
519 _state |= _STATE_CLOSED; | |
520 if (_isEmpty) return; | 538 if (_isEmpty) return; |
521 _forEachListener((_MultiplexSubscription<T> subscription) { | 539 _forEachListener((_MultiplexSubscription<T> subscription) { |
522 subscription._close(); | 540 subscription._close(); |
523 subscription._eventState |= | 541 subscription._eventState |= |
524 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; | 542 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; |
525 }); | 543 }); |
526 } | 544 } |
527 | 545 |
528 void _forEachListener( | 546 void _forEachListener( |
529 void action(_BufferingStreamSubscription<T> subscription)) { | 547 void action(_BufferingStreamSubscription<T> subscription)) { |
530 if ((_state & _STATE_FIRING) != 0) { | 548 if (_isFiring) { |
531 throw new StateError( | 549 throw new StateError( |
532 "Cannot fire new event. Controller is already firing an event"); | 550 "Cannot fire new event. Controller is already firing an event"); |
533 } | 551 } |
534 if (_isEmpty) return; | 552 if (_isEmpty) return; |
535 | 553 |
536 // Get event id of this event. | 554 // Get event id of this event. |
537 int id = (_state & _STATE_EVENT_ID); | 555 int id = (_state & _STATE_EVENT_ID); |
538 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | 556 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] |
539 // callbacks while firing, and we prevent reentrancy of this function. | 557 // callbacks while firing, and we prevent reentrancy of this function. |
540 // | 558 // |
(...skipping 17 matching lines...) Expand all Loading... | |
558 link = subscription._next; | 576 link = subscription._next; |
559 } | 577 } |
560 } | 578 } |
561 _state &= ~_STATE_FIRING; | 579 _state &= ~_STATE_FIRING; |
562 | 580 |
563 if (_isEmpty) { | 581 if (_isEmpty) { |
564 _runGuarded(_onCancel); | 582 _runGuarded(_onCancel); |
565 } | 583 } |
566 } | 584 } |
567 } | 585 } |
586 | |
587 class _BufferingMultiplexStreamController<T> | |
588 extends _MultiplexStreamController<T> | |
589 implements _EventDispatch<T> { | |
590 _StreamImplEvents _pending; | |
591 | |
592 _BufferingMultiplexStreamController(void onListen(), void onCancel()) | |
593 : super(onListen, onCancel); | |
594 | |
595 bool get _hasPending => _pending != null && ! _pending.isEmpty; | |
596 | |
597 void _addPendingEvent(_DelayedEvent event) { | |
598 if (_pending == null) { | |
599 _pending = new _StreamImplEvents(); | |
600 } | |
601 _pending.add(event); | |
602 } | |
603 | |
604 void add(T data) { | |
605 if (_isFiring || _hasPending) { | |
floitsch
2013/05/29 09:39:58
add comment when _hasPending but not _isFiring.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
It's only during the onCancel callback after remov
| |
606 _addPendingEvent(new _DelayedData<T>(data)); | |
607 return; | |
608 } | |
609 super.add(data); | |
610 while (_hasPending) { | |
611 _pending.handleNext(this); | |
612 } | |
613 } | |
614 | |
615 void addError(Object error, [StackTrace stackTrace]) { | |
616 if (_isFiring || _hasPending) { | |
617 _addPendingEvent(new _DelayedError(error)); | |
618 return; | |
619 } | |
620 super.addError(error, stackTrace); | |
621 while (_hasPending) { | |
622 _pending.handleNext(this); | |
623 } | |
624 } | |
625 | |
626 void close() { | |
627 if (_isFiring || _hasPending) { | |
628 _addPendingEvent(const _DelayedDone()); | |
629 _state |= _STATE_CLOSED; | |
630 return; | |
631 } | |
632 super.close(); | |
633 assert(!_hasPending); | |
634 } | |
635 } | |
OLD | NEW |