Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(316)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 16125005: Make new StreamController be async by default. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Throws the given error in the next cycle. */ 7 /** Throws the given error in the next cycle. */
8 _throwDelayed(var error, [Object stackTrace]) { 8 _throwDelayed(var error, [Object stackTrace]) {
9 // We are going to reach the top-level here, but there might be a global 9 // We are going to reach the top-level here, but there might be a global
10 // exception handler. This means that we shouldn't print the stack trace. 10 // exception handler. This means that we shouldn't print the stack trace.
(...skipping 676 matching lines...) Expand 10 before | Expand all | Expand 10 after
687 } 687 }
688 event.perform(dispatch); 688 event.perform(dispatch);
689 } 689 }
690 690
691 void clear() { 691 void clear() {
692 if (isScheduled) cancelSchedule(); 692 if (isScheduled) cancelSchedule();
693 firstPendingEvent = lastPendingEvent = null; 693 firstPendingEvent = lastPendingEvent = null;
694 } 694 }
695 } 695 }
696 696
697 class _MultiplexerLinkedList { 697 class _BroadcastLinkedList {
698 _MultiplexerLinkedList _next; 698 _BroadcastLinkedList _next;
699 _MultiplexerLinkedList _previous; 699 _BroadcastLinkedList _previous;
700 700
701 void _unlink() { 701 void _unlink() {
702 _previous._next = _next; 702 _previous._next = _next;
703 _next._previous = _previous; 703 _next._previous = _previous;
704 _next = _previous = this; 704 _next = _previous = this;
705 } 705 }
706 706
707 void _insertBefore(_MultiplexerLinkedList newNext) { 707 void _insertBefore(_BroadcastLinkedList newNext) {
708 _MultiplexerLinkedList newPrevious = newNext._previous; 708 _BroadcastLinkedList newPrevious = newNext._previous;
709 newPrevious._next = this; 709 newPrevious._next = this;
710 newNext._previous = _previous; 710 newNext._previous = _previous;
711 _previous._next = newNext; 711 _previous._next = newNext;
712 _previous = newPrevious; 712 _previous = newPrevious;
713 } 713 }
714 } 714 }
715 715
716 class _AsBroadcastStream<T> extends Stream<T> { 716 class _AsBroadcastStream<T> extends Stream<T> {
717 final Stream<T> _source; 717 final Stream<T> _source;
718 _BufferingMultiplexStreamController<T> _controller; 718 _AsBroadcastStreamController<T> _controller;
719 StreamSubscription<T> _subscription; 719 StreamSubscription<T> _subscription;
720 720
721 _AsBroadcastStream(this._source) { 721 _AsBroadcastStream(this._source) {
722 _controller = new _BufferingMultiplexStreamController<T>(null, _onCancel); 722 _controller = new _AsBroadcastStreamController<T>(null, _onCancel);
723 } 723 }
724 724
725 bool get isBroadcast => true; 725 bool get isBroadcast => true;
726 726
727 StreamSubscription<T> listen(void onData(T data), 727 StreamSubscription<T> listen(void onData(T data),
728 { void onError(Object error), 728 { void onError(Object error),
729 void onDone(), 729 void onDone(),
730 bool cancelOnError}) { 730 bool cancelOnError}) {
731 if (_controller == null) { 731 if (_controller == null) {
732 throw new StateError("Source stream has been closed."); 732 throw new StateError("Source stream has been closed.");
(...skipping 159 matching lines...) Expand 10 before | Expand all | Expand 10 after
892 _FutureImpl<bool> hasNext = _futureOrPrefetch; 892 _FutureImpl<bool> hasNext = _futureOrPrefetch;
893 _clear(); 893 _clear();
894 hasNext._setValue(false); 894 hasNext._setValue(false);
895 return; 895 return;
896 } 896 }
897 _subscription.pause(); 897 _subscription.pause();
898 _futureOrPrefetch = null; 898 _futureOrPrefetch = null;
899 _state = _STATE_EXTRA_DONE; 899 _state = _STATE_EXTRA_DONE;
900 } 900 }
901 } 901 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698