Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(188)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 15673006: Implement asBroadcast using a _MultiplexStreamController. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Don't keep pending events when calling onCancel. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after
441 * A multiplex controller is never paused. 441 * A multiplex controller is never paused.
442 * 442 *
443 * Each receiving stream may be paused individually, and they handle their 443 * Each receiving stream may be paused individually, and they handle their
444 * own buffering. 444 * own buffering.
445 */ 445 */
446 bool get isPaused => false; 446 bool get isPaused => false;
447 447
448 /** Whether there are currently a subscriber on the [Stream]. */ 448 /** Whether there are currently a subscriber on the [Stream]. */
449 bool get hasListener => !_isEmpty; 449 bool get hasListener => !_isEmpty;
450 450
451 /** Whether an event is being fired (sent to some, but not all, listeners). */
452 bool get _isFiring => (_state & _STATE_FIRING) != 0;
453
451 // Linked list helpers 454 // Linked list helpers
452 455
453 bool get _isEmpty => identical(_next, this); 456 bool get _isEmpty => identical(_next, this);
454 457
455 /** Adds subscription to linked list of active listeners. */ 458 /** Adds subscription to linked list of active listeners. */
456 void _addListener(_MultiplexSubscription<T> subscription) { 459 void _addListener(_MultiplexSubscription<T> subscription) {
457 _MultiplexSubscriptionLink previous = _previous; 460 _MultiplexSubscriptionLink previous = _previous;
458 previous._next = subscription; 461 previous._next = subscription;
459 _previous = subscription._previous; 462 _previous = subscription._previous;
460 subscription._previous._next = this; 463 subscription._previous._next = this;
(...skipping 20 matching lines...) Expand all
481 } 484 }
482 485
483 void _recordCancel(_MultiplexSubscription<T> subscription) { 486 void _recordCancel(_MultiplexSubscription<T> subscription) {
484 if (subscription._isFiring) { 487 if (subscription._isFiring) {
485 subscription._setRemoveAfterFiring(); 488 subscription._setRemoveAfterFiring();
486 } else { 489 } else {
487 _removeListener(subscription); 490 _removeListener(subscription);
488 // If we are currently firing an event, the empty-check is performed at 491 // If we are currently firing an event, the empty-check is performed at
489 // the end of the listener loop instead of here. 492 // the end of the listener loop instead of here.
490 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { 493 if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
491 _runGuarded(_onCancel); 494 _callOnCancel();
492 } 495 }
493 } 496 }
494 } 497 }
495 498
496 void _recordPause(StreamSubscription<T> subscription) {} 499 void _recordPause(StreamSubscription<T> subscription) {}
497 void _recordResume(StreamSubscription<T> subscription) {} 500 void _recordResume(StreamSubscription<T> subscription) {}
498 501
499 // EventSink interface. 502 // EventSink interface.
500 503
501 void add(T data) { 504 void add(T data) {
502 assert(!isClosed); 505 if (isClosed) {
506 throw new StateError("Cannot add new events after calling close()");
507 }
508 _sendData(data);
509 }
510
511 void addError(Object error, [Object stackTrace]) {
512 if (isClosed) {
513 throw new StateError("Cannot add new events after calling close()");
514 }
515 if (stackTrace != null) _attachStackTrace(error, stackTrace);
516 _sendError(error);
517 }
518
519 void close() {
520 if (isClosed) {
521 throw new StateError("Cannot add new events after calling close()");
522 }
523 _state |= _STATE_CLOSED;
524 _sendDone();
525 }
526
527 // EventDispatch interface.
528
529 void _sendData(T data) {
503 if (_isEmpty) return; 530 if (_isEmpty) return;
504 _forEachListener((_BufferingStreamSubscription<T> subscription) { 531 _forEachListener((_BufferingStreamSubscription<T> subscription) {
505 subscription._add(data); 532 subscription._add(data);
506 }); 533 });
507 } 534 }
508 535
509 void addError(Object error, [Object stackTrace]) { 536 void _sendError(Object error) {
510 assert(!isClosed);
511 if (_isEmpty) return; 537 if (_isEmpty) return;
512 _forEachListener((_BufferingStreamSubscription<T> subscription) { 538 _forEachListener((_BufferingStreamSubscription<T> subscription) {
513 subscription._addError(error); 539 subscription._addError(error);
514 }); 540 });
515 } 541 }
516 542
517 void close() { 543 void _sendDone() {
518 assert(!isClosed);
519 _state |= _STATE_CLOSED;
520 if (_isEmpty) return; 544 if (_isEmpty) return;
521 _forEachListener((_MultiplexSubscription<T> subscription) { 545 _forEachListener((_MultiplexSubscription<T> subscription) {
522 subscription._close(); 546 subscription._close();
523 subscription._eventState |= 547 subscription._eventState |=
524 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; 548 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING;
525 }); 549 });
526 } 550 }
527 551
528 void _forEachListener( 552 void _forEachListener(
529 void action(_BufferingStreamSubscription<T> subscription)) { 553 void action(_BufferingStreamSubscription<T> subscription)) {
530 if ((_state & _STATE_FIRING) != 0) { 554 if (_isFiring) {
531 throw new StateError( 555 throw new StateError(
532 "Cannot fire new event. Controller is already firing an event"); 556 "Cannot fire new event. Controller is already firing an event");
533 } 557 }
534 if (_isEmpty) return; 558 if (_isEmpty) return;
535 559
536 // Get event id of this event. 560 // Get event id of this event.
537 int id = (_state & _STATE_EVENT_ID); 561 int id = (_state & _STATE_EVENT_ID);
538 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] 562 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
539 // callbacks while firing, and we prevent reentrancy of this function. 563 // callbacks while firing, and we prevent reentrancy of this function.
540 // 564 //
(...skipping 13 matching lines...) Expand all
554 _removeListener(subscription); 578 _removeListener(subscription);
555 } 579 }
556 subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING; 580 subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING;
557 } else { 581 } else {
558 link = subscription._next; 582 link = subscription._next;
559 } 583 }
560 } 584 }
561 _state &= ~_STATE_FIRING; 585 _state &= ~_STATE_FIRING;
562 586
563 if (_isEmpty) { 587 if (_isEmpty) {
564 _runGuarded(_onCancel); 588 _callOnCancel();
565 } 589 }
566 } 590 }
591
592 void _callOnCancel() {
593 _runGuarded(_onCancel);
594 }
567 } 595 }
596
597 class _BufferingMultiplexStreamController<T>
598 extends _MultiplexStreamController<T>
599 implements _EventDispatch<T> {
600 _StreamImplEvents _pending;
601
602 _BufferingMultiplexStreamController(void onListen(), void onCancel())
603 : super(onListen, onCancel);
604
605 bool get _hasPending => _pending != null && ! _pending.isEmpty;
606
607 void _addPendingEvent(_DelayedEvent event) {
608 if (_pending == null) {
609 _pending = new _StreamImplEvents();
610 }
611 _pending.add(event);
612 }
613
614 void add(T data) {
615 if (_isFiring) {
616 _addPendingEvent(new _DelayedData<T>(data));
617 return;
618 }
619 super.add(data);
620 while (_hasPending) {
621 _pending.handleNext(this);
622 }
623 }
624
625 void addError(Object error, [StackTrace stackTrace]) {
626 if (_isFiring) {
627 _addPendingEvent(new _DelayedError(error));
628 return;
629 }
630 super.addError(error, stackTrace);
631 while (_hasPending) {
632 _pending.handleNext(this);
633 }
634 }
635
636 void close() {
637 if (_isFiring) {
638 _addPendingEvent(const _DelayedDone());
639 _state |= _STATE_CLOSED;
640 return;
641 }
642 super.close();
643 assert(!_hasPending);
644 }
645
646 void _callOnCancel() {
647 if (_hasPending) {
648 _pending.clear();
649 _pending = null;
650 }
651 super._callOnCancel();
652
653 }
654 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698