Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(189)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 12049013: Change singleSubscription/multiSubscription to normal/broadcast. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Addressed comments, renamed .multiSubscription to .broadcast. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Default implementation of a stream with a controller for adding 8 // Controller for creating and adding events to a stream.
9 // events to the stream.
10 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
11 10
12 /** 11 /**
13 * A controller and the stream it controls. 12 * A controller with the stream it controls.
floitsch 2013/01/24 13:03:42 containing?
14 * 13 *
15 * This controller allows sending data, error and done events on 14 * This controller allows sending data, error and done events on
16 * its [stream]. 15 * its [stream].
17 * This class can be used to create a simple stream that others 16 * This class can be used to create a simple stream that others
18 * can listen on, and to push events to that stream. 17 * can listen on, and to push events to that stream.
19 * 18 *
20 * It's possible to check whether the stream is paused or not, and whether 19 * It's possible to check whether the stream is paused or not, and whether
21 * it has subscribers or not, as well as getting a callback when either of 20 * it has subscribers or not, as well as getting a callback when either of
22 * these change. 21 * these change.
23 */ 22 */
24 class StreamController<T> extends Stream<T> implements StreamSink<T> { 23 class StreamController<T> implements StreamSink<T> {
25 _StreamImpl<T> _stream; 24 final _StreamImpl<T> stream;
26 Stream<T> get stream => _stream;
27 25
28 /** 26 /**
29 * A controller with a [stream] that supports multiple subscribers. 27 * A controller with a broadcast [stream]..
floitsch 2013/01/24 13:03:42 remove second ".".
30 * 28 *
31 * The [onPauseStateChange] function is called when the stream becomes 29 * The [onPauseStateChange] function is called when the stream becomes
32 * paused or resumes after being paused. The current pause state can 30 * paused or resumes after being paused. The current pause state can
33 * be read from [isPaused]. Ignored if [:null:]. 31 * be read from [isPaused]. Ignored if [:null:].
34 * 32 *
35 * The [onSubscriptionStateChange] function is called when the stream 33 * The [onSubscriptionStateChange] function is called when the stream
36 * receives its first listener or loses its last. The current subscription 34 * receives its first listener or loses its last. The current subscription
37 * state can be read from [hasSubscribers]. Ignored if [:null:]. 35 * state can be read from [hasSubscribers]. Ignored if [:null:].
38 */ 36 */
39 StreamController.multiSubscription({void onPauseStateChange(), 37 StreamController.broadcast({void onPauseStateChange(),
40 void onSubscriptionStateChange()}) { 38 void onSubscriptionStateChange()})
41 _stream = new _MultiControllerStream<T>(onSubscriptionStateChange, 39 : stream = new _MultiControllerStream<T>(onSubscriptionStateChange,
42 onPauseStateChange); 40 onPauseStateChange);
43 } 41
44 /** 42 /**
45 * A controller with a [stream] that supports only one single subscriber. 43 * A controller with a [stream] that supports only one single subscriber.
44 *
46 * The controller will buffer all incoming events until the subscriber is 45 * The controller will buffer all incoming events until the subscriber is
47 * registered. 46 * registered.
48 * 47 *
49 * The [onPauseStateChange] function is called when the stream becomes 48 * The [onPauseStateChange] function is called when the stream becomes
50 * paused or resumes after being paused. The current pause state can 49 * paused or resumes after being paused. The current pause state can
51 * be read from [isPaused]. Ignored if [:null:]. 50 * be read from [isPaused]. Ignored if [:null:].
52 * 51 *
53 * The [onSubscriptionStateChange] function is called when the stream 52 * The [onSubscriptionStateChange] function is called when the stream
54 * receives its first listener or loses its last. The current subscription 53 * receives its first listener or loses its last. The current subscription
55 * state can be read from [hasSubscribers]. Ignored if [:null:]. 54 * state can be read from [hasSubscribers]. Ignored if [:null:].
56 */ 55 */
57 StreamController({void onPauseStateChange(), 56 StreamController({void onPauseStateChange(),
58 void onSubscriptionStateChange()}) { 57 void onSubscriptionStateChange()})
59 _stream = new _SingleControllerStream<T>(onSubscriptionStateChange, 58 : stream = new _SingleControllerStream<T>(onSubscriptionStateChange,
60 onPauseStateChange); 59 onPauseStateChange);
61 }
62
63 bool get isSingleSubscription => _stream.isSingleSubscription;
64
65 Stream<T> asMultiSubscriptionStream() => _stream.asMultiSubscriptionStream();
66
67 StreamSubscription listen(void onData(T data),
68 { void onError(AsyncError error),
69 void onDone(),
70 bool unsubscribeOnError}) {
71 return _stream.listen(onData,
72 onError: onError,
73 onDone: onDone,
74 unsubscribeOnError: unsubscribeOnError);
75 }
76 60
77 /** 61 /**
78 * Returns a view of this object that only exposes the [StreamSink] interface. 62 * Returns a view of this object that only exposes the [StreamSink] interface.
79 */ 63 */
80 StreamSink<T> get sink => new StreamSinkView<T>(this); 64 StreamSink<T> get sink => new StreamSinkView<T>(this);
81 65
82 /** Whether one or more active subscribers have requested a pause. */ 66 /** Whether one or more active subscribers have requested a pause. */
83 bool get isPaused => _stream._isPaused; 67 bool get isPaused => stream._isPaused;
84 68
85 /** Whether there are currently any subscribers on this [Stream]. */ 69 /** Whether there are currently any subscribers on this [Stream]. */
86 bool get hasSubscribers => _stream._hasSubscribers; 70 bool get hasSubscribers => stream._hasSubscribers;
87 71
88 /** 72 /**
89 * Send or queue a data event. 73 * Send or queue a data event.
90 */ 74 */
91 void add(T value) => _stream._add(value); 75 void add(T value) => stream._add(value);
92 76
93 /** 77 /**
94 * Send or enqueue an error event. 78 * Send or enqueue an error event.
95 * 79 *
96 * If [error] is not an [AsyncError], [error] and an optional [stackTrace] 80 * If [error] is not an [AsyncError], [error] and an optional [stackTrace]
97 * is combined into an [AsyncError] and sent this stream's listeners. 81 * is combined into an [AsyncError] and sent this stream's listeners.
98 * 82 *
99 * Otherwise, if [error] is an [AsyncError], it is used directly as the 83 * Otherwise, if [error] is an [AsyncError], it is used directly as the
100 * error object reported to listeners, and the [stackTrace] is ignored. 84 * error object reported to listeners, and the [stackTrace] is ignored.
101 * 85 *
102 * If a subscription has requested to be unsubscribed on errors, 86 * If a subscription has requested to be unsubscribed on errors,
103 * it will be unsubscribed after receiving this event. 87 * it will be unsubscribed after receiving this event.
104 */ 88 */
105 void signalError(Object error, [Object stackTrace]) { 89 void signalError(Object error, [Object stackTrace]) {
106 AsyncError asyncError; 90 AsyncError asyncError;
107 if (error is AsyncError) { 91 if (error is AsyncError) {
108 asyncError = error; 92 asyncError = error;
109 } else { 93 } else {
110 asyncError = new AsyncError(error, stackTrace); 94 asyncError = new AsyncError(error, stackTrace);
111 } 95 }
112 _stream._signalError(asyncError); 96 stream._signalError(asyncError);
113 } 97 }
114 98
115 /** 99 /**
116 * Send or enqueue a "done" message. 100 * Send or enqueue a "done" message.
117 * 101 *
118 * The "done" message should be sent at most once by a stream, and it 102 * The "done" message should be sent at most once by a stream, and it
119 * should be the last message sent. 103 * should be the last message sent.
120 */ 104 */
121 void close() { _stream._close(); } 105 void close() { stream._close(); }
122
123 void forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)) {
124 _stream._forEachSubscriber(() {
125 try {
126 action();
127 } on AsyncError catch (e) {
128 e.throwDelayed();
129 } catch (e, s) {
130 new AsyncError(e, s).throwDelayed();
131 }
132 });
133 }
134 } 106 }
135 107
136 typedef void _NotificationHandler(); 108 typedef void _NotificationHandler();
137 109
138 class _MultiControllerStream<T> extends _MultiStreamImpl<T> { 110 class _MultiControllerStream<T> extends _MultiStreamImpl<T> {
139 _NotificationHandler _subscriptionHandler; 111 _NotificationHandler _subscriptionHandler;
140 _NotificationHandler _pauseHandler; 112 _NotificationHandler _pauseHandler;
141 113
142 _MultiControllerStream(this._subscriptionHandler, this._pauseHandler); 114 _MultiControllerStream(this._subscriptionHandler, this._pauseHandler);
143 115
(...skipping 13 matching lines...) Expand all
157 _SingleControllerStream(this._subscriptionHandler, this._pauseHandler); 129 _SingleControllerStream(this._subscriptionHandler, this._pauseHandler);
158 130
159 void _onSubscriptionStateChange() { 131 void _onSubscriptionStateChange() {
160 if (_subscriptionHandler != null) _subscriptionHandler(); 132 if (_subscriptionHandler != null) _subscriptionHandler();
161 } 133 }
162 134
163 void _onPauseStateChange() { 135 void _onPauseStateChange() {
164 if (_pauseHandler != null) _pauseHandler(); 136 if (_pauseHandler != null) _pauseHandler();
165 } 137 }
166 } 138 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698