Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(285)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 12049013: Change singleSubscription/multiSubscription to normal/broadcast. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Default implementation of a stream with a controller for adding 8 // Default implementation of a stream with a controller for adding
9 // events to the stream. 9 // events to the stream.
10 // ------------------------------------------------------------------- 10 // -------------------------------------------------------------------
11 11
12 /** 12 /**
13 * A controller and the stream it controls. 13 * A controller and the stream it controls.
14 * 14 *
15 * This controller allows sending data, error and done events on 15 * This controller allows sending data, error and done events on
16 * its [stream]. 16 * its [stream].
17 * This class can be used to create a simple stream that others 17 * This class can be used to create a simple stream that others
18 * can listen on, and to push events to that stream. 18 * can listen on, and to push events to that stream.
19 * 19 *
20 * It's possible to check whether the stream is paused or not, and whether 20 * It's possible to check whether the stream is paused or not, and whether
21 * it has subscribers or not, as well as getting a callback when either of 21 * it has subscribers or not, as well as getting a callback when either of
22 * these change. 22 * these change.
23 */ 23 */
24 class StreamController<T> extends Stream<T> implements StreamSink<T> { 24 class StreamController<T> implements StreamSink<T> {
floitsch 2013/01/22 14:37:56 This looks like an unrelated change. At least upda
Lasse Reichstein Nielsen 2013/01/22 15:33:37 Description updated.
25 _StreamImpl<T> _stream; 25 _StreamImpl<T> _stream;
floitsch 2013/01/22 14:37:56 Should we make this simply final and initialize it
Lasse Reichstein Nielsen 2013/01/22 15:33:37 Yes. We can do that now that the state-change-call
26 Stream<T> get stream => _stream;
27 26
28 /** 27 /**
29 * A controller with a [stream] that supports multiple subscribers. 28 * A controller with a [stream] that supports multiple subscribers.
30 * 29 *
31 * The [onPauseStateChange] function is called when the stream becomes 30 * The [onPauseStateChange] function is called when the stream becomes
32 * paused or resumes after being paused. The current pause state can 31 * paused or resumes after being paused. The current pause state can
33 * be read from [isPaused]. Ignored if [:null:]. 32 * be read from [isPaused]. Ignored if [:null:].
34 * 33 *
35 * The [onSubscriptionStateChange] function is called when the stream 34 * The [onSubscriptionStateChange] function is called when the stream
36 * receives its first listener or loses its last. The current subscription 35 * receives its first listener or loses its last. The current subscription
(...skipping 16 matching lines...) Expand all
53 * The [onSubscriptionStateChange] function is called when the stream 52 * The [onSubscriptionStateChange] function is called when the stream
54 * receives its first listener or loses its last. The current subscription 53 * receives its first listener or loses its last. The current subscription
55 * state can be read from [hasSubscribers]. Ignored if [:null:]. 54 * state can be read from [hasSubscribers]. Ignored if [:null:].
56 */ 55 */
57 StreamController({void onPauseStateChange(), 56 StreamController({void onPauseStateChange(),
58 void onSubscriptionStateChange()}) { 57 void onSubscriptionStateChange()}) {
59 _stream = new _SingleControllerStream<T>(onSubscriptionStateChange, 58 _stream = new _SingleControllerStream<T>(onSubscriptionStateChange,
60 onPauseStateChange); 59 onPauseStateChange);
61 } 60 }
62 61
63 bool get isSingleSubscription => _stream.isSingleSubscription; 62 /** The stream that this controller is controlling. */
64 63 Stream<T> get stream => _stream;
65 Stream<T> asMultiSubscriptionStream() => _stream.asMultiSubscriptionStream();
66
67 StreamSubscription listen(void onData(T data),
68 { void onError(AsyncError error),
69 void onDone(),
70 bool unsubscribeOnError}) {
71 return _stream.listen(onData,
72 onError: onError,
73 onDone: onDone,
74 unsubscribeOnError: unsubscribeOnError);
75 }
76 64
77 /** 65 /**
78 * Returns a view of this object that only exposes the [StreamSink] interface. 66 * Returns a view of this object that only exposes the [StreamSink] interface.
79 */ 67 */
80 StreamSink<T> get sink => new StreamSinkView<T>(this); 68 StreamSink<T> get sink => new StreamSinkView<T>(this);
81 69
82 /** Whether one or more active subscribers have requested a pause. */ 70 /** Whether one or more active subscribers have requested a pause. */
83 bool get isPaused => _stream._isPaused; 71 bool get isPaused => _stream._isPaused;
84 72
85 /** Whether there are currently any subscribers on this [Stream]. */ 73 /** Whether there are currently any subscribers on this [Stream]. */
(...skipping 26 matching lines...) Expand all
112 _stream._signalError(asyncError); 100 _stream._signalError(asyncError);
113 } 101 }
114 102
115 /** 103 /**
116 * Send or enqueue a "done" message. 104 * Send or enqueue a "done" message.
117 * 105 *
118 * The "done" message should be sent at most once by a stream, and it 106 * The "done" message should be sent at most once by a stream, and it
119 * should be the last message sent. 107 * should be the last message sent.
120 */ 108 */
121 void close() { _stream._close(); } 109 void close() { _stream._close(); }
122
123 void forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)) {
124 _stream._forEachSubscriber(() {
125 try {
126 action();
127 } on AsyncError catch (e) {
128 e.throwDelayed();
129 } catch (e, s) {
130 new AsyncError(e, s).throwDelayed();
131 }
132 });
133 }
134 } 110 }
135 111
136 typedef void _NotificationHandler(); 112 typedef void _NotificationHandler();
137 113
138 class _MultiControllerStream<T> extends _MultiStreamImpl<T> { 114 class _MultiControllerStream<T> extends _MultiStreamImpl<T> {
139 _NotificationHandler _subscriptionHandler; 115 _NotificationHandler _subscriptionHandler;
140 _NotificationHandler _pauseHandler; 116 _NotificationHandler _pauseHandler;
141 117
142 _MultiControllerStream(this._subscriptionHandler, this._pauseHandler); 118 _MultiControllerStream(this._subscriptionHandler, this._pauseHandler);
143 119
(...skipping 13 matching lines...) Expand all
157 _SingleControllerStream(this._subscriptionHandler, this._pauseHandler); 133 _SingleControllerStream(this._subscriptionHandler, this._pauseHandler);
158 134
159 void _onSubscriptionStateChange() { 135 void _onSubscriptionStateChange() {
160 if (_subscriptionHandler != null) _subscriptionHandler(); 136 if (_subscriptionHandler != null) _subscriptionHandler();
161 } 137 }
162 138
163 void _onPauseStateChange() { 139 void _onPauseStateChange() {
164 if (_pauseHandler != null) _pauseHandler(); 140 if (_pauseHandler != null) _pauseHandler();
165 } 141 }
166 } 142 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698