| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| 11 /** | 11 /** |
| 12 * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks. |
| 13 */ |
| 14 typedef void ControllerCallback(); |
| 15 |
| 16 /** |
| 17 * Type of stream controller `onCancel` callbacks. |
| 18 * |
| 19 * The callback may return either `void` or a future. |
| 20 */ |
| 21 typedef ControllerCancelCallback(); |
| 22 |
| 23 /** |
| 12 * A controller with the stream it controls. | 24 * A controller with the stream it controls. |
| 13 * | 25 * |
| 14 * This controller allows sending data, error and done events on | 26 * This controller allows sending data, error and done events on |
| 15 * its [stream]. | 27 * its [stream]. |
| 16 * This class can be used to create a simple stream that others | 28 * This class can be used to create a simple stream that others |
| 17 * can listen on, and to push events to that stream. | 29 * can listen on, and to push events to that stream. |
| 18 * | 30 * |
| 19 * It's possible to check whether the stream is paused or not, and whether | 31 * It's possible to check whether the stream is paused or not, and whether |
| 20 * it has subscribers or not, as well as getting a callback when either of | 32 * it has subscribers or not, as well as getting a callback when either of |
| 21 * these change. | 33 * these change. |
| (...skipping 24 matching lines...) Expand all Loading... |
| 46 * the stream at all, and won't trigger callbacks. From the controller's point | 58 * the stream at all, and won't trigger callbacks. From the controller's point |
| 47 * of view, the stream is completely inert when has completed. | 59 * of view, the stream is completely inert when has completed. |
| 48 */ | 60 */ |
| 49 abstract class StreamController<T> implements StreamSink<T> { | 61 abstract class StreamController<T> implements StreamSink<T> { |
| 50 /** The stream that this controller is controlling. */ | 62 /** The stream that this controller is controlling. */ |
| 51 Stream<T> get stream; | 63 Stream<T> get stream; |
| 52 | 64 |
| 53 /** | 65 /** |
| 54 * A controller with a [stream] that supports only one single subscriber. | 66 * A controller with a [stream] that supports only one single subscriber. |
| 55 * | 67 * |
| 56 * If [sync] is true, events may be passed directly to the stream's listener | 68 * If [sync] is true, the returned stream controller is a |
| 57 * during an [add], [addError] or [close] call. If [sync] is false, the event | 69 * [SynchronousStreamController], and must be used with the care |
| 58 * will be passed to the listener at a later time, after the code creating | 70 * and attention necessary to not break the [Stream] contract. |
| 59 * the event has returned. | 71 * See [Completer.sync] for some explanations on when a synchronous |
| 72 * dispatching can be used. |
| 73 * If in doubt, keep the controller non-sync. |
| 60 * | 74 * |
| 61 * The controller will buffer all incoming events until the subscriber is | 75 * A Stream should be inert until a subscriber starts listening on it (using |
| 62 * registered. | 76 * the [onListen] callback to start producing events). Streams should not |
| 77 * leak resources (like websockets) when no user ever listens on the stream. |
| 78 * |
| 79 * The controller buffers all incoming events until a subscriber is |
| 80 * registered, but this feature should only be used in rare circumstances. |
| 63 * | 81 * |
| 64 * The [onPause] function is called when the stream becomes | 82 * The [onPause] function is called when the stream becomes |
| 65 * paused. [onResume] is called when the stream resumed. | 83 * paused. [onResume] is called when the stream resumed. |
| 66 * | 84 * |
| 67 * The [onListen] callback is called when the stream | 85 * The [onListen] callback is called when the stream |
| 68 * receives its listener and [onCancel] when the listener ends | 86 * receives its listener and [onCancel] when the listener ends |
| 69 * its subscription. If [onCancel] needs to perform an asynchronous operation, | 87 * its subscription. If [onCancel] needs to perform an asynchronous operation, |
| 70 * [onCancel] should return a future that completes when the cancel operation | 88 * [onCancel] should return a future that completes when the cancel operation |
| 71 * is done. | 89 * is done. |
| 72 * | 90 * |
| 73 * If the stream is canceled before the controller needs new data the | 91 * If the stream is canceled before the controller needs new data the |
| 74 * [onResume] call might not be executed. | 92 * [onResume] call might not be executed. |
| 75 */ | 93 */ |
| 76 factory StreamController({void onListen(), | 94 factory StreamController({void onListen(), |
| 77 void onPause(), | 95 void onPause(), |
| 78 void onResume(), | 96 void onResume(), |
| 79 onCancel(), | 97 onCancel(), |
| 80 bool sync: false}) { | 98 bool sync: false}) { |
| 81 if (onListen == null && onPause == null && | |
| 82 onResume == null && onCancel == null) { | |
| 83 return sync | |
| 84 ? new _NoCallbackSyncStreamController<T>() | |
| 85 : new _NoCallbackAsyncStreamController<T>(); | |
| 86 } | |
| 87 return sync | 99 return sync |
| 88 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 100 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| 89 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 101 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
| 90 } | 102 } |
| 91 | 103 |
| 92 /** | 104 /** |
| 93 * A controller where [stream] can be listened to more than once. | 105 * A controller where [stream] can be listened to more than once. |
| 94 * | 106 * |
| 95 * The [Stream] returned by [stream] is a broadcast stream. | 107 * The [Stream] returned by [stream] is a broadcast stream. |
| 96 * It can be listened to more than once. | 108 * It can be listened to more than once. |
| 97 * | 109 * |
| 110 * A Stream should be inert until a subscriber starts listening on it (using |
| 111 * the [onListen] callback to start producing events). Streams should not |
| 112 * leak resources (like websockets) when no user ever listens on the stream. |
| 113 * |
| 114 * Broadcast streams do not buffer events when there is no listener. |
| 115 * |
| 98 * The controller distributes any events to all currently subscribed | 116 * The controller distributes any events to all currently subscribed |
| 99 * listeners at the time when [add], [addError] or [close] is called. | 117 * listeners at the time when [add], [addError] or [close] is called. |
| 100 * It is not allowed to call `add`, `addError`, or `close` before a previous | 118 * It is not allowed to call `add`, `addError`, or `close` before a previous |
| 101 * call has returned. The controller does not have any internal queue of | 119 * call has returned. The controller does not have any internal queue of |
| 102 * events, and if there are no listeners at the time the event is added, | 120 * events, and if there are no listeners at the time the event is added, |
| 103 * it will just be dropped, or, if it is an error, be reported as uncaught. | 121 * it will just be dropped, or, if it is an error, be reported as uncaught. |
| 104 * | 122 * |
| 105 * Each listener subscription is handled independently, | 123 * Each listener subscription is handled independently, |
| 106 * and if one pauses, only the pausing listener is affected. | 124 * and if one pauses, only the pausing listener is affected. |
| 107 * A paused listener will buffer events internally until unpaused or canceled. | 125 * A paused listener will buffer events internally until unpaused or canceled. |
| 108 * | 126 * |
| 109 * If [sync] is true, events may be fired directly by the stream's | 127 * If [sync] is true, events may be fired directly by the stream's |
| 110 * subscriptions during an [add], [addError] or [close] call. | 128 * subscriptions during an [add], [addError] or [close] call. |
| 111 * If [sync] is false, the event will be fired at a later time, | 129 * The returned stream controller is a [SynchronousStreamController], |
| 130 * and must be used with the care and attention necessary to not break |
| 131 * the [Stream] contract. |
| 132 * See [Completer.sync] for some explanations on when a synchronous |
| 133 * dispatching can be used. |
| 134 * If in doubt, keep the controller non-sync. |
| 135 * |
| 136 * If [sync] is false, the event will always be fired at a later time, |
| 112 * after the code adding the event has completed. | 137 * after the code adding the event has completed. |
| 113 * | 138 * In that case, no guarantees are given with regard to when |
| 114 * When [sync] is false, no guarantees are given with regard to when | |
| 115 * multiple listeners get the events, except that each listener will get | 139 * multiple listeners get the events, except that each listener will get |
| 116 * all events in the correct order. Each subscription handles the events | 140 * all events in the correct order. Each subscription handles the events |
| 117 * individually. | 141 * individually. |
| 118 * If two events are sent on an async controller with two listeners, | 142 * If two events are sent on an async controller with two listeners, |
| 119 * one of the listeners may get both events | 143 * one of the listeners may get both events |
| 120 * before the other listener gets any. | 144 * before the other listener gets any. |
| 121 * A listener must be subscribed both when the event is initiated | 145 * A listener must be subscribed both when the event is initiated |
| 122 * (that is, when [add] is called) | 146 * (that is, when [add] is called) |
| 123 * and when the event is later delivered, | 147 * and when the event is later delivered, |
| 124 * in order to receive the event. | 148 * in order to receive the event. |
| 125 * | 149 * |
| 126 * The [onListen] callback is called when the first listener is subscribed, | 150 * The [onListen] callback is called when the first listener is subscribed, |
| 127 * and the [onCancel] is called when there are no longer any active listeners. | 151 * and the [onCancel] is called when there are no longer any active listeners. |
| 128 * If a listener is added again later, after the [onCancel] was called, | 152 * If a listener is added again later, after the [onCancel] was called, |
| 129 * the [onListen] will be called again. | 153 * the [onListen] will be called again. |
| 130 */ | 154 */ |
| 131 factory StreamController.broadcast({void onListen(), | 155 factory StreamController.broadcast({void onListen(), |
| 132 void onCancel(), | 156 void onCancel(), |
| 133 bool sync: false}) { | 157 bool sync: false}) { |
| 134 return sync | 158 return sync |
| 135 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 159 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
| 136 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | 160 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
| 137 } | 161 } |
| 138 | 162 |
| 139 /** | 163 /** |
| 164 * The callback which is called when the stream is listened to. |
| 165 * |
| 166 * May be set to `null`, in which case no callback will happen. |
| 167 */ |
| 168 ControllerCallback get onListen; |
| 169 |
| 170 void set onListen(void onListenHandler()); |
| 171 |
| 172 /** |
| 173 * The callback which is called when the stream is paused. |
| 174 * |
| 175 * May be set to `null`, in which case no callback will happen. |
| 176 * |
| 177 * Pause related callbacks are not supported on broadcast stream controllers. |
| 178 */ |
| 179 ControllerCallback get onPause; |
| 180 |
| 181 void set onPause(void onPauseHandler()); |
| 182 |
| 183 /** |
| 184 * The callback which is called when the stream is resumed. |
| 185 * |
| 186 * May be set to `null`, in which case no callback will happen. |
| 187 * |
| 188 * Pause related callbacks are not supported on broadcast stream controllers. |
| 189 */ |
| 190 ControllerCallback get onResume; |
| 191 |
| 192 void set onResume(void onResumeHandler()); |
| 193 |
| 194 /** |
| 195 * The callback which is called when the stream is canceled. |
| 196 * |
| 197 * May be set to `null`, in which case no callback will happen. |
| 198 */ |
| 199 ControllerCancelCallback get onCancel; |
| 200 |
| 201 void set onCancel(onCancelHandler()); |
| 202 |
| 203 /** |
| 140 * Returns a view of this object that only exposes the [StreamSink] interface. | 204 * Returns a view of this object that only exposes the [StreamSink] interface. |
| 141 */ | 205 */ |
| 142 StreamSink<T> get sink; | 206 StreamSink<T> get sink; |
| 143 | 207 |
| 144 /** | 208 /** |
| 145 * Whether the stream is closed for adding more events. | 209 * Whether the stream controller is closed for adding more events. |
| 146 * | 210 * |
| 147 * If true, the "done" event might not have fired yet, but it has been | 211 * The controller becomes closed by calling the [close] method. |
| 148 * scheduled, and it is too late to add more events. | 212 * New events cannot be added, by calling [add] or [addError], |
| 213 * to a closed controller. |
| 214 * |
| 215 * If the controller is closed, |
| 216 * the "done" event might not have been delivered yet, |
| 217 * but it has been scheduled, and it is too late to add more events. |
| 149 */ | 218 */ |
| 150 bool get isClosed; | 219 bool get isClosed; |
| 151 | 220 |
| 152 /** | 221 /** |
| 153 * Whether the subscription would need to buffer events. | 222 * Whether the subscription would need to buffer events. |
| 154 * | 223 * |
| 155 * This is the case if the controller's stream has a listener and it is | 224 * This is the case if the controller's stream has a listener and it is |
| 156 * paused, or if it has not received a listener yet. In that case, the | 225 * paused, or if it has not received a listener yet. In that case, the |
| 157 * controller is considered paused as well. | 226 * controller is considered paused as well. |
| 158 * | 227 * |
| 159 * A broadcast stream controller is never considered paused. It always | 228 * A broadcast stream controller is never considered paused. It always |
| 160 * forwards its events to all uncanceled listeners, if any, and let them | 229 * forwards its events to all uncanceled subscriptions, if any, |
| 161 * handle their own pausing. | 230 * and let the subscriptions handle their own pausing and buffering. |
| 162 */ | 231 */ |
| 163 bool get isPaused; | 232 bool get isPaused; |
| 164 | 233 |
| 165 /** Whether there is a subscriber on the [Stream]. */ | 234 /** Whether there is a subscriber on the [Stream]. */ |
| 166 bool get hasListener; | 235 bool get hasListener; |
| 167 | 236 |
| 168 /** | 237 /** |
| 169 * Send or enqueue an error event. | 238 * Send or enqueue an error event. |
| 170 * | 239 * |
| 171 * If [error] is `null`, it is replaced by a [NullThrownError]. | 240 * If [error] is `null`, it is replaced by a [NullThrownError]. |
| 172 * | |
| 173 * Also allows an objection stack trace object, on top of what [EventSink] | |
| 174 * allows. | |
| 175 */ | 241 */ |
| 176 void addError(Object error, [StackTrace stackTrace]); | 242 void addError(Object error, [StackTrace stackTrace]); |
| 177 | 243 |
| 178 /** | 244 /** |
| 179 * Receives events from [source] and puts them into this controller's stream. | 245 * Receives events from [source] and puts them into this controller's stream. |
| 180 * | 246 * |
| 181 * Returns a future which completes when the source stream is done. | 247 * Returns a future which completes when the source stream is done. |
| 182 * | 248 * |
| 183 * Events must not be added directly to this controller using [add], | 249 * Events must not be added directly to this controller using [add], |
| 184 * [addError], [close] or [addStream], until the returned future | 250 * [addError], [close] or [addStream], until the returned future |
| 185 * is complete. | 251 * is complete. |
| 186 * | 252 * |
| 187 * Data and error events are forwarded to this controller's stream. A done | 253 * Data and error events are forwarded to this controller's stream. A done |
| 188 * event on the source will end the `addStream` operation and complete the | 254 * event on the source will end the `addStream` operation and complete the |
| 189 * returned future. | 255 * returned future. |
| 190 * | 256 * |
| 191 * If [cancelOnError] is true, only the first error on [source] is | 257 * If [cancelOnError] is true, only the first error on [source] is |
| 192 * forwarded to the controller's stream, and the `addStream` ends | 258 * forwarded to the controller's stream, and the `addStream` ends |
| 193 * after this. If [cancelOnError] is false, all errors are forwarded | 259 * after this. If [cancelOnError] is false, all errors are forwarded |
| 194 * and only a done event will end the `addStream`. | 260 * and only a done event will end the `addStream`. |
| 195 */ | 261 */ |
| 196 Future addStream(Stream<T> source, {bool cancelOnError: true}); | 262 Future addStream(Stream<T> source, {bool cancelOnError: true}); |
| 197 } | 263 } |
| 198 | 264 |
| 199 | 265 |
| 266 /** |
| 267 * A stream controller that delivers its events synchronously. |
| 268 * |
| 269 * A synchronous stream controller is intended for cases where |
| 270 * an already asynchronous event triggers an event on a stream. |
| 271 * |
| 272 * Instead of adding the event to the stream in a later microtask, |
| 273 * causing extra latency, the event is instead fired immediately by the |
| 274 * synchronous stream controller, as if the stream event was |
| 275 * the current event or microtask. |
| 276 * |
| 277 * The synchronous stream controller can be used to break the contract |
| 278 * on [Stream], and it must be used carefully to avoid doing so. |
| 279 * |
| 280 * The only advantage to using a [SynchronousStreamController] over a |
| 281 * normal [StreamController] is the improved latency. |
| 282 * Only use the synchronous version if the improvement is significant, |
| 283 * and if its use is safe. Otherwise just use a normal stream controller, |
| 284 * which will always have the correct behavior for a [Stream], and won't |
| 285 * accidentally break other code. |
| 286 * |
| 287 * Adding events to a synchronous controller should only happen as the |
| 288 * very last part of a the handling of the original event. |
| 289 * At that point, adding an event to the stream is equivalent to |
| 290 * returning to the event loop and adding the event in the next microtask. |
| 291 * |
| 292 * Each listener callback will be run as if it was a top-level event |
| 293 * or microtask. This means that if it throws, the error will be reported as |
| 294 * uncaught as soon as possible. |
| 295 * This is one reason to add the event as the last thing in the original event |
| 296 * handler - any action done after adding the event will delay the report of |
| 297 * errors in the event listener callbacks. |
| 298 * |
| 299 * If an event is added in a setting that isn't known to be another event, |
| 300 * it may cause the stream's listener to get that event before the listener |
| 301 * is ready to handle it. We promise that after calling [Stream.listen], |
| 302 * you won't get any events until the code doing the listen has completed. |
| 303 * Calling [add] in response to a function call of unknown origin may break |
| 304 * that promise. |
| 305 * |
| 306 * An [onListen] callback from the controller is *not* an asynchronous event, |
| 307 * and adding events to the controller in the `onListen` callback is always |
| 308 * wrong. The events will be delivered before the listener has even received |
| 309 * the subscription yet. |
| 310 * |
| 311 * The synchronous broadcast stream controller also has a restrictions that a |
| 312 * normal stream controller does not: |
| 313 * The [add], [addError], [close] and [addStream] methods *must not* be |
| 314 * called while an event is being delivered. |
| 315 * That is, if a callback on a subscription on the controller's stream causes |
| 316 * a call to any of the functions above, the call will fail. |
| 317 * A broadcast stream may have more than one listener, and if an |
| 318 * event is added synchronously while another is being also in the process |
| 319 * of being added, the latter event might reach some listeners before |
| 320 * the former. To prevent that, an event cannot be added while a previous |
| 321 * event is being fired. |
| 322 * This guarantees that an event is fully delivered when the |
| 323 * first [add], [addError] or [close] returns, |
| 324 * and further events will be delivered in the correct order. |
| 325 * |
| 326 * This still only guarantees that the event is delivered to the subscription. |
| 327 * If the subscription is paused, the actual callback may still happen later, |
| 328 * and the event will instead be buffered by the subscription. |
| 329 * Barring pausing, and the following buffered events that haven't been |
| 330 * delivered yet, callbacks will be called synchronously when an event is added. |
| 331 * |
| 332 * Adding an event to a synchronous non-broadcast stream controller while |
| 333 * another event is in progress may cause the second event to be delayed |
| 334 * and not be delivered synchronously, and until that event is delivered, |
| 335 * the controller will not act synchronously. |
| 336 */ |
| 337 abstract class SynchronousStreamController<T> implements StreamController<T> { |
| 338 /** |
| 339 * Adds event to the controller's stream. |
| 340 * |
| 341 * As [StreamController.add], but must not be called while an event is |
| 342 * being added by [add], [addError] or [close]. |
| 343 */ |
| 344 void add(T data); |
| 345 |
| 346 /** |
| 347 * Adds error to the controller's stream. |
| 348 * |
| 349 * As [StreamController.addError], but must not be called while an event is |
| 350 * being added by [add], [addError] or [close]. |
| 351 */ |
| 352 void addError(Object error, [StackTrace stackTrace]); |
| 353 |
| 354 /** |
| 355 * Closes the controller's stream. |
| 356 * |
| 357 * As [StreamController.close], but must not be called while an event is |
| 358 * being added by [add], [addError] or [close]. |
| 359 */ |
| 360 Future close(); |
| 361 } |
| 362 |
| 200 abstract class _StreamControllerLifecycle<T> { | 363 abstract class _StreamControllerLifecycle<T> { |
| 201 StreamSubscription<T> _subscribe( | 364 StreamSubscription<T> _subscribe( |
| 202 void onData(T data), | 365 void onData(T data), |
| 203 Function onError, | 366 Function onError, |
| 204 void onDone(), | 367 void onDone(), |
| 205 bool cancelOnError); | 368 bool cancelOnError); |
| 206 void _recordPause(StreamSubscription<T> subscription) {} | 369 void _recordPause(StreamSubscription<T> subscription) {} |
| 207 void _recordResume(StreamSubscription<T> subscription) {} | 370 void _recordResume(StreamSubscription<T> subscription) {} |
| 208 Future _recordCancel(StreamSubscription<T> subscription) => null; | 371 Future _recordCancel(StreamSubscription<T> subscription) => null; |
| 209 } | 372 } |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 275 /** | 438 /** |
| 276 * Future completed when the stream sends its last event. | 439 * Future completed when the stream sends its last event. |
| 277 * | 440 * |
| 278 * This is also the future returned by [close]. | 441 * This is also the future returned by [close]. |
| 279 */ | 442 */ |
| 280 // TODO(lrn): Could this be stored in the varData field too, if it's not | 443 // TODO(lrn): Could this be stored in the varData field too, if it's not |
| 281 // accessed until the call to "close"? Then we need to special case if it's | 444 // accessed until the call to "close"? Then we need to special case if it's |
| 282 // accessed earlier, or if close is called before subscribing. | 445 // accessed earlier, or if close is called before subscribing. |
| 283 _Future _doneFuture; | 446 _Future _doneFuture; |
| 284 | 447 |
| 285 _StreamController(); | 448 ControllerCallback onListen; |
| 449 ControllerCallback onPause; |
| 450 ControllerCallback onResume; |
| 451 ControllerCancelCallback onCancel; |
| 286 | 452 |
| 287 _NotificationHandler get _onListen; | 453 _StreamController(this.onListen, |
| 288 _NotificationHandler get _onPause; | 454 this.onPause, |
| 289 _NotificationHandler get _onResume; | 455 this.onResume, |
| 290 _NotificationHandler get _onCancel; | 456 this.onCancel); |
| 291 | 457 |
| 292 // Return a new stream every time. The streams are equal, but not identical. | 458 // Return a new stream every time. The streams are equal, but not identical. |
| 293 Stream<T> get stream => new _ControllerStream(this); | 459 Stream<T> get stream => new _ControllerStream<T>(this); |
| 294 | 460 |
| 295 /** | 461 /** |
| 296 * Returns a view of this object that only exposes the [StreamSink] interface. | 462 * Returns a view of this object that only exposes the [StreamSink] interface. |
| 297 */ | 463 */ |
| 298 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | 464 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |
| 299 | 465 |
| 300 /** | 466 /** |
| 301 * Whether a listener has existed and been canceled. | 467 * Whether a listener has existed and been canceled. |
| 302 * | 468 * |
| 303 * After this, adding more events will be ignored. | 469 * After this, adding more events will be ignored. |
| (...skipping 17 matching lines...) Expand all Loading... |
| 321 /** New events may not be added after close, or during addStream. */ | 487 /** New events may not be added after close, or during addStream. */ |
| 322 bool get _mayAddEvent => (_state < _STATE_CLOSED); | 488 bool get _mayAddEvent => (_state < _STATE_CLOSED); |
| 323 | 489 |
| 324 // Returns the pending events. | 490 // Returns the pending events. |
| 325 // Pending events are events added before a subscription exists. | 491 // Pending events are events added before a subscription exists. |
| 326 // They are added to the subscription when it is created. | 492 // They are added to the subscription when it is created. |
| 327 // Pending events, if any, are kept in the _varData field until the | 493 // Pending events, if any, are kept in the _varData field until the |
| 328 // stream is listened to. | 494 // stream is listened to. |
| 329 // While adding a stream, pending events are moved into the | 495 // While adding a stream, pending events are moved into the |
| 330 // state object to allow the state object to use the _varData field. | 496 // state object to allow the state object to use the _varData field. |
| 331 _PendingEvents get _pendingEvents { | 497 _PendingEvents<T> get _pendingEvents { |
| 332 assert(_isInitialState); | 498 assert(_isInitialState); |
| 333 if (!_isAddingStream) { | 499 if (!_isAddingStream) { |
| 334 return _varData; | 500 return _varData as Object /*=_PendingEvents<T>*/; |
| 335 } | 501 } |
| 336 _StreamControllerAddStreamState state = _varData; | 502 _StreamControllerAddStreamState<T> state = |
| 337 return state.varData; | 503 _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
| 504 return state.varData as Object /*=_PendingEvents<T>*/; |
| 338 } | 505 } |
| 339 | 506 |
| 340 // Returns the pending events, and creates the object if necessary. | 507 // Returns the pending events, and creates the object if necessary. |
| 341 _StreamImplEvents _ensurePendingEvents() { | 508 _StreamImplEvents<T> _ensurePendingEvents() { |
| 342 assert(_isInitialState); | 509 assert(_isInitialState); |
| 343 if (!_isAddingStream) { | 510 if (!_isAddingStream) { |
| 344 if (_varData == null) _varData = new _StreamImplEvents(); | 511 if (_varData == null) _varData = new _StreamImplEvents<T>(); |
| 345 return _varData; | 512 return _varData as Object /*=_StreamImplEvents<T>*/; |
| 346 } | 513 } |
| 347 _StreamControllerAddStreamState state = _varData; | 514 _StreamControllerAddStreamState<T> state = |
| 348 if (state.varData == null) state.varData = new _StreamImplEvents(); | 515 _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
| 349 return state.varData; | 516 if (state.varData == null) state.varData = new _StreamImplEvents<T>(); |
| 517 return state.varData as Object /*=_StreamImplEvents<T>*/; |
| 350 } | 518 } |
| 351 | 519 |
| 352 // Get the current subscription. | 520 // Get the current subscription. |
| 353 // If we are adding a stream, the subscription is moved into the state | 521 // If we are adding a stream, the subscription is moved into the state |
| 354 // object to allow the state object to use the _varData field. | 522 // object to allow the state object to use the _varData field. |
| 355 _ControllerSubscription get _subscription { | 523 _ControllerSubscription<T> get _subscription { |
| 356 assert(hasListener); | 524 assert(hasListener); |
| 357 if (_isAddingStream) { | 525 if (_isAddingStream) { |
| 358 _StreamControllerAddStreamState addState = _varData; | 526 _StreamControllerAddStreamState<T> addState = |
| 359 return addState.varData; | 527 _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
| 528 return addState.varData as Object /*=_ControllerSubscription<T>*/; |
| 360 } | 529 } |
| 361 return _varData; | 530 return _varData as Object /*=_ControllerSubscription<T>*/; |
| 362 } | 531 } |
| 363 | 532 |
| 364 /** | 533 /** |
| 365 * Creates an error describing why an event cannot be added. | 534 * Creates an error describing why an event cannot be added. |
| 366 * | 535 * |
| 367 * The reason, and therefore the error message, depends on the current state. | 536 * The reason, and therefore the error message, depends on the current state. |
| 368 */ | 537 */ |
| 369 Error _badEventState() { | 538 Error _badEventState() { |
| 370 if (isClosed) { | 539 if (isClosed) { |
| 371 return new StateError("Cannot add event after closing"); | 540 return new StateError("Cannot add event after closing"); |
| 372 } | 541 } |
| 373 assert(_isAddingStream); | 542 assert(_isAddingStream); |
| 374 return new StateError("Cannot add event while adding a stream"); | 543 return new StateError("Cannot add event while adding a stream"); |
| 375 } | 544 } |
| 376 | 545 |
| 377 // StreamSink interface. | 546 // StreamSink interface. |
| 378 Future addStream(Stream<T> source, {bool cancelOnError: true}) { | 547 Future addStream(Stream<T> source, {bool cancelOnError: true}) { |
| 379 if (!_mayAddEvent) throw _badEventState(); | 548 if (!_mayAddEvent) throw _badEventState(); |
| 380 if (_isCanceled) return new _Future.immediate(null); | 549 if (_isCanceled) return new _Future.immediate(null); |
| 381 _StreamControllerAddStreamState addState = | 550 _StreamControllerAddStreamState<T> addState = |
| 382 new _StreamControllerAddStreamState(this, | 551 new _StreamControllerAddStreamState<T>(this, |
| 383 _varData, | 552 _varData, |
| 384 source, | 553 source, |
| 385 cancelOnError); | 554 cancelOnError); |
| 386 _varData = addState; | 555 _varData = addState; |
| 387 _state |= _STATE_ADDSTREAM; | 556 _state |= _STATE_ADDSTREAM; |
| 388 return addState.addStreamFuture; | 557 return addState.addStreamFuture; |
| 389 } | 558 } |
| 390 | 559 |
| 391 /** | 560 /** |
| 392 * Returns a future that is completed when the stream is done | 561 * Returns a future that is completed when the stream is done |
| 393 * processing events. | 562 * processing events. |
| 394 * | 563 * |
| 395 * This happens either when the done event has been sent, or if the | 564 * This happens either when the done event has been sent, or if the |
| (...skipping 13 matching lines...) Expand all Loading... |
| 409 */ | 578 */ |
| 410 void add(T value) { | 579 void add(T value) { |
| 411 if (!_mayAddEvent) throw _badEventState(); | 580 if (!_mayAddEvent) throw _badEventState(); |
| 412 _add(value); | 581 _add(value); |
| 413 } | 582 } |
| 414 | 583 |
| 415 /** | 584 /** |
| 416 * Send or enqueue an error event. | 585 * Send or enqueue an error event. |
| 417 */ | 586 */ |
| 418 void addError(Object error, [StackTrace stackTrace]) { | 587 void addError(Object error, [StackTrace stackTrace]) { |
| 588 if (!_mayAddEvent) throw _badEventState(); |
| 419 error = _nonNullError(error); | 589 error = _nonNullError(error); |
| 420 if (!_mayAddEvent) throw _badEventState(); | |
| 421 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | 590 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |
| 422 if (replacement != null) { | 591 if (replacement != null) { |
| 423 error = _nonNullError(replacement.error); | 592 error = _nonNullError(replacement.error); |
| 424 stackTrace = replacement.stackTrace; | 593 stackTrace = replacement.stackTrace; |
| 425 } | 594 } |
| 426 _addError(error, stackTrace); | 595 _addError(error, stackTrace); |
| 427 } | 596 } |
| 428 | 597 |
| 429 /** | 598 /** |
| 430 * Closes this controller and sends a done event on the stream. | 599 * Closes this controller and sends a done event on the stream. |
| 431 * | 600 * |
| 432 * The first time a controller is closed, a "done" event is added to its | 601 * The first time a controller is closed, a "done" event is added to its |
| 433 * stream. | 602 * stream. |
| 434 * | 603 * |
| 435 * You are allowed to close the controller more than once, but only the first | 604 * You are allowed to close the controller more than once, but only the first |
| 436 * call has any effect. | 605 * call has any effect. |
| 437 * | 606 * |
| 438 * After closing, no further events may be added using [add] or [addError]. | 607 * After closing, no further events may be added using [add], [addError] |
| 608 * or [addStream]. |
| 439 * | 609 * |
| 440 * The returned future is completed when the done event has been delivered. | 610 * The returned future is completed when the done event has been delivered. |
| 441 */ | 611 */ |
| 442 Future close() { | 612 Future close() { |
| 443 if (isClosed) { | 613 if (isClosed) { |
| 444 return _ensureDoneFuture(); | 614 return _ensureDoneFuture(); |
| 445 } | 615 } |
| 446 if (!_mayAddEvent) throw _badEventState(); | 616 if (!_mayAddEvent) throw _badEventState(); |
| 447 _closeUnchecked(); | 617 _closeUnchecked(); |
| 448 return _ensureDoneFuture(); | 618 return _ensureDoneFuture(); |
| (...skipping 23 matching lines...) Expand all Loading... |
| 472 if (hasListener) { | 642 if (hasListener) { |
| 473 _sendError(error, stackTrace); | 643 _sendError(error, stackTrace); |
| 474 } else if (_isInitialState) { | 644 } else if (_isInitialState) { |
| 475 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); | 645 _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); |
| 476 } | 646 } |
| 477 } | 647 } |
| 478 | 648 |
| 479 void _close() { | 649 void _close() { |
| 480 // End of addStream stream. | 650 // End of addStream stream. |
| 481 assert(_isAddingStream); | 651 assert(_isAddingStream); |
| 482 _StreamControllerAddStreamState addState = _varData; | 652 _StreamControllerAddStreamState<T> addState = |
| 653 _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
| 483 _varData = addState.varData; | 654 _varData = addState.varData; |
| 484 _state &= ~_STATE_ADDSTREAM; | 655 _state &= ~_STATE_ADDSTREAM; |
| 485 addState.complete(); | 656 addState.complete(); |
| 486 } | 657 } |
| 487 | 658 |
| 488 // _StreamControllerLifeCycle interface | 659 // _StreamControllerLifeCycle interface |
| 489 | 660 |
| 490 StreamSubscription<T> _subscribe( | 661 StreamSubscription<T> _subscribe( |
| 491 void onData(T data), | 662 void onData(T data), |
| 492 Function onError, | 663 Function onError, |
| 493 void onDone(), | 664 void onDone(), |
| 494 bool cancelOnError) { | 665 bool cancelOnError) { |
| 495 if (!_isInitialState) { | 666 if (!_isInitialState) { |
| 496 throw new StateError("Stream has already been listened to."); | 667 throw new StateError("Stream has already been listened to."); |
| 497 } | 668 } |
| 498 _ControllerSubscription subscription = | 669 _ControllerSubscription<T> subscription = |
| 499 new _ControllerSubscription(this, onData, onError, onDone, | 670 new _ControllerSubscription<T>(this, onData, onError, onDone, |
| 500 cancelOnError); | 671 cancelOnError); |
| 501 | 672 |
| 502 _PendingEvents pendingEvents = _pendingEvents; | 673 _PendingEvents<T> pendingEvents = _pendingEvents; |
| 503 _state |= _STATE_SUBSCRIBED; | 674 _state |= _STATE_SUBSCRIBED; |
| 504 if (_isAddingStream) { | 675 if (_isAddingStream) { |
| 505 _StreamControllerAddStreamState addState = _varData; | 676 _StreamControllerAddStreamState<T> addState = |
| 677 _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
| 506 addState.varData = subscription; | 678 addState.varData = subscription; |
| 507 addState.resume(); | 679 addState.resume(); |
| 508 } else { | 680 } else { |
| 509 _varData = subscription; | 681 _varData = subscription; |
| 510 } | 682 } |
| 511 subscription._setPendingEvents(pendingEvents); | 683 subscription._setPendingEvents(pendingEvents); |
| 512 subscription._guardCallback(() { | 684 subscription._guardCallback(() { |
| 513 _runGuarded(_onListen); | 685 _runGuarded(onListen); |
| 514 }); | 686 }); |
| 515 | 687 |
| 516 return subscription; | 688 return subscription; |
| 517 } | 689 } |
| 518 | 690 |
| 519 Future _recordCancel(StreamSubscription<T> subscription) { | 691 Future _recordCancel(StreamSubscription<T> subscription) { |
| 520 // When we cancel, we first cancel any stream being added, | 692 // When we cancel, we first cancel any stream being added, |
| 521 // Then we call _onCancel, and finally the _doneFuture is completed. | 693 // Then we call `onCancel`, and finally the _doneFuture is completed. |
| 522 // If either of addStream's cancel or _onCancel returns a future, | 694 // If either of addStream's cancel or `onCancel` returns a future, |
| 523 // we wait for it before continuing. | 695 // we wait for it before continuing. |
| 524 // Any error during this process ends up in the returned future. | 696 // Any error during this process ends up in the returned future. |
| 525 // If more errors happen, we act as if it happens inside nested try/finallys | 697 // If more errors happen, we act as if it happens inside nested try/finallys |
| 526 // or whenComplete calls, and only the last error ends up in the | 698 // or whenComplete calls, and only the last error ends up in the |
| 527 // returned future. | 699 // returned future. |
| 528 Future result; | 700 Future result; |
| 529 if (_isAddingStream) { | 701 if (_isAddingStream) { |
| 530 _StreamControllerAddStreamState addState = _varData; | 702 _StreamControllerAddStreamState<T> addState = |
| 703 _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
| 531 result = addState.cancel(); | 704 result = addState.cancel(); |
| 532 } | 705 } |
| 533 _varData = null; | 706 _varData = null; |
| 534 _state = | 707 _state = |
| 535 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | 708 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
| 536 | 709 |
| 537 if (_onCancel != null) { | 710 if (onCancel != null) { |
| 538 if (result == null) { | 711 if (result == null) { |
| 539 // Only introduce a future if one is needed. | 712 // Only introduce a future if one is needed. |
| 540 // If _onCancel returns null, no future is needed. | 713 // If _onCancel returns null, no future is needed. |
| 541 try { | 714 try { |
| 542 result = _onCancel(); | 715 result = onCancel(); |
| 543 } catch (e, s) { | 716 } catch (e, s) { |
| 544 // Return the error in the returned future. | 717 // Return the error in the returned future. |
| 545 // Complete it asynchronously, so there is time for a listener | 718 // Complete it asynchronously, so there is time for a listener |
| 546 // to handle the error. | 719 // to handle the error. |
| 547 result = new _Future().._asyncCompleteError(e, s); | 720 result = new _Future().._asyncCompleteError(e, s); |
| 548 } | 721 } |
| 549 } else { | 722 } else { |
| 550 // Simpler case when we already know that we will return a future. | 723 // Simpler case when we already know that we will return a future. |
| 551 result = result.whenComplete(_onCancel); | 724 result = result.whenComplete(onCancel); |
| 552 } | 725 } |
| 553 } | 726 } |
| 554 | 727 |
| 555 void complete() { | 728 void complete() { |
| 556 if (_doneFuture != null && _doneFuture._mayComplete) { | 729 if (_doneFuture != null && _doneFuture._mayComplete) { |
| 557 _doneFuture._asyncComplete(null); | 730 _doneFuture._asyncComplete(null); |
| 558 } | 731 } |
| 559 } | 732 } |
| 560 | 733 |
| 561 if (result != null) { | 734 if (result != null) { |
| 562 result = result.whenComplete(complete); | 735 result = result.whenComplete(complete); |
| 563 } else { | 736 } else { |
| 564 complete(); | 737 complete(); |
| 565 } | 738 } |
| 566 | 739 |
| 567 return result; | 740 return result; |
| 568 } | 741 } |
| 569 | 742 |
| 570 void _recordPause(StreamSubscription<T> subscription) { | 743 void _recordPause(StreamSubscription<T> subscription) { |
| 571 if (_isAddingStream) { | 744 if (_isAddingStream) { |
| 572 _StreamControllerAddStreamState addState = _varData; | 745 _StreamControllerAddStreamState<T> addState = |
| 746 _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
| 573 addState.pause(); | 747 addState.pause(); |
| 574 } | 748 } |
| 575 _runGuarded(_onPause); | 749 _runGuarded(onPause); |
| 576 } | 750 } |
| 577 | 751 |
| 578 void _recordResume(StreamSubscription<T> subscription) { | 752 void _recordResume(StreamSubscription<T> subscription) { |
| 579 if (_isAddingStream) { | 753 if (_isAddingStream) { |
| 580 _StreamControllerAddStreamState addState = _varData; | 754 _StreamControllerAddStreamState<T> addState = |
| 755 _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
| 581 addState.resume(); | 756 addState.resume(); |
| 582 } | 757 } |
| 583 _runGuarded(_onResume); | 758 _runGuarded(onResume); |
| 584 } | 759 } |
| 585 } | 760 } |
| 586 | 761 |
| 587 abstract class _SyncStreamControllerDispatch<T> | 762 abstract class _SyncStreamControllerDispatch<T> |
| 588 implements _StreamController<T> { | 763 implements _StreamController<T>, SynchronousStreamController<T> { |
| 764 int get _state; |
| 765 void set _state(int state); |
| 766 |
| 589 void _sendData(T data) { | 767 void _sendData(T data) { |
| 590 _subscription._add(data); | 768 _subscription._add(data); |
| 591 } | 769 } |
| 592 | 770 |
| 593 void _sendError(Object error, StackTrace stackTrace) { | 771 void _sendError(Object error, StackTrace stackTrace) { |
| 594 _subscription._addError(error, stackTrace); | 772 _subscription._addError(error, stackTrace); |
| 595 } | 773 } |
| 596 | 774 |
| 597 void _sendDone() { | 775 void _sendDone() { |
| 598 _subscription._close(); | 776 _subscription._close(); |
| 599 } | 777 } |
| 600 } | 778 } |
| 601 | 779 |
| 602 abstract class _AsyncStreamControllerDispatch<T> | 780 abstract class _AsyncStreamControllerDispatch<T> |
| 603 implements _StreamController<T> { | 781 implements _StreamController<T> { |
| 604 void _sendData(T data) { | 782 void _sendData(T data) { |
| 605 _subscription._addPending(new _DelayedData(data)); | 783 _subscription._addPending(new _DelayedData<dynamic /*=T*/>(data)); |
| 606 } | 784 } |
| 607 | 785 |
| 608 void _sendError(Object error, StackTrace stackTrace) { | 786 void _sendError(Object error, StackTrace stackTrace) { |
| 609 _subscription._addPending(new _DelayedError(error, stackTrace)); | 787 _subscription._addPending(new _DelayedError(error, stackTrace)); |
| 610 } | 788 } |
| 611 | 789 |
| 612 void _sendDone() { | 790 void _sendDone() { |
| 613 _subscription._addPending(const _DelayedDone()); | 791 _subscription._addPending(const _DelayedDone()); |
| 614 } | 792 } |
| 615 } | 793 } |
| 616 | 794 |
| 617 // TODO(lrn): Use common superclass for callback-controllers when VM supports | 795 // TODO(lrn): Use common superclass for callback-controllers when VM supports |
| 618 // constructors in mixin superclasses. | 796 // constructors in mixin superclasses. |
| 619 | 797 |
| 620 class _AsyncStreamController<T> extends _StreamController<T> | 798 class _AsyncStreamController<T> = _StreamController<T> |
| 621 with _AsyncStreamControllerDispatch<T> { | 799 with _AsyncStreamControllerDispatch<T>; |
| 622 final _NotificationHandler _onListen; | |
| 623 final _NotificationHandler _onPause; | |
| 624 final _NotificationHandler _onResume; | |
| 625 final _NotificationHandler _onCancel; | |
| 626 | 800 |
| 627 _AsyncStreamController(void this._onListen(), | 801 class _SyncStreamController<T> = _StreamController<T> |
| 628 void this._onPause(), | 802 with _SyncStreamControllerDispatch<T>; |
| 629 void this._onResume(), | |
| 630 this._onCancel()); | |
| 631 } | |
| 632 | |
| 633 class _SyncStreamController<T> extends _StreamController<T> | |
| 634 with _SyncStreamControllerDispatch<T> { | |
| 635 final _NotificationHandler _onListen; | |
| 636 final _NotificationHandler _onPause; | |
| 637 final _NotificationHandler _onResume; | |
| 638 final _NotificationHandler _onCancel; | |
| 639 | |
| 640 _SyncStreamController(void this._onListen(), | |
| 641 void this._onPause(), | |
| 642 void this._onResume(), | |
| 643 this._onCancel()); | |
| 644 } | |
| 645 | |
| 646 abstract class _NoCallbacks { | |
| 647 _NotificationHandler get _onListen => null; | |
| 648 _NotificationHandler get _onPause => null; | |
| 649 _NotificationHandler get _onResume => null; | |
| 650 _NotificationHandler get _onCancel => null; | |
| 651 } | |
| 652 | |
| 653 class _NoCallbackAsyncStreamController<T> = _StreamController<T> | |
| 654 with _AsyncStreamControllerDispatch<T>, _NoCallbacks; | |
| 655 | |
| 656 class _NoCallbackSyncStreamController<T> = _StreamController<T> | |
| 657 with _SyncStreamControllerDispatch<T>, _NoCallbacks; | |
| 658 | 803 |
| 659 typedef _NotificationHandler(); | 804 typedef _NotificationHandler(); |
| 660 | 805 |
| 661 Future _runGuarded(_NotificationHandler notificationHandler) { | 806 Future _runGuarded(_NotificationHandler notificationHandler) { |
| 662 if (notificationHandler == null) return null; | 807 if (notificationHandler == null) return null; |
| 663 try { | 808 try { |
| 664 var result = notificationHandler(); | 809 var result = notificationHandler(); |
| 665 if (result is Future) return result; | 810 if (result is Future) return result; |
| 666 return null; | 811 return null; |
| 667 } catch (e, s) { | 812 } catch (e, s) { |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 765 | 910 |
| 766 /** | 911 /** |
| 767 * Stop adding the stream. | 912 * Stop adding the stream. |
| 768 * | 913 * |
| 769 * Complete the future returned by `StreamController.addStream` when | 914 * Complete the future returned by `StreamController.addStream` when |
| 770 * the cancel is complete. | 915 * the cancel is complete. |
| 771 * | 916 * |
| 772 * Return a future if the cancel takes time, otherwise return `null`. | 917 * Return a future if the cancel takes time, otherwise return `null`. |
| 773 */ | 918 */ |
| 774 Future cancel() { | 919 Future cancel() { |
| 775 var cancel2 = addSubscription.cancel(); | 920 var cancel = addSubscription.cancel(); |
| 776 if (cancel2 == null) { | 921 if (cancel == null) { |
| 777 addStreamFuture._asyncComplete(null); | 922 addStreamFuture._asyncComplete(null); |
| 778 return null; | 923 return null; |
| 779 } | 924 } |
| 780 return cancel2.whenComplete(() { addStreamFuture._asyncComplete(null); }); | 925 return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); }); |
| 781 } | 926 } |
| 782 | 927 |
| 783 void complete() { | 928 void complete() { |
| 784 addStreamFuture._asyncComplete(null); | 929 addStreamFuture._asyncComplete(null); |
| 785 } | 930 } |
| 786 } | 931 } |
| 787 | 932 |
| 788 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { | 933 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { |
| 789 // The subscription or pending data of a _StreamController. | 934 // The subscription or pending data of a _StreamController. |
| 790 // Stored here because we reuse the `_varData` field in the _StreamController | 935 // Stored here because we reuse the `_varData` field in the _StreamController |
| 791 // to store this state object. | 936 // to store this state object. |
| 792 var varData; | 937 var varData; |
| 793 | 938 |
| 794 _StreamControllerAddStreamState(_StreamController controller, | 939 _StreamControllerAddStreamState(_StreamController<T> controller, |
| 795 this.varData, | 940 this.varData, |
| 796 Stream source, | 941 Stream source, |
| 797 bool cancelOnError) | 942 bool cancelOnError) |
| 798 : super(controller, source, cancelOnError) { | 943 : super(controller, source, cancelOnError) { |
| 799 if (controller.isPaused) { | 944 if (controller.isPaused) { |
| 800 addSubscription.pause(); | 945 addSubscription.pause(); |
| 801 } | 946 } |
| 802 } | 947 } |
| 803 } | 948 } |
| OLD | NEW |