Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 14196003: Change StreamController constructor. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments and rebase. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/io/file_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
43 * 43 *
44 * If listeners are added after the stream has completed (sent a "done" event), 44 * If listeners are added after the stream has completed (sent a "done" event),
45 * the listeners will be sent a "done" event eventually, but they won't affect 45 * the listeners will be sent a "done" event eventually, but they won't affect
46 * the stream at all, and won't trigger callbacks. From the controller's point 46 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 47 * of view, the stream is completely inert when has completed.
48 */ 48 */
49 class StreamController<T> extends EventSink<T> { 49 class StreamController<T> extends EventSink<T> {
50 final _StreamImpl<T> stream; 50 final _StreamImpl<T> stream;
51 51
52 /** 52 /**
53 * A controller with a broadcast [stream].. 53 * A controller with a broadcast [stream].
54 * 54 *
55 * The [onPauseStateChange] function is called when the stream becomes 55 * The [onPause] function is called when the stream becomes
56 * paused or resumes after being paused. The current pause state can 56 * paused. [onResume] is called when the stream resumed.
57 * be read from [isPaused]. Ignored if [:null:].
58 * 57 *
59 * The [onSubscriptionStateChange] function is called when the stream 58 * The [onListen] callback is called when the stream
60 * receives its first listener or loses its last. The current subscription 59 * receives its first listener. [onCancel] when the last listener cancels
61 * state can be read from [hasListener]. Ignored if [:null:]. 60 * its subscription.
61 *
62 * If the stream is canceled before the controller needs new data the
63 * [onResume] call might not be executed.
62 */ 64 */
63 StreamController.broadcast({void onPauseStateChange(), 65 StreamController.broadcast({void onListen(),
64 void onSubscriptionStateChange()}) 66 void onPause(),
65 : stream = new _MultiControllerStream<T>(onSubscriptionStateChange, 67 void onResume(),
66 onPauseStateChange); 68 void onCancel()})
69 : stream = new _MultiControllerStream<T>(
70 onListen, onPause, onResume, onCancel);
67 71
68 /** 72 /**
69 * A controller with a [stream] that supports only one single subscriber. 73 * A controller with a [stream] that supports only one single subscriber.
70 * 74 *
71 * The controller will buffer all incoming events until the subscriber is 75 * The controller will buffer all incoming events until the subscriber is
72 * registered. 76 * registered.
73 * 77 *
74 * The [onPauseStateChange] function is called when the stream becomes 78 * The [onPause] function is called when the stream becomes
75 * paused or resumes after being paused. The current pause state can 79 * paused. [onResume] is called when the stream resumed.
76 * be read from [isPaused]. Ignored if [:null:].
77 * 80 *
78 * The [onSubscriptionStateChange] function is called when the stream 81 * The [onListen] callback is called when the stream
79 * receives its first listener or loses its last. The current subscription 82 * receives its listener. [onCancel] when the listener cancels
80 * state can be read from [hasListener]. Ignored if [:null:]. 83 * its subscription.
84 *
85 * If the stream is canceled before the controller needs new data the
86 * [onResume] call might not be executed.
81 */ 87 */
82 StreamController({void onPauseStateChange(), 88 StreamController({void onListen(),
83 void onSubscriptionStateChange()}) 89 void onPause(),
84 : stream = new _SingleControllerStream<T>(onSubscriptionStateChange, 90 void onResume(),
85 onPauseStateChange); 91 void onCancel()})
92 : stream = new _SingleControllerStream<T>(
93 onListen, onPause, onResume, onCancel);
86 94
87 /** 95 /**
88 * Returns a view of this object that only exposes the [EventSink] interface. 96 * Returns a view of this object that only exposes the [EventSink] interface.
89 */ 97 */
90 EventSink<T> get sink => new _EventSinkView<T>(this); 98 EventSink<T> get sink => new _EventSinkView<T>(this);
91 99
92 /** 100 /**
93 * Whether the stream is closed for adding more events. 101 * Whether the stream is closed for adding more events.
94 * 102 *
95 * If true, the "done" event might not have fired yet, but it has been 103 * If true, the "done" event might not have fired yet, but it has been
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
135 * 143 *
136 * The "done" message should be sent at most once by a stream, and it 144 * The "done" message should be sent at most once by a stream, and it
137 * should be the last message sent. 145 * should be the last message sent.
138 */ 146 */
139 void close() { stream._close(); } 147 void close() { stream._close(); }
140 } 148 }
141 149
142 typedef void _NotificationHandler(); 150 typedef void _NotificationHandler();
143 151
144 class _MultiControllerStream<T> extends _MultiStreamImpl<T> { 152 class _MultiControllerStream<T> extends _MultiStreamImpl<T> {
145 _NotificationHandler _subscriptionHandler; 153 _NotificationHandler _onListen;
146 _NotificationHandler _pauseHandler; 154 _NotificationHandler _onPause;
155 _NotificationHandler _onResume;
156 _NotificationHandler _onCancel;
147 157
148 _MultiControllerStream(this._subscriptionHandler, this._pauseHandler); 158 // TODO(floitsch): share this code with _SingleControllerStream.
149 159 void _runGuarded(_NotificationHandler notificationHandler) {
150 void _onSubscriptionStateChange() { 160 if (notificationHandler == null) return;
151 if (_subscriptionHandler != null) { 161 try {
152 try { 162 notificationHandler();
153 _subscriptionHandler(); 163 } catch (e, s) {
154 } catch (e, s) { 164 new AsyncError(e, s).throwDelayed();
155 new AsyncError(e, s).throwDelayed();
156 }
157 } 165 }
158 } 166 }
159 167
168 _MultiControllerStream(this._onListen,
169 this._onPause,
170 this._onResume,
171 this._onCancel);
172
173 void _onSubscriptionStateChange() {
174 _runGuarded(_hasListener ? _onListen : _onCancel);
175 }
176
160 void _onPauseStateChange() { 177 void _onPauseStateChange() {
161 if (_pauseHandler != null) { 178 _runGuarded(_isPaused ? _onPause : _onResume);
162 try {
163 _pauseHandler();
164 } catch (e, s) {
165 new AsyncError(e, s).throwDelayed();
166 }
167 }
168 } 179 }
169 } 180 }
170 181
171 class _SingleControllerStream<T> extends _SingleStreamImpl<T> { 182 class _SingleControllerStream<T> extends _SingleStreamImpl<T> {
172 _NotificationHandler _subscriptionHandler; 183 _NotificationHandler _onListen;
173 _NotificationHandler _pauseHandler; 184 _NotificationHandler _onPause;
185 _NotificationHandler _onResume;
186 _NotificationHandler _onCancel;
174 187
175 _SingleControllerStream(this._subscriptionHandler, this._pauseHandler); 188 // TODO(floitsch): share this code with _MultiControllerStream.
176 189 _runGuarded(_NotificationHandler notificationHandler) {
177 void _onSubscriptionStateChange() { 190 if (notificationHandler == null) return;
178 if (_subscriptionHandler != null) { 191 try {
179 try { 192 notificationHandler();
180 _subscriptionHandler(); 193 } catch (e, s) {
181 } catch (e, s) { 194 new AsyncError(e, s).throwDelayed();
182 new AsyncError(e, s).throwDelayed();
183 }
184 } 195 }
185 } 196 }
186 197
198 _SingleControllerStream(this._onListen,
199 this._onPause,
200 this._onResume,
201 this._onCancel);
202
203 void _onSubscriptionStateChange() {
204 _runGuarded(_hasListener ? _onListen : _onCancel);
205 }
206
187 void _onPauseStateChange() { 207 void _onPauseStateChange() {
188 if (_pauseHandler != null) { 208 _runGuarded(_isPaused ? _onPause : _onResume);
189 try {
190 _pauseHandler();
191 } catch (e, s) {
192 new AsyncError(e, s).throwDelayed();
193 }
194 }
195 } 209 }
196 } 210 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/io/file_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698