Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(380)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 15673006: Implement asBroadcast using a _MultiplexStreamController. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 430 matching lines...) Expand 10 before | Expand all | Expand 10 after
441 * A multiplex controller is never paused. 441 * A multiplex controller is never paused.
442 * 442 *
443 * Each receiving stream may be paused individually, and they handle their 443 * Each receiving stream may be paused individually, and they handle their
444 * own buffering. 444 * own buffering.
445 */ 445 */
446 bool get isPaused => false; 446 bool get isPaused => false;
447 447
448 /** Whether there are currently a subscriber on the [Stream]. */ 448 /** Whether there are currently a subscriber on the [Stream]. */
449 bool get hasListener => !_isEmpty; 449 bool get hasListener => !_isEmpty;
450 450
451 /** Whether an event is being fired (sent to some, but not all, listeners). */
452 bool get _isFiring => (_state & _STATE_FIRING) != 0;
453
451 // Linked list helpers 454 // Linked list helpers
452 455
453 bool get _isEmpty => identical(_next, this); 456 bool get _isEmpty => identical(_next, this);
454 457
455 /** Adds subscription to linked list of active listeners. */ 458 /** Adds subscription to linked list of active listeners. */
456 void _addListener(_MultiplexSubscription<T> subscription) { 459 void _addListener(_MultiplexSubscription<T> subscription) {
457 _MultiplexSubscriptionLink previous = _previous; 460 _MultiplexSubscriptionLink previous = _previous;
458 previous._next = subscription; 461 previous._next = subscription;
459 _previous = subscription._previous; 462 _previous = subscription._previous;
460 subscription._previous._next = this; 463 subscription._previous._next = this;
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
493 } 496 }
494 } 497 }
495 498
496 void _recordPause(StreamSubscription<T> subscription) {} 499 void _recordPause(StreamSubscription<T> subscription) {}
497 void _recordResume(StreamSubscription<T> subscription) {} 500 void _recordResume(StreamSubscription<T> subscription) {}
498 501
499 // EventSink interface. 502 // EventSink interface.
500 503
501 void add(T data) { 504 void add(T data) {
502 assert(!isClosed); 505 assert(!isClosed);
506 _sendData(data);
507 }
508
509 void addError(Object error, [Object stackTrace]) {
510 assert(!isClosed);
511 // TODO(lrn): Handle stacktrace.
floitsch 2013/05/29 09:39:58 That should be less than 5 lines of code.
Lasse Reichstein Nielsen 2013/05/29 10:39:12 Done.
512 _sendError(error);
513 }
514
515 void close() {
516 assert(!isClosed);
517 _state |= _STATE_CLOSED;
518 _sendDone();
519 }
520
521 // EventDispatch interface.
522
523 void _sendData(T data) {
503 if (_isEmpty) return; 524 if (_isEmpty) return;
504 _forEachListener((_BufferingStreamSubscription<T> subscription) { 525 _forEachListener((_BufferingStreamSubscription<T> subscription) {
505 subscription._add(data); 526 subscription._add(data);
506 }); 527 });
507 } 528 }
508 529
509 void addError(Object error, [Object stackTrace]) { 530 void _sendError(Object error) {
510 assert(!isClosed);
511 if (_isEmpty) return; 531 if (_isEmpty) return;
512 _forEachListener((_BufferingStreamSubscription<T> subscription) { 532 _forEachListener((_BufferingStreamSubscription<T> subscription) {
513 subscription._addError(error); 533 subscription._addError(error);
514 }); 534 });
515 } 535 }
516 536
517 void close() { 537 void _sendDone() {
518 assert(!isClosed);
519 _state |= _STATE_CLOSED;
520 if (_isEmpty) return; 538 if (_isEmpty) return;
521 _forEachListener((_MultiplexSubscription<T> subscription) { 539 _forEachListener((_MultiplexSubscription<T> subscription) {
522 subscription._close(); 540 subscription._close();
523 subscription._eventState |= 541 subscription._eventState |=
524 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; 542 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING;
525 }); 543 });
526 } 544 }
527 545
528 void _forEachListener( 546 void _forEachListener(
529 void action(_BufferingStreamSubscription<T> subscription)) { 547 void action(_BufferingStreamSubscription<T> subscription)) {
530 if ((_state & _STATE_FIRING) != 0) { 548 if (_isFiring) {
531 throw new StateError( 549 throw new StateError(
532 "Cannot fire new event. Controller is already firing an event"); 550 "Cannot fire new event. Controller is already firing an event");
533 } 551 }
534 if (_isEmpty) return; 552 if (_isEmpty) return;
535 553
536 // Get event id of this event. 554 // Get event id of this event.
537 int id = (_state & _STATE_EVENT_ID); 555 int id = (_state & _STATE_EVENT_ID);
538 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] 556 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
539 // callbacks while firing, and we prevent reentrancy of this function. 557 // callbacks while firing, and we prevent reentrancy of this function.
540 // 558 //
(...skipping 17 matching lines...) Expand all
558 link = subscription._next; 576 link = subscription._next;
559 } 577 }
560 } 578 }
561 _state &= ~_STATE_FIRING; 579 _state &= ~_STATE_FIRING;
562 580
563 if (_isEmpty) { 581 if (_isEmpty) {
564 _runGuarded(_onCancel); 582 _runGuarded(_onCancel);
565 } 583 }
566 } 584 }
567 } 585 }
586
587 class _BufferingMultiplexStreamController<T>
588 extends _MultiplexStreamController<T>
589 implements _EventDispatch<T> {
590 _StreamImplEvents _pending;
591
592 _BufferingMultiplexStreamController(void onListen(), void onCancel())
593 : super(onListen, onCancel);
594
595 bool get _hasPending => _pending != null && ! _pending.isEmpty;
596
597 void _addPendingEvent(_DelayedEvent event) {
598 if (_pending == null) {
599 _pending = new _StreamImplEvents();
600 }
601 _pending.add(event);
602 }
603
604 void add(T data) {
605 if (_isFiring || _hasPending) {
floitsch 2013/05/29 09:39:58 add comment when _hasPending but not _isFiring.
Lasse Reichstein Nielsen 2013/05/29 10:39:12 It's only during the onCancel callback after remov
606 _addPendingEvent(new _DelayedData<T>(data));
607 return;
608 }
609 super.add(data);
610 while (_hasPending) {
611 _pending.handleNext(this);
612 }
613 }
614
615 void addError(Object error, [StackTrace stackTrace]) {
616 if (_isFiring || _hasPending) {
617 _addPendingEvent(new _DelayedError(error));
618 return;
619 }
620 super.addError(error, stackTrace);
621 while (_hasPending) {
622 _pending.handleNext(this);
623 }
624 }
625
626 void close() {
627 if (_isFiring || _hasPending) {
628 _addPendingEvent(const _DelayedDone());
629 _state |= _STATE_CLOSED;
630 return;
631 }
632 super.close();
633 assert(!_hasPending);
634 }
635 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698