Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(100)

Side by Side Diff: pkg/dev_compiler/tool/input_sdk/lib/async/stream_controller.dart

Issue 2698353003: unfork DDC's copy of most SDK libraries (Closed)
Patch Set: revert core_patch Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream.
9 // -------------------------------------------------------------------
10
11 /**
12 * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks.
13 */
14 typedef void ControllerCallback();
15
16 /**
17 * Type of stream controller `onCancel` callbacks.
18 *
19 * The callback may return either `void` or a future.
20 */
21 typedef ControllerCancelCallback();
22
23 /**
24 * A controller with the stream it controls.
25 *
26 * This controller allows sending data, error and done events on
27 * its [stream].
28 * This class can be used to create a simple stream that others
29 * can listen on, and to push events to that stream.
30 *
31 * It's possible to check whether the stream is paused or not, and whether
32 * it has subscribers or not, as well as getting a callback when either of
33 * these change.
34 *
35 * If the stream starts or stops having listeners (first listener subscribing,
36 * last listener unsubscribing), the `onSubscriptionStateChange` callback
37 * is notified as soon as possible. If the subscription stat changes during
38 * an event firing or a callback being executed, the change will not be reported
39 * until the current event or callback has finished.
40 * If the pause state has also changed during an event or callback, only the
41 * subscription state callback is notified.
42 *
43 * If the subscriber state has not changed, but the pause state has, the
44 * `onPauseStateChange` callback is notified as soon as possible, after firing
45 * a current event or completing another callback. This happens if the stream
46 * is not paused, and a listener pauses it, or if the stream has been resumed
47 * from pause and has no pending events. If the listeners resume a paused stream
48 * while it still has queued events, the controller will still consider the
49 * stream paused until all queued events have been dispatched.
50 *
51 * Whether to invoke a callback depends only on the state before and after
52 * a stream action, for example firing an event. If the state changes multiple
53 * times during the action, and then ends up in the same state as before, no
54 * callback is performed.
55 *
56 * If listeners are added after the stream has completed (sent a "done" event),
57 * the listeners will be sent a "done" event eventually, but they won't affect
58 * the stream at all, and won't trigger callbacks. From the controller's point
59 * of view, the stream is completely inert when has completed.
60 */
61 abstract class StreamController<T> implements StreamSink<T> {
62 /** The stream that this controller is controlling. */
63 Stream<T> get stream;
64
65 /**
66 * A controller with a [stream] that supports only one single subscriber.
67 *
68 * If [sync] is true, the returned stream controller is a
69 * [SynchronousStreamController], and must be used with the care
70 * and attention necessary to not break the [Stream] contract.
71 * See [Completer.sync] for some explanations on when a synchronous
72 * dispatching can be used.
73 * If in doubt, keep the controller non-sync.
74 *
75 * A Stream should be inert until a subscriber starts listening on it (using
76 * the [onListen] callback to start producing events). Streams should not
77 * leak resources (like websockets) when no user ever listens on the stream.
78 *
79 * The controller buffers all incoming events until a subscriber is
80 * registered, but this feature should only be used in rare circumstances.
81 *
82 * The [onPause] function is called when the stream becomes
83 * paused. [onResume] is called when the stream resumed.
84 *
85 * The [onListen] callback is called when the stream
86 * receives its listener and [onCancel] when the listener ends
87 * its subscription. If [onCancel] needs to perform an asynchronous operation,
88 * [onCancel] should return a future that completes when the cancel operation
89 * is done.
90 *
91 * If the stream is canceled before the controller needs new data the
92 * [onResume] call might not be executed.
93 */
94 factory StreamController({void onListen(),
95 void onPause(),
96 void onResume(),
97 onCancel(),
98 bool sync: false}) {
99 return sync
100 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
101 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
102 }
103
104 /**
105 * A controller where [stream] can be listened to more than once.
106 *
107 * The [Stream] returned by [stream] is a broadcast stream.
108 * It can be listened to more than once.
109 *
110 * A Stream should be inert until a subscriber starts listening on it (using
111 * the [onListen] callback to start producing events). Streams should not
112 * leak resources (like websockets) when no user ever listens on the stream.
113 *
114 * Broadcast streams do not buffer events when there is no listener.
115 *
116 * The controller distributes any events to all currently subscribed
117 * listeners at the time when [add], [addError] or [close] is called.
118 * It is not allowed to call `add`, `addError`, or `close` before a previous
119 * call has returned. The controller does not have any internal queue of
120 * events, and if there are no listeners at the time the event is added,
121 * it will just be dropped, or, if it is an error, be reported as uncaught.
122 *
123 * Each listener subscription is handled independently,
124 * and if one pauses, only the pausing listener is affected.
125 * A paused listener will buffer events internally until unpaused or canceled.
126 *
127 * If [sync] is true, events may be fired directly by the stream's
128 * subscriptions during an [add], [addError] or [close] call.
129 * The returned stream controller is a [SynchronousStreamController],
130 * and must be used with the care and attention necessary to not break
131 * the [Stream] contract.
132 * See [Completer.sync] for some explanations on when a synchronous
133 * dispatching can be used.
134 * If in doubt, keep the controller non-sync.
135 *
136 * If [sync] is false, the event will always be fired at a later time,
137 * after the code adding the event has completed.
138 * In that case, no guarantees are given with regard to when
139 * multiple listeners get the events, except that each listener will get
140 * all events in the correct order. Each subscription handles the events
141 * individually.
142 * If two events are sent on an async controller with two listeners,
143 * one of the listeners may get both events
144 * before the other listener gets any.
145 * A listener must be subscribed both when the event is initiated
146 * (that is, when [add] is called)
147 * and when the event is later delivered,
148 * in order to receive the event.
149 *
150 * The [onListen] callback is called when the first listener is subscribed,
151 * and the [onCancel] is called when there are no longer any active listeners.
152 * If a listener is added again later, after the [onCancel] was called,
153 * the [onListen] will be called again.
154 */
155 factory StreamController.broadcast({void onListen(),
156 void onCancel(),
157 bool sync: false}) {
158 return sync
159 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
160 : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
161 }
162
163 /**
164 * The callback which is called when the stream is listened to.
165 *
166 * May be set to `null`, in which case no callback will happen.
167 */
168 ControllerCallback get onListen;
169
170 void set onListen(void onListenHandler());
171
172 /**
173 * The callback which is called when the stream is paused.
174 *
175 * May be set to `null`, in which case no callback will happen.
176 *
177 * Pause related callbacks are not supported on broadcast stream controllers.
178 */
179 ControllerCallback get onPause;
180
181 void set onPause(void onPauseHandler());
182
183 /**
184 * The callback which is called when the stream is resumed.
185 *
186 * May be set to `null`, in which case no callback will happen.
187 *
188 * Pause related callbacks are not supported on broadcast stream controllers.
189 */
190 ControllerCallback get onResume;
191
192 void set onResume(void onResumeHandler());
193
194 /**
195 * The callback which is called when the stream is canceled.
196 *
197 * May be set to `null`, in which case no callback will happen.
198 */
199 ControllerCancelCallback get onCancel;
200
201 void set onCancel(onCancelHandler());
202
203 /**
204 * Returns a view of this object that only exposes the [StreamSink] interface.
205 */
206 StreamSink<T> get sink;
207
208 /**
209 * Whether the stream controller is closed for adding more events.
210 *
211 * The controller becomes closed by calling the [close] method.
212 * New events cannot be added, by calling [add] or [addError],
213 * to a closed controller.
214 *
215 * If the controller is closed,
216 * the "done" event might not have been delivered yet,
217 * but it has been scheduled, and it is too late to add more events.
218 */
219 bool get isClosed;
220
221 /**
222 * Whether the subscription would need to buffer events.
223 *
224 * This is the case if the controller's stream has a listener and it is
225 * paused, or if it has not received a listener yet. In that case, the
226 * controller is considered paused as well.
227 *
228 * A broadcast stream controller is never considered paused. It always
229 * forwards its events to all uncanceled subscriptions, if any,
230 * and let the subscriptions handle their own pausing and buffering.
231 */
232 bool get isPaused;
233
234 /** Whether there is a subscriber on the [Stream]. */
235 bool get hasListener;
236
237 /**
238 * Send or enqueue an error event.
239 *
240 * If [error] is `null`, it is replaced by a [NullThrownError].
241 */
242 void addError(Object error, [StackTrace stackTrace]);
243
244 /**
245 * Receives events from [source] and puts them into this controller's stream.
246 *
247 * Returns a future which completes when the source stream is done.
248 *
249 * Events must not be added directly to this controller using [add],
250 * [addError], [close] or [addStream], until the returned future
251 * is complete.
252 *
253 * Data and error events are forwarded to this controller's stream. A done
254 * event on the source will end the `addStream` operation and complete the
255 * returned future.
256 *
257 * If [cancelOnError] is true, only the first error on [source] is
258 * forwarded to the controller's stream, and the `addStream` ends
259 * after this. If [cancelOnError] is false, all errors are forwarded
260 * and only a done event will end the `addStream`.
261 */
262 Future addStream(Stream<T> source, {bool cancelOnError: true});
263 }
264
265
266 /**
267 * A stream controller that delivers its events synchronously.
268 *
269 * A synchronous stream controller is intended for cases where
270 * an already asynchronous event triggers an event on a stream.
271 *
272 * Instead of adding the event to the stream in a later microtask,
273 * causing extra latency, the event is instead fired immediately by the
274 * synchronous stream controller, as if the stream event was
275 * the current event or microtask.
276 *
277 * The synchronous stream controller can be used to break the contract
278 * on [Stream], and it must be used carefully to avoid doing so.
279 *
280 * The only advantage to using a [SynchronousStreamController] over a
281 * normal [StreamController] is the improved latency.
282 * Only use the synchronous version if the improvement is significant,
283 * and if its use is safe. Otherwise just use a normal stream controller,
284 * which will always have the correct behavior for a [Stream], and won't
285 * accidentally break other code.
286 *
287 * Adding events to a synchronous controller should only happen as the
288 * very last part of a the handling of the original event.
289 * At that point, adding an event to the stream is equivalent to
290 * returning to the event loop and adding the event in the next microtask.
291 *
292 * Each listener callback will be run as if it was a top-level event
293 * or microtask. This means that if it throws, the error will be reported as
294 * uncaught as soon as possible.
295 * This is one reason to add the event as the last thing in the original event
296 * handler - any action done after adding the event will delay the report of
297 * errors in the event listener callbacks.
298 *
299 * If an event is added in a setting that isn't known to be another event,
300 * it may cause the stream's listener to get that event before the listener
301 * is ready to handle it. We promise that after calling [Stream.listen],
302 * you won't get any events until the code doing the listen has completed.
303 * Calling [add] in response to a function call of unknown origin may break
304 * that promise.
305 *
306 * An [onListen] callback from the controller is *not* an asynchronous event,
307 * and adding events to the controller in the `onListen` callback is always
308 * wrong. The events will be delivered before the listener has even received
309 * the subscription yet.
310 *
311 * The synchronous broadcast stream controller also has a restrictions that a
312 * normal stream controller does not:
313 * The [add], [addError], [close] and [addStream] methods *must not* be
314 * called while an event is being delivered.
315 * That is, if a callback on a subscription on the controller's stream causes
316 * a call to any of the functions above, the call will fail.
317 * A broadcast stream may have more than one listener, and if an
318 * event is added synchronously while another is being also in the process
319 * of being added, the latter event might reach some listeners before
320 * the former. To prevent that, an event cannot be added while a previous
321 * event is being fired.
322 * This guarantees that an event is fully delivered when the
323 * first [add], [addError] or [close] returns,
324 * and further events will be delivered in the correct order.
325 *
326 * This still only guarantees that the event is delivered to the subscription.
327 * If the subscription is paused, the actual callback may still happen later,
328 * and the event will instead be buffered by the subscription.
329 * Barring pausing, and the following buffered events that haven't been
330 * delivered yet, callbacks will be called synchronously when an event is added.
331 *
332 * Adding an event to a synchronous non-broadcast stream controller while
333 * another event is in progress may cause the second event to be delayed
334 * and not be delivered synchronously, and until that event is delivered,
335 * the controller will not act synchronously.
336 */
337 abstract class SynchronousStreamController<T> implements StreamController<T> {
338 /**
339 * Adds event to the controller's stream.
340 *
341 * As [StreamController.add], but must not be called while an event is
342 * being added by [add], [addError] or [close].
343 */
344 void add(T data);
345
346 /**
347 * Adds error to the controller's stream.
348 *
349 * As [StreamController.addError], but must not be called while an event is
350 * being added by [add], [addError] or [close].
351 */
352 void addError(Object error, [StackTrace stackTrace]);
353
354 /**
355 * Closes the controller's stream.
356 *
357 * As [StreamController.close], but must not be called while an event is
358 * being added by [add], [addError] or [close].
359 */
360 Future close();
361 }
362
363 abstract class _StreamControllerLifecycle<T> {
364 StreamSubscription<T> _subscribe(
365 void onData(T data),
366 Function onError,
367 void onDone(),
368 bool cancelOnError);
369 void _recordPause(StreamSubscription<T> subscription) {}
370 void _recordResume(StreamSubscription<T> subscription) {}
371 Future _recordCancel(StreamSubscription<T> subscription) => null;
372 }
373
374 /**
375 * Default implementation of [StreamController].
376 *
377 * Controls a stream that only supports a single controller.
378 */
379 abstract class _StreamController<T> implements StreamController<T>,
380 _StreamControllerLifecycle<T>,
381 _EventSink<T>,
382 _EventDispatch<T> {
383 // The states are bit-flags. More than one can be set at a time.
384 //
385 // The "subscription state" goes through the states:
386 // initial -> subscribed -> canceled.
387 // These are mutually exclusive.
388 // The "closed" state records whether the [close] method has been called
389 // on the controller. This can be done at any time. If done before
390 // subscription, the done event is queued. If done after cancel, the done
391 // event is ignored (just as any other event after a cancel).
392
393 /** The controller is in its initial state with no subscription. */
394 static const int _STATE_INITIAL = 0;
395 /** The controller has a subscription, but hasn't been closed or canceled. */
396 static const int _STATE_SUBSCRIBED = 1;
397 /** The subscription is canceled. */
398 static const int _STATE_CANCELED = 2;
399 /** Mask for the subscription state. */
400 static const int _STATE_SUBSCRIPTION_MASK = 3;
401
402 // The following state relate to the controller, not the subscription.
403 // If closed, adding more events is not allowed.
404 // If executing an [addStream], new events are not allowed either, but will
405 // be added by the stream.
406
407 /**
408 * The controller is closed due to calling [close].
409 *
410 * When the stream is closed, you can neither add new events nor add new
411 * listeners.
412 */
413 static const int _STATE_CLOSED = 4;
414 /**
415 * The controller is in the middle of an [addStream] operation.
416 *
417 * While adding events from a stream, no new events can be added directly
418 * on the controller.
419 */
420 static const int _STATE_ADDSTREAM = 8;
421
422 /**
423 * Field containing different data depending on the current subscription
424 * state.
425 *
426 * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents]
427 * for events added to the controller before a subscription.
428 *
429 * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription.
430 *
431 * When [_state] is [_STATE_CANCELED] the field is currently not used.
432 */
433 var _varData;
434
435 /** Current state of the controller. */
436 int _state = _STATE_INITIAL;
437
438 /**
439 * Future completed when the stream sends its last event.
440 *
441 * This is also the future returned by [close].
442 */
443 // TODO(lrn): Could this be stored in the varData field too, if it's not
444 // accessed until the call to "close"? Then we need to special case if it's
445 // accessed earlier, or if close is called before subscribing.
446 _Future _doneFuture;
447
448 ControllerCallback onListen;
449 ControllerCallback onPause;
450 ControllerCallback onResume;
451 ControllerCancelCallback onCancel;
452
453 _StreamController(this.onListen,
454 this.onPause,
455 this.onResume,
456 this.onCancel);
457
458 // Return a new stream every time. The streams are equal, but not identical.
459 Stream<T> get stream => new _ControllerStream<T>(this);
460
461 /**
462 * Returns a view of this object that only exposes the [StreamSink] interface.
463 */
464 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
465
466 /**
467 * Whether a listener has existed and been canceled.
468 *
469 * After this, adding more events will be ignored.
470 */
471 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
472
473 /** Whether there is an active listener. */
474 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
475
476 /** Whether there has not been a listener yet. */
477 bool get _isInitialState =>
478 (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
479
480 bool get isClosed => (_state & _STATE_CLOSED) != 0;
481
482 bool get isPaused => hasListener ? _subscription._isInputPaused
483 : !_isCanceled;
484
485 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
486
487 /** New events may not be added after close, or during addStream. */
488 bool get _mayAddEvent => (_state < _STATE_CLOSED);
489
490 // Returns the pending events.
491 // Pending events are events added before a subscription exists.
492 // They are added to the subscription when it is created.
493 // Pending events, if any, are kept in the _varData field until the
494 // stream is listened to.
495 // While adding a stream, pending events are moved into the
496 // state object to allow the state object to use the _varData field.
497 _PendingEvents<T> get _pendingEvents {
498 assert(_isInitialState);
499 if (!_isAddingStream) {
500 return _varData as Object /*=_PendingEvents<T>*/;
501 }
502 _StreamControllerAddStreamState<T> state =
503 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
504 return state.varData as Object /*=_PendingEvents<T>*/;
505 }
506
507 // Returns the pending events, and creates the object if necessary.
508 _StreamImplEvents<T> _ensurePendingEvents() {
509 assert(_isInitialState);
510 if (!_isAddingStream) {
511 if (_varData == null) _varData = new _StreamImplEvents<T>();
512 return _varData as Object /*=_StreamImplEvents<T>*/;
513 }
514 _StreamControllerAddStreamState<T> state =
515 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
516 if (state.varData == null) state.varData = new _StreamImplEvents<T>();
517 return state.varData as Object /*=_StreamImplEvents<T>*/;
518 }
519
520 // Get the current subscription.
521 // If we are adding a stream, the subscription is moved into the state
522 // object to allow the state object to use the _varData field.
523 _ControllerSubscription<T> get _subscription {
524 assert(hasListener);
525 if (_isAddingStream) {
526 _StreamControllerAddStreamState<T> addState =
527 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
528 return addState.varData as Object /*=_ControllerSubscription<T>*/;
529 }
530 return _varData as Object /*=_ControllerSubscription<T>*/;
531 }
532
533 /**
534 * Creates an error describing why an event cannot be added.
535 *
536 * The reason, and therefore the error message, depends on the current state.
537 */
538 Error _badEventState() {
539 if (isClosed) {
540 return new StateError("Cannot add event after closing");
541 }
542 assert(_isAddingStream);
543 return new StateError("Cannot add event while adding a stream");
544 }
545
546 // StreamSink interface.
547 Future addStream(Stream<T> source, {bool cancelOnError: true}) {
548 if (!_mayAddEvent) throw _badEventState();
549 if (_isCanceled) return new _Future.immediate(null);
550 _StreamControllerAddStreamState<T> addState =
551 new _StreamControllerAddStreamState<T>(this,
552 _varData,
553 source,
554 cancelOnError);
555 _varData = addState;
556 _state |= _STATE_ADDSTREAM;
557 return addState.addStreamFuture;
558 }
559
560 /**
561 * Returns a future that is completed when the stream is done
562 * processing events.
563 *
564 * This happens either when the done event has been sent, or if the
565 * subscriber of a single-subscription stream is cancelled.
566 */
567 Future get done => _ensureDoneFuture();
568
569 Future _ensureDoneFuture() {
570 if (_doneFuture == null) {
571 _doneFuture = _isCanceled ? Future._nullFuture : new _Future();
572 }
573 return _doneFuture;
574 }
575
576 /**
577 * Send or enqueue a data event.
578 */
579 void add(T value) {
580 if (!_mayAddEvent) throw _badEventState();
581 _add(value);
582 }
583
584 /**
585 * Send or enqueue an error event.
586 */
587 void addError(Object error, [StackTrace stackTrace]) {
588 if (!_mayAddEvent) throw _badEventState();
589 error = _nonNullError(error);
590 AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
591 if (replacement != null) {
592 error = _nonNullError(replacement.error);
593 stackTrace = replacement.stackTrace;
594 }
595 _addError(error, stackTrace);
596 }
597
598 /**
599 * Closes this controller and sends a done event on the stream.
600 *
601 * The first time a controller is closed, a "done" event is added to its
602 * stream.
603 *
604 * You are allowed to close the controller more than once, but only the first
605 * call has any effect.
606 *
607 * After closing, no further events may be added using [add], [addError]
608 * or [addStream].
609 *
610 * The returned future is completed when the done event has been delivered.
611 */
612 Future close() {
613 if (isClosed) {
614 return _ensureDoneFuture();
615 }
616 if (!_mayAddEvent) throw _badEventState();
617 _closeUnchecked();
618 return _ensureDoneFuture();
619 }
620
621 void _closeUnchecked() {
622 _state |= _STATE_CLOSED;
623 if (hasListener) {
624 _sendDone();
625 } else if (_isInitialState) {
626 _ensurePendingEvents().add(const _DelayedDone());
627 }
628 }
629
630 // EventSink interface. Used by the [addStream] events.
631
632 // Add data event, used both by the [addStream] events and by [add].
633 void _add(T value) {
634 if (hasListener) {
635 _sendData(value);
636 } else if (_isInitialState) {
637 _ensurePendingEvents().add(new _DelayedData<T>(value));
638 }
639 }
640
641 void _addError(Object error, StackTrace stackTrace) {
642 if (hasListener) {
643 _sendError(error, stackTrace);
644 } else if (_isInitialState) {
645 _ensurePendingEvents().add(new _DelayedError(error, stackTrace));
646 }
647 }
648
649 void _close() {
650 // End of addStream stream.
651 assert(_isAddingStream);
652 _StreamControllerAddStreamState<T> addState =
653 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
654 _varData = addState.varData;
655 _state &= ~_STATE_ADDSTREAM;
656 addState.complete();
657 }
658
659 // _StreamControllerLifeCycle interface
660
661 StreamSubscription<T> _subscribe(
662 void onData(T data),
663 Function onError,
664 void onDone(),
665 bool cancelOnError) {
666 if (!_isInitialState) {
667 throw new StateError("Stream has already been listened to.");
668 }
669 _ControllerSubscription<T> subscription =
670 new _ControllerSubscription<T>(this, onData, onError, onDone,
671 cancelOnError);
672
673 _PendingEvents<T> pendingEvents = _pendingEvents;
674 _state |= _STATE_SUBSCRIBED;
675 if (_isAddingStream) {
676 _StreamControllerAddStreamState<T> addState =
677 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
678 addState.varData = subscription;
679 addState.resume();
680 } else {
681 _varData = subscription;
682 }
683 subscription._setPendingEvents(pendingEvents);
684 subscription._guardCallback(() {
685 _runGuarded(onListen);
686 });
687
688 return subscription;
689 }
690
691 Future _recordCancel(StreamSubscription<T> subscription) {
692 // When we cancel, we first cancel any stream being added,
693 // Then we call `onCancel`, and finally the _doneFuture is completed.
694 // If either of addStream's cancel or `onCancel` returns a future,
695 // we wait for it before continuing.
696 // Any error during this process ends up in the returned future.
697 // If more errors happen, we act as if it happens inside nested try/finallys
698 // or whenComplete calls, and only the last error ends up in the
699 // returned future.
700 Future result;
701 if (_isAddingStream) {
702 _StreamControllerAddStreamState<T> addState =
703 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
704 result = addState.cancel();
705 }
706 _varData = null;
707 _state =
708 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
709
710 if (onCancel != null) {
711 if (result == null) {
712 // Only introduce a future if one is needed.
713 // If _onCancel returns null, no future is needed.
714 try {
715 result = onCancel();
716 } catch (e, s) {
717 // Return the error in the returned future.
718 // Complete it asynchronously, so there is time for a listener
719 // to handle the error.
720 result = new _Future().._asyncCompleteError(e, s);
721 }
722 } else {
723 // Simpler case when we already know that we will return a future.
724 result = result.whenComplete(onCancel);
725 }
726 }
727
728 void complete() {
729 if (_doneFuture != null && _doneFuture._mayComplete) {
730 _doneFuture._asyncComplete(null);
731 }
732 }
733
734 if (result != null) {
735 result = result.whenComplete(complete);
736 } else {
737 complete();
738 }
739
740 return result;
741 }
742
743 void _recordPause(StreamSubscription<T> subscription) {
744 if (_isAddingStream) {
745 _StreamControllerAddStreamState<T> addState =
746 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
747 addState.pause();
748 }
749 _runGuarded(onPause);
750 }
751
752 void _recordResume(StreamSubscription<T> subscription) {
753 if (_isAddingStream) {
754 _StreamControllerAddStreamState<T> addState =
755 _varData as Object /*=_StreamControllerAddStreamState<T>*/;
756 addState.resume();
757 }
758 _runGuarded(onResume);
759 }
760 }
761
762 abstract class _SyncStreamControllerDispatch<T>
763 implements _StreamController<T>, SynchronousStreamController<T> {
764 int get _state;
765 void set _state(int state);
766
767 void _sendData(T data) {
768 _subscription._add(data);
769 }
770
771 void _sendError(Object error, StackTrace stackTrace) {
772 _subscription._addError(error, stackTrace);
773 }
774
775 void _sendDone() {
776 _subscription._close();
777 }
778 }
779
780 abstract class _AsyncStreamControllerDispatch<T>
781 implements _StreamController<T> {
782 void _sendData(T data) {
783 _subscription._addPending(new _DelayedData<dynamic /*=T*/>(data));
784 }
785
786 void _sendError(Object error, StackTrace stackTrace) {
787 _subscription._addPending(new _DelayedError(error, stackTrace));
788 }
789
790 void _sendDone() {
791 _subscription._addPending(const _DelayedDone());
792 }
793 }
794
795 // TODO(lrn): Use common superclass for callback-controllers when VM supports
796 // constructors in mixin superclasses.
797
798 class _AsyncStreamController<T> = _StreamController<T>
799 with _AsyncStreamControllerDispatch<T>;
800
801 class _SyncStreamController<T> = _StreamController<T>
802 with _SyncStreamControllerDispatch<T>;
803
804 typedef _NotificationHandler();
805
806 Future _runGuarded(_NotificationHandler notificationHandler) {
807 if (notificationHandler == null) return null;
808 try {
809 var result = notificationHandler();
810 if (result is Future) return result;
811 return null;
812 } catch (e, s) {
813 Zone.current.handleUncaughtError(e, s);
814 }
815 }
816
817 class _ControllerStream<T> extends _StreamImpl<T> {
818 _StreamControllerLifecycle<T> _controller;
819
820 _ControllerStream(this._controller);
821
822 StreamSubscription<T> _createSubscription(
823 void onData(T data),
824 Function onError,
825 void onDone(),
826 bool cancelOnError) =>
827 _controller._subscribe(onData, onError, onDone, cancelOnError);
828
829 // Override == and hashCode so that new streams returned by the same
830 // controller are considered equal. The controller returns a new stream
831 // each time it's queried, but doesn't have to cache the result.
832
833 int get hashCode => _controller.hashCode ^ 0x35323532;
834
835 bool operator==(Object other) {
836 if (identical(this, other)) return true;
837 if (other is! _ControllerStream) return false;
838 _ControllerStream otherStream = other;
839 return identical(otherStream._controller, this._controller);
840 }
841 }
842
843 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
844 final _StreamControllerLifecycle<T> _controller;
845
846 _ControllerSubscription(this._controller, void onData(T data),
847 Function onError, void onDone(), bool cancelOnError)
848 : super(onData, onError, onDone, cancelOnError);
849
850 Future _onCancel() {
851 return _controller._recordCancel(this);
852 }
853
854 void _onPause() {
855 _controller._recordPause(this);
856 }
857
858 void _onResume() {
859 _controller._recordResume(this);
860 }
861 }
862
863
864 /** A class that exposes only the [StreamSink] interface of an object. */
865 class _StreamSinkWrapper<T> implements StreamSink<T> {
866 final StreamController _target;
867 _StreamSinkWrapper(this._target);
868 void add(T data) { _target.add(data); }
869 void addError(Object error, [StackTrace stackTrace]) {
870 _target.addError(error, stackTrace);
871 }
872 Future close() => _target.close();
873 Future addStream(Stream<T> source, {bool cancelOnError: true}) =>
874 _target.addStream(source, cancelOnError: cancelOnError);
875 Future get done => _target.done;
876 }
877
878 /**
879 * Object containing the state used to handle [StreamController.addStream].
880 */
881 class _AddStreamState<T> {
882 // [_Future] returned by call to addStream.
883 final _Future addStreamFuture;
884
885 // Subscription on stream argument to addStream.
886 final StreamSubscription addSubscription;
887
888 _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError)
889 : addStreamFuture = new _Future(),
890 addSubscription = source.listen(controller._add,
891 onError: cancelOnError
892 ? makeErrorHandler(controller)
893 : controller._addError,
894 onDone: controller._close,
895 cancelOnError: cancelOnError);
896
897 static makeErrorHandler(_EventSink controller) =>
898 (e, StackTrace s) {
899 controller._addError(e, s);
900 controller._close();
901 };
902
903 void pause() {
904 addSubscription.pause();
905 }
906
907 void resume() {
908 addSubscription.resume();
909 }
910
911 /**
912 * Stop adding the stream.
913 *
914 * Complete the future returned by `StreamController.addStream` when
915 * the cancel is complete.
916 *
917 * Return a future if the cancel takes time, otherwise return `null`.
918 */
919 Future cancel() {
920 var cancel = addSubscription.cancel();
921 if (cancel == null) {
922 addStreamFuture._asyncComplete(null);
923 return null;
924 }
925 return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); });
926 }
927
928 void complete() {
929 addStreamFuture._asyncComplete(null);
930 }
931 }
932
933 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
934 // The subscription or pending data of a _StreamController.
935 // Stored here because we reuse the `_varData` field in the _StreamController
936 // to store this state object.
937 var varData;
938
939 _StreamControllerAddStreamState(_StreamController<T> controller,
940 this.varData,
941 Stream source,
942 bool cancelOnError)
943 : super(controller, source, cancelOnError) {
944 if (controller.isPaused) {
945 addSubscription.pause();
946 }
947 }
948 }
OLDNEW
« no previous file with comments | « pkg/dev_compiler/tool/input_sdk/lib/async/stream.dart ('k') | pkg/dev_compiler/tool/input_sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698