Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: tool/input_sdk/lib/async/stream_controller.dart

Issue 1953153002: Update dart:async to match the Dart repo. (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
11 /** 11 /**
12 * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks.
13 */
14 typedef void ControllerCallback();
15
16 /**
17 * Type of stream controller `onCancel` callbacks.
18 *
19 * The callback may return either `void` or a future.
20 */
21 typedef ControllerCancelCallback();
22
23 /**
12 * A controller with the stream it controls. 24 * A controller with the stream it controls.
13 * 25 *
14 * This controller allows sending data, error and done events on 26 * This controller allows sending data, error and done events on
15 * its [stream]. 27 * its [stream].
16 * This class can be used to create a simple stream that others 28 * This class can be used to create a simple stream that others
17 * can listen on, and to push events to that stream. 29 * can listen on, and to push events to that stream.
18 * 30 *
19 * It's possible to check whether the stream is paused or not, and whether 31 * It's possible to check whether the stream is paused or not, and whether
20 * it has subscribers or not, as well as getting a callback when either of 32 * it has subscribers or not, as well as getting a callback when either of
21 * these change. 33 * these change.
(...skipping 24 matching lines...) Expand all
46 * the stream at all, and won't trigger callbacks. From the controller's point 58 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 59 * of view, the stream is completely inert when has completed.
48 */ 60 */
49 abstract class StreamController<T> implements StreamSink<T> { 61 abstract class StreamController<T> implements StreamSink<T> {
50 /** The stream that this controller is controlling. */ 62 /** The stream that this controller is controlling. */
51 Stream<T> get stream; 63 Stream<T> get stream;
52 64
53 /** 65 /**
54 * A controller with a [stream] that supports only one single subscriber. 66 * A controller with a [stream] that supports only one single subscriber.
55 * 67 *
56 * If [sync] is true, events may be passed directly to the stream's listener 68 * If [sync] is true, the returned stream controller is a
57 * during an [add], [addError] or [close] call. If [sync] is false, the event 69 * [SynchronousStreamController], and must be used with the care
58 * will be passed to the listener at a later time, after the code creating 70 * and attention necessary to not break the [Stream] contract.
59 * the event has returned. 71 * See [Completer.sync] for some explanations on when a synchronous
72 * dispatching can be used.
73 * If in doubt, keep the controller non-sync.
60 * 74 *
61 * The controller will buffer all incoming events until the subscriber is 75 * A Stream should be inert until a subscriber starts listening on it (using
62 * registered. 76 * the [onListen] callback to start producing events). Streams should not
77 * leak resources (like websockets) when no user ever listens on the stream.
78 *
79 * The controller buffers all incoming events until a subscriber is
80 * registered, but this feature should only be used in rare circumstances.
63 * 81 *
64 * The [onPause] function is called when the stream becomes 82 * The [onPause] function is called when the stream becomes
65 * paused. [onResume] is called when the stream resumed. 83 * paused. [onResume] is called when the stream resumed.
66 * 84 *
67 * The [onListen] callback is called when the stream 85 * The [onListen] callback is called when the stream
68 * receives its listener and [onCancel] when the listener ends 86 * receives its listener and [onCancel] when the listener ends
69 * its subscription. If [onCancel] needs to perform an asynchronous operation, 87 * its subscription. If [onCancel] needs to perform an asynchronous operation,
70 * [onCancel] should return a future that completes when the cancel operation 88 * [onCancel] should return a future that completes when the cancel operation
71 * is done. 89 * is done.
72 * 90 *
73 * If the stream is canceled before the controller needs new data the 91 * If the stream is canceled before the controller needs new data the
74 * [onResume] call might not be executed. 92 * [onResume] call might not be executed.
75 */ 93 */
76 factory StreamController({void onListen(), 94 factory StreamController({void onListen(),
77 void onPause(), 95 void onPause(),
78 void onResume(), 96 void onResume(),
79 onCancel(), 97 onCancel(),
80 bool sync: false}) { 98 bool sync: false}) {
81 if (onListen == null && onPause == null &&
82 onResume == null && onCancel == null) {
83 return sync
84 ? new _NoCallbackSyncStreamController<T>()
85 : new _NoCallbackAsyncStreamController<T>();
86 }
87 return sync 99 return sync
88 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) 100 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
89 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); 101 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
90 } 102 }
91 103
92 /** 104 /**
93 * A controller where [stream] can be listened to more than once. 105 * A controller where [stream] can be listened to more than once.
94 * 106 *
95 * The [Stream] returned by [stream] is a broadcast stream. 107 * The [Stream] returned by [stream] is a broadcast stream.
96 * It can be listened to more than once. 108 * It can be listened to more than once.
97 * 109 *
110 * A Stream should be inert until a subscriber starts listening on it (using
111 * the [onListen] callback to start producing events). Streams should not
112 * leak resources (like websockets) when no user ever listens on the stream.
113 *
114 * Broadcast streams do not buffer events when there is no listener.
115 *
98 * The controller distributes any events to all currently subscribed 116 * The controller distributes any events to all currently subscribed
99 * listeners at the time when [add], [addError] or [close] is called. 117 * listeners at the time when [add], [addError] or [close] is called.
100 * It is not allowed to call `add`, `addError`, or `close` before a previous 118 * It is not allowed to call `add`, `addError`, or `close` before a previous
101 * call has returned. The controller does not have any internal queue of 119 * call has returned. The controller does not have any internal queue of
102 * events, and if there are no listeners at the time the event is added, 120 * events, and if there are no listeners at the time the event is added,
103 * it will just be dropped, or, if it is an error, be reported as uncaught. 121 * it will just be dropped, or, if it is an error, be reported as uncaught.
104 * 122 *
105 * Each listener subscription is handled independently, 123 * Each listener subscription is handled independently,
106 * and if one pauses, only the pausing listener is affected. 124 * and if one pauses, only the pausing listener is affected.
107 * A paused listener will buffer events internally until unpaused or canceled. 125 * A paused listener will buffer events internally until unpaused or canceled.
108 * 126 *
109 * If [sync] is true, events may be fired directly by the stream's 127 * If [sync] is true, events may be fired directly by the stream's
110 * subscriptions during an [add], [addError] or [close] call. 128 * subscriptions during an [add], [addError] or [close] call.
111 * If [sync] is false, the event will be fired at a later time, 129 * The returned stream controller is a [SynchronousStreamController],
130 * and must be used with the care and attention necessary to not break
131 * the [Stream] contract.
132 * See [Completer.sync] for some explanations on when a synchronous
133 * dispatching can be used.
134 * If in doubt, keep the controller non-sync.
135 *
136 * If [sync] is false, the event will always be fired at a later time,
112 * after the code adding the event has completed. 137 * after the code adding the event has completed.
113 * 138 * In that case, no guarantees are given with regard to when
114 * When [sync] is false, no guarantees are given with regard to when
115 * multiple listeners get the events, except that each listener will get 139 * multiple listeners get the events, except that each listener will get
116 * all events in the correct order. Each subscription handles the events 140 * all events in the correct order. Each subscription handles the events
117 * individually. 141 * individually.
118 * If two events are sent on an async controller with two listeners, 142 * If two events are sent on an async controller with two listeners,
119 * one of the listeners may get both events 143 * one of the listeners may get both events
120 * before the other listener gets any. 144 * before the other listener gets any.
121 * A listener must be subscribed both when the event is initiated 145 * A listener must be subscribed both when the event is initiated
122 * (that is, when [add] is called) 146 * (that is, when [add] is called)
123 * and when the event is later delivered, 147 * and when the event is later delivered,
124 * in order to receive the event. 148 * in order to receive the event.
125 * 149 *
126 * The [onListen] callback is called when the first listener is subscribed, 150 * The [onListen] callback is called when the first listener is subscribed,
127 * and the [onCancel] is called when there are no longer any active listeners. 151 * and the [onCancel] is called when there are no longer any active listeners.
128 * If a listener is added again later, after the [onCancel] was called, 152 * If a listener is added again later, after the [onCancel] was called,
129 * the [onListen] will be called again. 153 * the [onListen] will be called again.
130 */ 154 */
131 factory StreamController.broadcast({void onListen(), 155 factory StreamController.broadcast({void onListen(),
132 void onCancel(), 156 void onCancel(),
133 bool sync: false}) { 157 bool sync: false}) {
134 return sync 158 return sync
135 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) 159 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
136 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); 160 : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
137 } 161 }
138 162
139 /** 163 /**
164 * The callback which is called when the stream is listened to.
165 *
166 * May be set to `null`, in which case no callback will happen.
167 */
168 ControllerCallback get onListen;
169
170 void set onListen(void onListenHandler());
171
172 /**
173 * The callback which is called when the stream is paused.
174 *
175 * May be set to `null`, in which case no callback will happen.
176 *
177 * Pause related callbacks are not supported on broadcast stream controllers.
178 */
179 ControllerCallback get onPause;
180
181 void set onPause(void onPauseHandler());
182
183 /**
184 * The callback which is called when the stream is resumed.
185 *
186 * May be set to `null`, in which case no callback will happen.
187 *
188 * Pause related callbacks are not supported on broadcast stream controllers.
189 */
190 ControllerCallback get onResume;
191
192 void set onResume(void onResumeHandler());
193
194 /**
195 * The callback which is called when the stream is canceled.
196 *
197 * May be set to `null`, in which case no callback will happen.
198 */
199 ControllerCancelCallback get onCancel;
200
201 void set onCancel(onCancelHandler());
202
203 /**
140 * Returns a view of this object that only exposes the [StreamSink] interface. 204 * Returns a view of this object that only exposes the [StreamSink] interface.
141 */ 205 */
142 StreamSink<T> get sink; 206 StreamSink<T> get sink;
143 207
144 /** 208 /**
145 * Whether the stream is closed for adding more events. 209 * Whether the stream controller is closed for adding more events.
146 * 210 *
147 * If true, the "done" event might not have fired yet, but it has been 211 * The controller becomes closed by calling the [close] method.
148 * scheduled, and it is too late to add more events. 212 * New events cannot be added, by calling [add] or [addError],
213 * to a closed controller.
214 *
215 * If the controller is closed,
216 * the "done" event might not have been delivered yet,
217 * but it has been scheduled, and it is too late to add more events.
149 */ 218 */
150 bool get isClosed; 219 bool get isClosed;
151 220
152 /** 221 /**
153 * Whether the subscription would need to buffer events. 222 * Whether the subscription would need to buffer events.
154 * 223 *
155 * This is the case if the controller's stream has a listener and it is 224 * This is the case if the controller's stream has a listener and it is
156 * paused, or if it has not received a listener yet. In that case, the 225 * paused, or if it has not received a listener yet. In that case, the
157 * controller is considered paused as well. 226 * controller is considered paused as well.
158 * 227 *
159 * A broadcast stream controller is never considered paused. It always 228 * A broadcast stream controller is never considered paused. It always
160 * forwards its events to all uncanceled listeners, if any, and let them 229 * forwards its events to all uncanceled subscriptions, if any,
161 * handle their own pausing. 230 * and let the subscriptions handle their own pausing and buffering.
162 */ 231 */
163 bool get isPaused; 232 bool get isPaused;
164 233
165 /** Whether there is a subscriber on the [Stream]. */ 234 /** Whether there is a subscriber on the [Stream]. */
166 bool get hasListener; 235 bool get hasListener;
167 236
168 /** 237 /**
169 * Send or enqueue an error event. 238 * Send or enqueue an error event.
170 * 239 *
171 * If [error] is `null`, it is replaced by a [NullThrownError]. 240 * If [error] is `null`, it is replaced by a [NullThrownError].
172 *
173 * Also allows an objection stack trace object, on top of what [EventSink]
174 * allows.
175 */ 241 */
176 void addError(Object error, [StackTrace stackTrace]); 242 void addError(Object error, [StackTrace stackTrace]);
177 243
178 /** 244 /**
179 * Receives events from [source] and puts them into this controller's stream. 245 * Receives events from [source] and puts them into this controller's stream.
180 * 246 *
181 * Returns a future which completes when the source stream is done. 247 * Returns a future which completes when the source stream is done.
182 * 248 *
183 * Events must not be added directly to this controller using [add], 249 * Events must not be added directly to this controller using [add],
184 * [addError], [close] or [addStream], until the returned future 250 * [addError], [close] or [addStream], until the returned future
185 * is complete. 251 * is complete.
186 * 252 *
187 * Data and error events are forwarded to this controller's stream. A done 253 * Data and error events are forwarded to this controller's stream. A done
188 * event on the source will end the `addStream` operation and complete the 254 * event on the source will end the `addStream` operation and complete the
189 * returned future. 255 * returned future.
190 * 256 *
191 * If [cancelOnError] is true, only the first error on [source] is 257 * If [cancelOnError] is true, only the first error on [source] is
192 * forwarded to the controller's stream, and the `addStream` ends 258 * forwarded to the controller's stream, and the `addStream` ends
193 * after this. If [cancelOnError] is false, all errors are forwarded 259 * after this. If [cancelOnError] is false, all errors are forwarded
194 * and only a done event will end the `addStream`. 260 * and only a done event will end the `addStream`.
195 */ 261 */
196 Future addStream(Stream<T> source, {bool cancelOnError: true}); 262 Future addStream(Stream<T> source, {bool cancelOnError: true});
197 } 263 }
198 264
199 265
266 /**
267 * A stream controller that delivers its events synchronously.
268 *
269 * A synchronous stream controller is intended for cases where
270 * an already asynchronous event triggers an event on a stream.
271 *
272 * Instead of adding the event to the stream in a later microtask,
273 * causing extra latency, the event is instead fired immediately by the
274 * synchronous stream controller, as if the stream event was
275 * the current event or microtask.
276 *
277 * The synchronous stream controller can be used to break the contract
278 * on [Stream], and it must be used carefully to avoid doing so.
279 *
280 * The only advantage to using a [SynchronousStreamController] over a
281 * normal [StreamController] is the improved latency.
282 * Only use the synchronous version if the improvement is significant,
283 * and if its use is safe. Otherwise just use a normal stream controller,
284 * which will always have the correct behavior for a [Stream], and won't
285 * accidentally break other code.
286 *
287 * Adding events to a synchronous controller should only happen as the
288 * very last part of a the handling of the original event.
289 * At that point, adding an event to the stream is equivalent to
290 * returning to the event loop and adding the event in the next microtask.
291 *
292 * Each listener callback will be run as if it was a top-level event
293 * or microtask. This means that if it throws, the error will be reported as
294 * uncaught as soon as possible.
295 * This is one reason to add the event as the last thing in the original event
296 * handler - any action done after adding the event will delay the report of
297 * errors in the event listener callbacks.
298 *
299 * If an event is added in a setting that isn't known to be another event,
300 * it may cause the stream's listener to get that event before the listener
301 * is ready to handle it. We promise that after calling [Stream.listen],
302 * you won't get any events until the code doing the listen has completed.
303 * Calling [add] in response to a function call of unknown origin may break
304 * that promise.
305 *
306 * An [onListen] callback from the controller is *not* an asynchronous event,
307 * and adding events to the controller in the `onListen` callback is always
308 * wrong. The events will be delivered before the listener has even received
309 * the subscription yet.
310 *
311 * The synchronous broadcast stream controller also has a restrictions that a
312 * normal stream controller does not:
313 * The [add], [addError], [close] and [addStream] methods *must not* be
314 * called while an event is being delivered.
315 * That is, if a callback on a subscription on the controller's stream causes
316 * a call to any of the functions above, the call will fail.
317 * A broadcast stream may have more than one listener, and if an
318 * event is added synchronously while another is being also in the process
319 * of being added, the latter event might reach some listeners before
320 * the former. To prevent that, an event cannot be added while a previous
321 * event is being fired.
322 * This guarantees that an event is fully delivered when the
323 * first [add], [addError] or [close] returns,
324 * and further events will be delivered in the correct order.
325 *
326 * This still only guarantees that the event is delivered to the subscription.
327 * If the subscription is paused, the actual callback may still happen later,
328 * and the event will instead be buffered by the subscription.
329 * Barring pausing, and the following buffered events that haven't been
330 * delivered yet, callbacks will be called synchronously when an event is added.
331 *
332 * Adding an event to a synchronous non-broadcast stream controller while
333 * another event is in progress may cause the second event to be delayed
334 * and not be delivered synchronously, and until that event is delivered,
335 * the controller will not act synchronously.
336 */
337 abstract class SynchronousStreamController<T> implements StreamController<T> {
338 /**
339 * Adds event to the controller's stream.
340 *
341 * As [StreamController.add], but must not be called while an event is
342 * being added by [add], [addError] or [close].
343 */
344 void add(T data);
345
346 /**
347 * Adds error to the controller's stream.
348 *
349 * As [StreamController.addError], but must not be called while an event is
350 * being added by [add], [addError] or [close].
351 */
352 void addError(Object error, [StackTrace stackTrace]);
353
354 /**
355 * Closes the controller's stream.
356 *
357 * As [StreamController.close], but must not be called while an event is
358 * being added by [add], [addError] or [close].
359 */
360 Future close();
361 }
362
200 abstract class _StreamControllerLifecycle<T> { 363 abstract class _StreamControllerLifecycle<T> {
201 StreamSubscription<T> _subscribe( 364 StreamSubscription<T> _subscribe(
202 void onData(T data), 365 void onData(T data),
203 Function onError, 366 Function onError,
204 void onDone(), 367 void onDone(),
205 bool cancelOnError); 368 bool cancelOnError);
206 void _recordPause(StreamSubscription<T> subscription) {} 369 void _recordPause(StreamSubscription<T> subscription) {}
207 void _recordResume(StreamSubscription<T> subscription) {} 370 void _recordResume(StreamSubscription<T> subscription) {}
208 Future _recordCancel(StreamSubscription<T> subscription) => null; 371 Future _recordCancel(StreamSubscription<T> subscription) => null;
209 } 372 }
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
275 /** 438 /**
276 * Future completed when the stream sends its last event. 439 * Future completed when the stream sends its last event.
277 * 440 *
278 * This is also the future returned by [close]. 441 * This is also the future returned by [close].
279 */ 442 */
280 // TODO(lrn): Could this be stored in the varData field too, if it's not 443 // TODO(lrn): Could this be stored in the varData field too, if it's not
281 // accessed until the call to "close"? Then we need to special case if it's 444 // accessed until the call to "close"? Then we need to special case if it's
282 // accessed earlier, or if close is called before subscribing. 445 // accessed earlier, or if close is called before subscribing.
283 _Future _doneFuture; 446 _Future _doneFuture;
284 447
285 _StreamController(); 448 ControllerCallback onListen;
449 ControllerCallback onPause;
450 ControllerCallback onResume;
451 ControllerCancelCallback onCancel;
286 452
287 _NotificationHandler get _onListen; 453 _StreamController(this.onListen,
288 _NotificationHandler get _onPause; 454 this.onPause,
289 _NotificationHandler get _onResume; 455 this.onResume,
290 _NotificationHandler get _onCancel; 456 this.onCancel);
291 457
292 // Return a new stream every time. The streams are equal, but not identical. 458 // Return a new stream every time. The streams are equal, but not identical.
293 Stream<T> get stream => new _ControllerStream(this); 459 Stream<T> get stream => new _ControllerStream<T>(this);
294 460
295 /** 461 /**
296 * Returns a view of this object that only exposes the [StreamSink] interface. 462 * Returns a view of this object that only exposes the [StreamSink] interface.
297 */ 463 */
298 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); 464 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
299 465
300 /** 466 /**
301 * Whether a listener has existed and been canceled. 467 * Whether a listener has existed and been canceled.
302 * 468 *
303 * After this, adding more events will be ignored. 469 * After this, adding more events will be ignored.
(...skipping 17 matching lines...) Expand all
321 /** New events may not be added after close, or during addStream. */ 487 /** New events may not be added after close, or during addStream. */
322 bool get _mayAddEvent => (_state < _STATE_CLOSED); 488 bool get _mayAddEvent => (_state < _STATE_CLOSED);
323 489
324 // Returns the pending events. 490 // Returns the pending events.
325 // Pending events are events added before a subscription exists. 491 // Pending events are events added before a subscription exists.
326 // They are added to the subscription when it is created. 492 // They are added to the subscription when it is created.
327 // Pending events, if any, are kept in the _varData field until the 493 // Pending events, if any, are kept in the _varData field until the
328 // stream is listened to. 494 // stream is listened to.
329 // While adding a stream, pending events are moved into the 495 // While adding a stream, pending events are moved into the
330 // state object to allow the state object to use the _varData field. 496 // state object to allow the state object to use the _varData field.
331 _PendingEvents get _pendingEvents { 497 _PendingEvents<T> get _pendingEvents {
332 assert(_isInitialState); 498 assert(_isInitialState);
333 if (!_isAddingStream) { 499 if (!_isAddingStream) {
334 return _varData; 500 return _varData as Object /*=_PendingEvents<T>*/;
335 } 501 }
336 _StreamControllerAddStreamState state = _varData; 502 _StreamControllerAddStreamState<T> state =
337 return state.varData; 503 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
504 return state.varData as Object /*=_PendingEvents<T>*/;
338 } 505 }
339 506
340 // Returns the pending events, and creates the object if necessary. 507 // Returns the pending events, and creates the object if necessary.
341 _StreamImplEvents _ensurePendingEvents() { 508 _StreamImplEvents<T> _ensurePendingEvents() {
342 assert(_isInitialState); 509 assert(_isInitialState);
343 if (!_isAddingStream) { 510 if (!_isAddingStream) {
344 if (_varData == null) _varData = new _StreamImplEvents(); 511 if (_varData == null) _varData = new _StreamImplEvents<T>();
345 return _varData; 512 return _varData as Object /*=_StreamImplEvents<T>*/;
346 } 513 }
347 _StreamControllerAddStreamState state = _varData; 514 _StreamControllerAddStreamState<T> state =
348 if (state.varData == null) state.varData = new _StreamImplEvents(); 515 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
349 return state.varData; 516 if (state.varData == null) state.varData = new _StreamImplEvents<T>();
517 return state.varData as Object /*=_StreamImplEvents<T>*/;
350 } 518 }
351 519
352 // Get the current subscription. 520 // Get the current subscription.
353 // If we are adding a stream, the subscription is moved into the state 521 // If we are adding a stream, the subscription is moved into the state
354 // object to allow the state object to use the _varData field. 522 // object to allow the state object to use the _varData field.
355 _ControllerSubscription get _subscription { 523 _ControllerSubscription<T> get _subscription {
356 assert(hasListener); 524 assert(hasListener);
357 if (_isAddingStream) { 525 if (_isAddingStream) {
358 _StreamControllerAddStreamState addState = _varData; 526 _StreamControllerAddStreamState<T> addState =
359 return addState.varData; 527 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
528 return addState.varData as Object /*=_ControllerSubscription<T>*/;
360 } 529 }
361 return _varData; 530 return _varData as Object /*=_ControllerSubscription<T>*/;
362 } 531 }
363 532
364 /** 533 /**
365 * Creates an error describing why an event cannot be added. 534 * Creates an error describing why an event cannot be added.
366 * 535 *
367 * The reason, and therefore the error message, depends on the current state. 536 * The reason, and therefore the error message, depends on the current state.
368 */ 537 */
369 Error _badEventState() { 538 Error _badEventState() {
370 if (isClosed) { 539 if (isClosed) {
371 return new StateError("Cannot add event after closing"); 540 return new StateError("Cannot add event after closing");
372 } 541 }
373 assert(_isAddingStream); 542 assert(_isAddingStream);
374 return new StateError("Cannot add event while adding a stream"); 543 return new StateError("Cannot add event while adding a stream");
375 } 544 }
376 545
377 // StreamSink interface. 546 // StreamSink interface.
378 Future addStream(Stream<T> source, {bool cancelOnError: true}) { 547 Future addStream(Stream<T> source, {bool cancelOnError: true}) {
379 if (!_mayAddEvent) throw _badEventState(); 548 if (!_mayAddEvent) throw _badEventState();
380 if (_isCanceled) return new _Future.immediate(null); 549 if (_isCanceled) return new _Future.immediate(null);
381 _StreamControllerAddStreamState addState = 550 _StreamControllerAddStreamState<T> addState =
382 new _StreamControllerAddStreamState(this, 551 new _StreamControllerAddStreamState<T>(this,
383 _varData, 552 _varData,
384 source, 553 source,
385 cancelOnError); 554 cancelOnError);
386 _varData = addState; 555 _varData = addState;
387 _state |= _STATE_ADDSTREAM; 556 _state |= _STATE_ADDSTREAM;
388 return addState.addStreamFuture; 557 return addState.addStreamFuture;
389 } 558 }
390 559
391 /** 560 /**
392 * Returns a future that is completed when the stream is done 561 * Returns a future that is completed when the stream is done
393 * processing events. 562 * processing events.
394 * 563 *
395 * This happens either when the done event has been sent, or if the 564 * This happens either when the done event has been sent, or if the
(...skipping 13 matching lines...) Expand all
409 */ 578 */
410 void add(T value) { 579 void add(T value) {
411 if (!_mayAddEvent) throw _badEventState(); 580 if (!_mayAddEvent) throw _badEventState();
412 _add(value); 581 _add(value);
413 } 582 }
414 583
415 /** 584 /**
416 * Send or enqueue an error event. 585 * Send or enqueue an error event.
417 */ 586 */
418 void addError(Object error, [StackTrace stackTrace]) { 587 void addError(Object error, [StackTrace stackTrace]) {
588 if (!_mayAddEvent) throw _badEventState();
419 error = _nonNullError(error); 589 error = _nonNullError(error);
420 if (!_mayAddEvent) throw _badEventState();
421 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); 590 AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
422 if (replacement != null) { 591 if (replacement != null) {
423 error = _nonNullError(replacement.error); 592 error = _nonNullError(replacement.error);
424 stackTrace = replacement.stackTrace; 593 stackTrace = replacement.stackTrace;
425 } 594 }
426 _addError(error, stackTrace); 595 _addError(error, stackTrace);
427 } 596 }
428 597
429 /** 598 /**
430 * Closes this controller and sends a done event on the stream. 599 * Closes this controller and sends a done event on the stream.
431 * 600 *
432 * The first time a controller is closed, a "done" event is added to its 601 * The first time a controller is closed, a "done" event is added to its
433 * stream. 602 * stream.
434 * 603 *
435 * You are allowed to close the controller more than once, but only the first 604 * You are allowed to close the controller more than once, but only the first
436 * call has any effect. 605 * call has any effect.
437 * 606 *
438 * After closing, no further events may be added using [add] or [addError]. 607 * After closing, no further events may be added using [add], [addError]
608 * or [addStream].
439 * 609 *
440 * The returned future is completed when the done event has been delivered. 610 * The returned future is completed when the done event has been delivered.
441 */ 611 */
442 Future close() { 612 Future close() {
443 if (isClosed) { 613 if (isClosed) {
444 return _ensureDoneFuture(); 614 return _ensureDoneFuture();
445 } 615 }
446 if (!_mayAddEvent) throw _badEventState(); 616 if (!_mayAddEvent) throw _badEventState();
447 _closeUnchecked(); 617 _closeUnchecked();
448 return _ensureDoneFuture(); 618 return _ensureDoneFuture();
(...skipping 23 matching lines...) Expand all
472 if (hasListener) { 642 if (hasListener) {
473 _sendError(error, stackTrace); 643 _sendError(error, stackTrace);
474 } else if (_isInitialState) { 644 } else if (_isInitialState) {
475 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); 645 _ensurePendingEvents().add(new _DelayedError(error, stackTrace));
476 } 646 }
477 } 647 }
478 648
479 void _close() { 649 void _close() {
480 // End of addStream stream. 650 // End of addStream stream.
481 assert(_isAddingStream); 651 assert(_isAddingStream);
482 _StreamControllerAddStreamState addState = _varData; 652 _StreamControllerAddStreamState<T> addState =
653 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
483 _varData = addState.varData; 654 _varData = addState.varData;
484 _state &= ~_STATE_ADDSTREAM; 655 _state &= ~_STATE_ADDSTREAM;
485 addState.complete(); 656 addState.complete();
486 } 657 }
487 658
488 // _StreamControllerLifeCycle interface 659 // _StreamControllerLifeCycle interface
489 660
490 StreamSubscription<T> _subscribe( 661 StreamSubscription<T> _subscribe(
491 void onData(T data), 662 void onData(T data),
492 Function onError, 663 Function onError,
493 void onDone(), 664 void onDone(),
494 bool cancelOnError) { 665 bool cancelOnError) {
495 if (!_isInitialState) { 666 if (!_isInitialState) {
496 throw new StateError("Stream has already been listened to."); 667 throw new StateError("Stream has already been listened to.");
497 } 668 }
498 _ControllerSubscription subscription = 669 _ControllerSubscription<T> subscription =
499 new _ControllerSubscription(this, onData, onError, onDone, 670 new _ControllerSubscription<T>(this, onData, onError, onDone,
500 cancelOnError); 671 cancelOnError);
501 672
502 _PendingEvents pendingEvents = _pendingEvents; 673 _PendingEvents<T> pendingEvents = _pendingEvents;
503 _state |= _STATE_SUBSCRIBED; 674 _state |= _STATE_SUBSCRIBED;
504 if (_isAddingStream) { 675 if (_isAddingStream) {
505 _StreamControllerAddStreamState addState = _varData; 676 _StreamControllerAddStreamState<T> addState =
677 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
506 addState.varData = subscription; 678 addState.varData = subscription;
507 addState.resume(); 679 addState.resume();
508 } else { 680 } else {
509 _varData = subscription; 681 _varData = subscription;
510 } 682 }
511 subscription._setPendingEvents(pendingEvents); 683 subscription._setPendingEvents(pendingEvents);
512 subscription._guardCallback(() { 684 subscription._guardCallback(() {
513 _runGuarded(_onListen); 685 _runGuarded(onListen);
514 }); 686 });
515 687
516 return subscription; 688 return subscription;
517 } 689 }
518 690
519 Future _recordCancel(StreamSubscription<T> subscription) { 691 Future _recordCancel(StreamSubscription<T> subscription) {
520 // When we cancel, we first cancel any stream being added, 692 // When we cancel, we first cancel any stream being added,
521 // Then we call _onCancel, and finally the _doneFuture is completed. 693 // Then we call `onCancel`, and finally the _doneFuture is completed.
522 // If either of addStream's cancel or _onCancel returns a future, 694 // If either of addStream's cancel or `onCancel` returns a future,
523 // we wait for it before continuing. 695 // we wait for it before continuing.
524 // Any error during this process ends up in the returned future. 696 // Any error during this process ends up in the returned future.
525 // If more errors happen, we act as if it happens inside nested try/finallys 697 // If more errors happen, we act as if it happens inside nested try/finallys
526 // or whenComplete calls, and only the last error ends up in the 698 // or whenComplete calls, and only the last error ends up in the
527 // returned future. 699 // returned future.
528 Future result; 700 Future result;
529 if (_isAddingStream) { 701 if (_isAddingStream) {
530 _StreamControllerAddStreamState addState = _varData; 702 _StreamControllerAddStreamState<T> addState =
703 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
531 result = addState.cancel(); 704 result = addState.cancel();
532 } 705 }
533 _varData = null; 706 _varData = null;
534 _state = 707 _state =
535 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; 708 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
536 709
537 if (_onCancel != null) { 710 if (onCancel != null) {
538 if (result == null) { 711 if (result == null) {
539 // Only introduce a future if one is needed. 712 // Only introduce a future if one is needed.
540 // If _onCancel returns null, no future is needed. 713 // If _onCancel returns null, no future is needed.
541 try { 714 try {
542 result = _onCancel(); 715 result = onCancel();
543 } catch (e, s) { 716 } catch (e, s) {
544 // Return the error in the returned future. 717 // Return the error in the returned future.
545 // Complete it asynchronously, so there is time for a listener 718 // Complete it asynchronously, so there is time for a listener
546 // to handle the error. 719 // to handle the error.
547 result = new _Future().._asyncCompleteError(e, s); 720 result = new _Future().._asyncCompleteError(e, s);
548 } 721 }
549 } else { 722 } else {
550 // Simpler case when we already know that we will return a future. 723 // Simpler case when we already know that we will return a future.
551 result = result.whenComplete(_onCancel); 724 result = result.whenComplete(onCancel);
552 } 725 }
553 } 726 }
554 727
555 void complete() { 728 void complete() {
556 if (_doneFuture != null && _doneFuture._mayComplete) { 729 if (_doneFuture != null && _doneFuture._mayComplete) {
557 _doneFuture._asyncComplete(null); 730 _doneFuture._asyncComplete(null);
558 } 731 }
559 } 732 }
560 733
561 if (result != null) { 734 if (result != null) {
562 result = result.whenComplete(complete); 735 result = result.whenComplete(complete);
563 } else { 736 } else {
564 complete(); 737 complete();
565 } 738 }
566 739
567 return result; 740 return result;
568 } 741 }
569 742
570 void _recordPause(StreamSubscription<T> subscription) { 743 void _recordPause(StreamSubscription<T> subscription) {
571 if (_isAddingStream) { 744 if (_isAddingStream) {
572 _StreamControllerAddStreamState addState = _varData; 745 _StreamControllerAddStreamState<T> addState =
746 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
573 addState.pause(); 747 addState.pause();
574 } 748 }
575 _runGuarded(_onPause); 749 _runGuarded(onPause);
576 } 750 }
577 751
578 void _recordResume(StreamSubscription<T> subscription) { 752 void _recordResume(StreamSubscription<T> subscription) {
579 if (_isAddingStream) { 753 if (_isAddingStream) {
580 _StreamControllerAddStreamState addState = _varData; 754 _StreamControllerAddStreamState<T> addState =
755 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
581 addState.resume(); 756 addState.resume();
582 } 757 }
583 _runGuarded(_onResume); 758 _runGuarded(onResume);
584 } 759 }
585 } 760 }
586 761
587 abstract class _SyncStreamControllerDispatch<T> 762 abstract class _SyncStreamControllerDispatch<T>
588 implements _StreamController<T> { 763 implements _StreamController<T>, SynchronousStreamController<T> {
764 int get _state;
765 void set _state(int state);
766
589 void _sendData(T data) { 767 void _sendData(T data) {
590 _subscription._add(data); 768 _subscription._add(data);
591 } 769 }
592 770
593 void _sendError(Object error, StackTrace stackTrace) { 771 void _sendError(Object error, StackTrace stackTrace) {
594 _subscription._addError(error, stackTrace); 772 _subscription._addError(error, stackTrace);
595 } 773 }
596 774
597 void _sendDone() { 775 void _sendDone() {
598 _subscription._close(); 776 _subscription._close();
599 } 777 }
600 } 778 }
601 779
602 abstract class _AsyncStreamControllerDispatch<T> 780 abstract class _AsyncStreamControllerDispatch<T>
603 implements _StreamController<T> { 781 implements _StreamController<T> {
604 void _sendData(T data) { 782 void _sendData(T data) {
605 _subscription._addPending(new _DelayedData(data)); 783 _subscription._addPending(new _DelayedData<dynamic /*=T*/>(data));
606 } 784 }
607 785
608 void _sendError(Object error, StackTrace stackTrace) { 786 void _sendError(Object error, StackTrace stackTrace) {
609 _subscription._addPending(new _DelayedError(error, stackTrace)); 787 _subscription._addPending(new _DelayedError(error, stackTrace));
610 } 788 }
611 789
612 void _sendDone() { 790 void _sendDone() {
613 _subscription._addPending(const _DelayedDone()); 791 _subscription._addPending(const _DelayedDone());
614 } 792 }
615 } 793 }
616 794
617 // TODO(lrn): Use common superclass for callback-controllers when VM supports 795 // TODO(lrn): Use common superclass for callback-controllers when VM supports
618 // constructors in mixin superclasses. 796 // constructors in mixin superclasses.
619 797
620 class _AsyncStreamController<T> extends _StreamController<T> 798 class _AsyncStreamController<T> = _StreamController<T>
621 with _AsyncStreamControllerDispatch<T> { 799 with _AsyncStreamControllerDispatch<T>;
622 final _NotificationHandler _onListen;
623 final _NotificationHandler _onPause;
624 final _NotificationHandler _onResume;
625 final _NotificationHandler _onCancel;
626 800
627 _AsyncStreamController(void this._onListen(), 801 class _SyncStreamController<T> = _StreamController<T>
628 void this._onPause(), 802 with _SyncStreamControllerDispatch<T>;
629 void this._onResume(),
630 this._onCancel());
631 }
632
633 class _SyncStreamController<T> extends _StreamController<T>
634 with _SyncStreamControllerDispatch<T> {
635 final _NotificationHandler _onListen;
636 final _NotificationHandler _onPause;
637 final _NotificationHandler _onResume;
638 final _NotificationHandler _onCancel;
639
640 _SyncStreamController(void this._onListen(),
641 void this._onPause(),
642 void this._onResume(),
643 this._onCancel());
644 }
645
646 abstract class _NoCallbacks {
647 _NotificationHandler get _onListen => null;
648 _NotificationHandler get _onPause => null;
649 _NotificationHandler get _onResume => null;
650 _NotificationHandler get _onCancel => null;
651 }
652
653 class _NoCallbackAsyncStreamController<T> = _StreamController<T>
654 with _AsyncStreamControllerDispatch<T>, _NoCallbacks;
655
656 class _NoCallbackSyncStreamController<T> = _StreamController<T>
657 with _SyncStreamControllerDispatch<T>, _NoCallbacks;
658 803
659 typedef _NotificationHandler(); 804 typedef _NotificationHandler();
660 805
661 Future _runGuarded(_NotificationHandler notificationHandler) { 806 Future _runGuarded(_NotificationHandler notificationHandler) {
662 if (notificationHandler == null) return null; 807 if (notificationHandler == null) return null;
663 try { 808 try {
664 var result = notificationHandler(); 809 var result = notificationHandler();
665 if (result is Future) return result; 810 if (result is Future) return result;
666 return null; 811 return null;
667 } catch (e, s) { 812 } catch (e, s) {
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
765 910
766 /** 911 /**
767 * Stop adding the stream. 912 * Stop adding the stream.
768 * 913 *
769 * Complete the future returned by `StreamController.addStream` when 914 * Complete the future returned by `StreamController.addStream` when
770 * the cancel is complete. 915 * the cancel is complete.
771 * 916 *
772 * Return a future if the cancel takes time, otherwise return `null`. 917 * Return a future if the cancel takes time, otherwise return `null`.
773 */ 918 */
774 Future cancel() { 919 Future cancel() {
775 var cancel2 = addSubscription.cancel(); 920 var cancel = addSubscription.cancel();
776 if (cancel2 == null) { 921 if (cancel == null) {
777 addStreamFuture._asyncComplete(null); 922 addStreamFuture._asyncComplete(null);
778 return null; 923 return null;
779 } 924 }
780 return cancel2.whenComplete(() { addStreamFuture._asyncComplete(null); }); 925 return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); });
781 } 926 }
782 927
783 void complete() { 928 void complete() {
784 addStreamFuture._asyncComplete(null); 929 addStreamFuture._asyncComplete(null);
785 } 930 }
786 } 931 }
787 932
788 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { 933 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
789 // The subscription or pending data of a _StreamController. 934 // The subscription or pending data of a _StreamController.
790 // Stored here because we reuse the `_varData` field in the _StreamController 935 // Stored here because we reuse the `_varData` field in the _StreamController
791 // to store this state object. 936 // to store this state object.
792 var varData; 937 var varData;
793 938
794 _StreamControllerAddStreamState(_StreamController controller, 939 _StreamControllerAddStreamState(_StreamController<T> controller,
795 this.varData, 940 this.varData,
796 Stream source, 941 Stream source,
797 bool cancelOnError) 942 bool cancelOnError)
798 : super(controller, source, cancelOnError) { 943 : super(controller, source, cancelOnError) {
799 if (controller.isPaused) { 944 if (controller.isPaused) {
800 addSubscription.pause(); 945 addSubscription.pause();
801 } 946 }
802 } 947 }
803 } 948 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698