Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(224)

Side by Side Diff: pkg/dev_compiler/tool/input_sdk/lib/async/stream.dart

Issue 2698353003: unfork DDC's copy of most SDK libraries (Closed)
Patch Set: revert core_patch Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 // -------------------------------------------------------------------
8 // Core Stream types
9 // -------------------------------------------------------------------
10
11 typedef void _TimerCallback();
12
13 /**
14 * A source of asynchronous data events.
15 *
16 * A Stream provides a way to receive a sequence of events.
17 * Each event is either a data event or an error event,
18 * representing the result of a single computation.
19 * When the events provided by a Stream have all been sent,
20 * a single "done" event will mark the end.
21 *
22 * You can [listen] on a stream to make it start generating events,
23 * and to set up listeners that receive the events.
24 * When you listen, you receive a [StreamSubscription] object
25 * which is the active object providing the events,
26 * and which can be used to stop listening again,
27 * or to temporarily pause events from the subscription.
28 *
29 * There are two kinds of streams: "Single-subscription" streams and
30 * "broadcast" streams.
31 *
32 * *A single-subscription stream* allows only a single listener during the whole
33 * lifetime of the stream.
34 * It doesn't start generating events until it has a listener,
35 * and it stops sending events when the listener is unsubscribed,
36 * even if the source of events could still provide more.
37 *
38 * Listening twice on a single-subscription stream is not allowed, even after
39 * the first subscription has been canceled.
40 *
41 * Single-subscription streams are generally used for streaming chunks of
42 * larger contiguous data like file I/O.
43 *
44 * *A broadcast stream* allows any number of listeners, and it fires
45 * its events when they are ready, whether there are listeners or not.
46 *
47 * Broadcast streams are used for independent events/observers.
48 *
49 * If several listeners want to listen to a single subscription stream,
50 * use [asBroadcastStream] to create a broadcast stream on top of the
51 * non-broadcast stream.
52 *
53 * On either kind of stream, stream transformations, such as [where] and
54 * [skip], return the same type of stream as the one the method was called on,
55 * unless otherwise noted.
56 *
57 * When an event is fired, the listener(s) at that time will receive the event.
58 * If a listener is added to a broadcast stream while an event is being fired,
59 * that listener will not receive the event currently being fired.
60 * If a listener is canceled, it immediately stops receiving events.
61 *
62 * When the "done" event is fired, subscribers are unsubscribed before
63 * receiving the event. After the event has been sent, the stream has no
64 * subscribers. Adding new subscribers to a broadcast stream after this point
65 * is allowed, but they will just receive a new "done" event as soon
66 * as possible.
67 *
68 * Stream subscriptions always respect "pause" requests. If necessary they need
69 * to buffer their input, but often, and preferably, they can simply request
70 * their input to pause too.
71 *
72 * The default implementation of [isBroadcast] returns false.
73 * A broadcast stream inheriting from [Stream] must override [isBroadcast]
74 * to return `true`.
75 */
76 abstract class Stream<T> {
77 Stream();
78
79 /**
80 * Internal use only. We do not want to promise that Stream stays const.
81 *
82 * If mixins become compatible with const constructors, we may use a
83 * stream mixin instead of extending Stream from a const class.
84 */
85 const Stream._internal();
86
87 /**
88 * Creates an empty broadcast stream.
89 *
90 * This is a stream which does nothing except sending a done event
91 * when it's listened to.
92 */
93 const factory Stream.empty() = _EmptyStream<T>;
94
95 /**
96 * Creates a new single-subscription stream from the future.
97 *
98 * When the future completes, the stream will fire one event, either
99 * data or error, and then close with a done-event.
100 */
101 factory Stream.fromFuture(Future<T> future) {
102 // Use the controller's buffering to fill in the value even before
103 // the stream has a listener. For a single value, it's not worth it
104 // to wait for a listener before doing the `then` on the future.
105 _StreamController<T> controller = new StreamController<T>(sync: true);
106 future.then((value) {
107 controller._add(value);
108 controller._closeUnchecked();
109 },
110 onError: (error, stackTrace) {
111 controller._addError(error, stackTrace);
112 controller._closeUnchecked();
113 });
114 return controller.stream;
115 }
116
117 /**
118 * Create a stream from a group of futures.
119 *
120 * The stream reports the results of the futures on the stream in the order
121 * in which the futures complete.
122 *
123 * If some futures have completed before calling `Stream.fromFutures`,
124 * their result will be output on the created stream in some unspecified
125 * order.
126 *
127 * When all futures have completed, the stream is closed.
128 *
129 * If no future is passed, the stream closes as soon as possible.
130 */
131 factory Stream.fromFutures(Iterable<Future<T>> futures) {
132 _StreamController<T> controller = new StreamController<T>(sync: true);
133 int count = 0;
134 var onValue = (T value) {
135 if (!controller.isClosed) {
136 controller._add(value);
137 if (--count == 0) controller._closeUnchecked();
138 }
139 };
140 var onError = (error, stack) {
141 if (!controller.isClosed) {
142 controller._addError(error, stack);
143 if (--count == 0) controller._closeUnchecked();
144 }
145 };
146 // The futures are already running, so start listening to them immediately
147 // (instead of waiting for the stream to be listened on).
148 // If we wait, we might not catch errors in the futures in time.
149 for (var future in futures) {
150 count++;
151 future.then(onValue, onError: onError);
152 }
153 // Use schedule microtask since controller is sync.
154 if (count == 0) scheduleMicrotask(controller.close);
155 return controller.stream;
156 }
157
158 /**
159 * Creates a single-subscription stream that gets its data from [data].
160 *
161 * The iterable is iterated when the stream receives a listener, and stops
162 * iterating if the listener cancels the subscription.
163 *
164 * If iterating [data] throws an error, the stream ends immediately with
165 * that error. No done event will be sent (iteration is not complete), but no
166 * further data events will be generated either, since iteration cannot
167 * continue.
168 */
169 factory Stream.fromIterable(Iterable<T> data) {
170 return new _GeneratedStreamImpl<T>(
171 () => new _IterablePendingEvents<T>(data));
172 }
173
174 /**
175 * Creates a stream that repeatedly emits events at [period] intervals.
176 *
177 * The event values are computed by invoking [computation]. The argument to
178 * this callback is an integer that starts with 0 and is incremented for
179 * every event.
180 *
181 * If [computation] is omitted the event values will all be `null`.
182 */
183 factory Stream.periodic(Duration period,
184 [T computation(int computationCount)]) {
185 Timer timer;
186 int computationCount = 0;
187 StreamController<T> controller;
188 // Counts the time that the Stream was running (and not paused).
189 Stopwatch watch = new Stopwatch();
190
191 void sendEvent() {
192 watch.reset();
193 T data;
194 if (computation != null) {
195 try {
196 data = computation(computationCount++);
197 } catch (e, s) {
198 controller.addError(e, s);
199 return;
200 }
201 }
202 controller.add(data);
203 }
204
205 void startPeriodicTimer() {
206 assert(timer == null);
207 timer = new Timer.periodic(period, (Timer timer) {
208 sendEvent();
209 });
210 }
211
212 controller = new StreamController<T>(sync: true,
213 onListen: () {
214 watch.start();
215 startPeriodicTimer();
216 },
217 onPause: () {
218 timer.cancel();
219 timer = null;
220 watch.stop();
221 },
222 onResume: () {
223 assert(timer == null);
224 Duration elapsed = watch.elapsed;
225 watch.start();
226 timer = new Timer(period - elapsed, () {
227 timer = null;
228 startPeriodicTimer();
229 sendEvent();
230 });
231 },
232 onCancel: () {
233 if (timer != null) timer.cancel();
234 timer = null;
235 });
236 return controller.stream;
237 }
238
239 /**
240 * Creates a stream where all events of an existing stream are piped through
241 * a sink-transformation.
242 *
243 * The given [mapSink] closure is invoked when the returned stream is
244 * listened to. All events from the [source] are added into the event sink
245 * that is returned from the invocation. The transformation puts all
246 * transformed events into the sink the [mapSink] closure received during
247 * its invocation. Conceptually the [mapSink] creates a transformation pipe
248 * with the input sink being the returned [EventSink] and the output sink
249 * being the sink it received.
250 *
251 * This constructor is frequently used to build transformers.
252 *
253 * Example use for a duplicating transformer:
254 *
255 * class DuplicationSink implements EventSink<String> {
256 * final EventSink<String> _outputSink;
257 * DuplicationSink(this._outputSink);
258 *
259 * void add(String data) {
260 * _outputSink.add(data);
261 * _outputSink.add(data);
262 * }
263 *
264 * void addError(e, [st]) { _outputSink.addError(e, st); }
265 * void close() { _outputSink.close(); }
266 * }
267 *
268 * class DuplicationTransformer implements StreamTransformer<String, Strin g> {
269 * // Some generic types ommitted for brevety.
270 * Stream bind(Stream stream) => new Stream<String>.eventTransformed(
271 * stream,
272 * (EventSink sink) => new DuplicationSink(sink));
273 * }
274 *
275 * stringStream.transform(new DuplicationTransformer());
276 *
277 * The resulting stream is a broadcast stream if [source] is.
278 */
279 factory Stream.eventTransformed(Stream source,
280 EventSink mapSink(EventSink<T> sink)) {
281 return new _BoundSinkStream(source, mapSink);
282 }
283
284 /**
285 * Reports whether this stream is a broadcast stream.
286 */
287 bool get isBroadcast => false;
288
289 /**
290 * Returns a multi-subscription stream that produces the same events as this.
291 *
292 * The returned stream will subscribe to this stream when its first
293 * subscriber is added, and will stay subscribed until this stream ends,
294 * or a callback cancels the subscription.
295 *
296 * If [onListen] is provided, it is called with a subscription-like object
297 * that represents the underlying subscription to this stream. It is
298 * possible to pause, resume or cancel the subscription during the call
299 * to [onListen]. It is not possible to change the event handlers, including
300 * using [StreamSubscription.asFuture].
301 *
302 * If [onCancel] is provided, it is called in a similar way to [onListen]
303 * when the returned stream stops having listener. If it later gets
304 * a new listener, the [onListen] function is called again.
305 *
306 * Use the callbacks, for example, for pausing the underlying subscription
307 * while having no subscribers to prevent losing events, or canceling the
308 * subscription when there are no listeners.
309 */
310 Stream<T> asBroadcastStream({
311 void onListen(StreamSubscription<T> subscription),
312 void onCancel(StreamSubscription<T> subscription) }) {
313 return new _AsBroadcastStream<T>(this, onListen, onCancel);
314 }
315
316 /**
317 * Adds a subscription to this stream.
318 *
319 * On each data event from this stream, the subscriber's [onData] handler
320 * is called. If [onData] is null, nothing happens.
321 *
322 * On errors from this stream, the [onError] handler is given a
323 * object describing the error.
324 *
325 * The [onError] callback must be of type `void onError(error)` or
326 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts
327 * two arguments it is called with the stack trace (which could be `null` if
328 * the stream itself received an error without stack trace).
329 * Otherwise it is called with just the error object.
330 * If [onError] is omitted, any errors on the stream are considered unhandled,
331 * and will be passed to the current [Zone]'s error handler.
332 * By default unhandled async errors are treated
333 * as if they were uncaught top-level errors.
334 *
335 * If this stream closes, the [onDone] handler is called.
336 *
337 * If [cancelOnError] is true, the subscription is ended when
338 * the first error is reported. The default is false.
339 */
340 StreamSubscription<T> listen(void onData(T event),
341 { Function onError,
342 void onDone(),
343 bool cancelOnError});
344
345 /**
346 * Creates a new stream from this stream that discards some data events.
347 *
348 * The new stream sends the same error and done events as this stream,
349 * but it only sends the data events that satisfy the [test].
350 *
351 * The returned stream is a broadcast stream if this stream is.
352 * If a broadcast stream is listened to more than once, each subscription
353 * will individually perform the `test`.
354 */
355 Stream<T> where(bool test(T event)) {
356 return new _WhereStream<T>(this, test);
357 }
358
359 /**
360 * Creates a new stream that converts each element of this stream
361 * to a new value using the [convert] function.
362 *
363 * For each data event, `o`, in this stream, the returned stream
364 * provides a data event with the value `convert(o)`.
365 * If [convert] throws, the returned stream reports the exception as an error
366 * event instead.
367 *
368 * Error and done events are passed through unchanged to the returned stream.
369 *
370 * The returned stream is a broadcast stream if this stream is.
371 * The [convert] function is called once per data event per listener.
372 * If a broadcast stream is listened to more than once, each subscription
373 * will individually call [convert] on each data event.
374 */
375 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) {
376 return new _MapStream<T, dynamic/*=S*/>(this, convert);
377 }
378
379 /**
380 * Creates a new stream with each data event of this stream asynchronously
381 * mapped to a new event.
382 *
383 * This acts like [map], except that [convert] may return a [Future],
384 * and in that case, the stream waits for that future to complete before
385 * continuing with its result.
386 *
387 * The returned stream is a broadcast stream if this stream is.
388 */
389 Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) {
390 StreamController/*<E>*/ controller;
391 StreamSubscription/*<T>*/ subscription;
392
393 void onListen() {
394 final add = controller.add;
395 assert(controller is _StreamController ||
396 controller is _BroadcastStreamController);
397 final _EventSink/*<E>*/ eventSink =
398 controller as Object /*=_EventSink<E>*/;
399 final addError = eventSink._addError;
400 subscription = this.listen(
401 (T event) {
402 dynamic newValue;
403 try {
404 newValue = convert(event);
405 } catch (e, s) {
406 controller.addError(e, s);
407 return;
408 }
409 if (newValue is Future) {
410 subscription.pause();
411 newValue.then(add, onError: addError)
412 .whenComplete(subscription.resume);
413 } else {
414 controller.add(newValue as Object/*=E*/);
415 }
416 },
417 onError: addError,
418 onDone: controller.close
419 );
420 }
421
422 if (this.isBroadcast) {
423 controller = new StreamController/*<E>*/.broadcast(
424 onListen: onListen,
425 onCancel: () { subscription.cancel(); },
426 sync: true
427 );
428 } else {
429 controller = new StreamController/*<E>*/(
430 onListen: onListen,
431 onPause: () { subscription.pause(); },
432 onResume: () { subscription.resume(); },
433 onCancel: () { subscription.cancel(); },
434 sync: true
435 );
436 }
437 return controller.stream;
438 }
439
440 /**
441 * Creates a new stream with the events of a stream per original event.
442 *
443 * This acts like [expand], except that [convert] returns a [Stream]
444 * instead of an [Iterable].
445 * The events of the returned stream becomes the events of the returned
446 * stream, in the order they are produced.
447 *
448 * If [convert] returns `null`, no value is put on the output stream,
449 * just as if it returned an empty stream.
450 *
451 * The returned stream is a broadcast stream if this stream is.
452 */
453 Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) {
454 StreamController/*<E>*/ controller;
455 StreamSubscription<T> subscription;
456 void onListen() {
457 assert(controller is _StreamController ||
458 controller is _BroadcastStreamController);
459 final _EventSink/*<E>*/ eventSink =
460 controller as Object /*=_EventSink<E>*/;
461 subscription = this.listen(
462 (T event) {
463 Stream/*<E>*/ newStream;
464 try {
465 newStream = convert(event);
466 } catch (e, s) {
467 controller.addError(e, s);
468 return;
469 }
470 if (newStream != null) {
471 subscription.pause();
472 controller.addStream(newStream)
473 .whenComplete(subscription.resume);
474 }
475 },
476 onError: eventSink._addError, // Avoid Zone error replacement.
477 onDone: controller.close
478 );
479 }
480 if (this.isBroadcast) {
481 controller = new StreamController/*<E>*/.broadcast(
482 onListen: onListen,
483 onCancel: () { subscription.cancel(); },
484 sync: true
485 );
486 } else {
487 controller = new StreamController/*<E>*/(
488 onListen: onListen,
489 onPause: () { subscription.pause(); },
490 onResume: () { subscription.resume(); },
491 onCancel: () { subscription.cancel(); },
492 sync: true
493 );
494 }
495 return controller.stream;
496 }
497
498 /**
499 * Creates a wrapper Stream that intercepts some errors from this stream.
500 *
501 * If this stream sends an error that matches [test], then it is intercepted
502 * by the [handle] function.
503 *
504 * The [onError] callback must be of type `void onError(error)` or
505 * `void onError(error, StackTrace stackTrace)`. Depending on the function
506 * type the stream either invokes [onError] with or without a stack
507 * trace. The stack trace argument might be `null` if the stream itself
508 * received an error without stack trace.
509 *
510 * An asynchronous error [:e:] is matched by a test function if [:test(e):]
511 * returns true. If [test] is omitted, every error is considered matching.
512 *
513 * If the error is intercepted, the [handle] function can decide what to do
514 * with it. It can throw if it wants to raise a new (or the same) error,
515 * or simply return to make the stream forget the error.
516 *
517 * If you need to transform an error into a data event, use the more generic
518 * [Stream.transform] to handle the event by writing a data event to
519 * the output sink.
520 *
521 * The returned stream is a broadcast stream if this stream is.
522 * If a broadcast stream is listened to more than once, each subscription
523 * will individually perform the `test` and handle the error.
524 */
525 Stream<T> handleError(Function onError, { bool test(error) }) {
526 return new _HandleErrorStream<T>(this, onError, test);
527 }
528
529 /**
530 * Creates a new stream from this stream that converts each element
531 * into zero or more events.
532 *
533 * Each incoming event is converted to an [Iterable] of new events,
534 * and each of these new events are then sent by the returned stream
535 * in order.
536 *
537 * The returned stream is a broadcast stream if this stream is.
538 * If a broadcast stream is listened to more than once, each subscription
539 * will individually call `convert` and expand the events.
540 */
541 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) {
542 return new _ExpandStream<T, dynamic/*=S*/>(this, convert);
543 }
544
545 /**
546 * Pipe the events of this stream into [streamConsumer].
547 *
548 * The events of this stream are added to `streamConsumer` using
549 * [StreamConsumer.addStream].
550 * The `streamConsumer` is closed when this stream has been successfully added
551 * to it - when the future returned by `addStream` completes without an error.
552 *
553 * Returns a future which completes when the stream has been consumed
554 * and the consumer has been closed.
555 *
556 * The returned future completes with the same result as the future returned
557 * by [StreamConsumer.close].
558 * If the adding of the stream itself fails in some way,
559 * then the consumer is expected to be closed, and won't be closed again.
560 * In that case the returned future completes with the error from calling
561 * `addStream`.
562 */
563 Future pipe(StreamConsumer<T> streamConsumer) {
564 return streamConsumer.addStream(this).then((_) => streamConsumer.close());
565 }
566
567 /**
568 * Chains this stream as the input of the provided [StreamTransformer].
569 *
570 * Returns the result of [:streamTransformer.bind:] itself.
571 *
572 * The `streamTransformer` can decide whether it wants to return a
573 * broadcast stream or not.
574 */
575 Stream/*<S>*/ transform/*<S>*/(
576 StreamTransformer<T, dynamic/*=S*/ > streamTransformer) {
577 return streamTransformer.bind(this);
578 }
579
580 /**
581 * Reduces a sequence of values by repeatedly applying [combine].
582 */
583 Future<T> reduce(T combine(T previous, T element)) {
584 _Future<T> result = new _Future<T>();
585 bool seenFirst = false;
586 T value;
587 StreamSubscription subscription;
588 subscription = this.listen(
589 (T element) {
590 if (seenFirst) {
591 _runUserCode(() => combine(value, element),
592 (T newValue) { value = newValue; },
593 _cancelAndErrorClosure(subscription, result));
594 } else {
595 value = element;
596 seenFirst = true;
597 }
598 },
599 onError: result._completeError,
600 onDone: () {
601 if (!seenFirst) {
602 try {
603 throw IterableElementError.noElement();
604 } catch (e, s) {
605 _completeWithErrorCallback(result, e, s);
606 }
607 } else {
608 result._complete(value);
609 }
610 },
611 cancelOnError: true
612 );
613 return result;
614 }
615
616 /** Reduces a sequence of values by repeatedly applying [combine]. */
617 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue,
618 /*=S*/ combine(var/*=S*/ previous, T element)) {
619
620 _Future/*<S>*/ result = new _Future/*<S>*/();
621 var/*=S*/ value = initialValue;
622 StreamSubscription subscription;
623 subscription = this.listen(
624 (T element) {
625 _runUserCode(
626 () => combine(value, element),
627 (/*=S*/ newValue) { value = newValue; },
628 _cancelAndErrorClosure(subscription, result)
629 );
630 },
631 onError: (e, st) {
632 result._completeError(e, st);
633 },
634 onDone: () {
635 result._complete(value);
636 },
637 cancelOnError: true);
638 return result;
639 }
640
641 /**
642 * Collects string of data events' string representations.
643 *
644 * If [separator] is provided, it is inserted between any two
645 * elements.
646 *
647 * Any error in the stream causes the future to complete with that
648 * error. Otherwise it completes with the collected string when
649 * the "done" event arrives.
650 */
651 Future<String> join([String separator = ""]) {
652 _Future<String> result = new _Future<String>();
653 StringBuffer buffer = new StringBuffer();
654 StreamSubscription subscription;
655 bool first = true;
656 subscription = this.listen(
657 (T element) {
658 if (!first) {
659 buffer.write(separator);
660 }
661 first = false;
662 try {
663 buffer.write(element);
664 } catch (e, s) {
665 _cancelAndErrorWithReplacement(subscription, result, e, s);
666 }
667 },
668 onError: (e) {
669 result._completeError(e);
670 },
671 onDone: () {
672 result._complete(buffer.toString());
673 },
674 cancelOnError: true);
675 return result;
676 }
677
678 /**
679 * Checks whether [needle] occurs in the elements provided by this stream.
680 *
681 * Completes the [Future] when the answer is known.
682 * If this stream reports an error, the [Future] will report that error.
683 */
684 Future<bool> contains(Object needle) {
685 _Future<bool> future = new _Future<bool>();
686 StreamSubscription subscription;
687 subscription = this.listen(
688 (T element) {
689 _runUserCode(
690 () => (element == needle),
691 (bool isMatch) {
692 if (isMatch) {
693 _cancelAndValue(subscription, future, true);
694 }
695 },
696 _cancelAndErrorClosure(subscription, future)
697 );
698 },
699 onError: future._completeError,
700 onDone: () {
701 future._complete(false);
702 },
703 cancelOnError: true);
704 return future;
705 }
706
707 /**
708 * Executes [action] on each data event of the stream.
709 *
710 * Completes the returned [Future] when all events of the stream
711 * have been processed. Completes the future with an error if the
712 * stream has an error event, or if [action] throws.
713 */
714 Future forEach(void action(T element)) {
715 _Future future = new _Future();
716 StreamSubscription subscription;
717 subscription = this.listen(
718 (T element) {
719 _runUserCode(
720 () => action(element),
721 (_) {},
722 _cancelAndErrorClosure(subscription, future)
723 );
724 },
725 onError: future._completeError,
726 onDone: () {
727 future._complete(null);
728 },
729 cancelOnError: true);
730 return future;
731 }
732
733 /**
734 * Checks whether [test] accepts all elements provided by this stream.
735 *
736 * Completes the [Future] when the answer is known.
737 * If this stream reports an error, the [Future] will report that error.
738 */
739 Future<bool> every(bool test(T element)) {
740 _Future<bool> future = new _Future<bool>();
741 StreamSubscription subscription;
742 subscription = this.listen(
743 (T element) {
744 _runUserCode(
745 () => test(element),
746 (bool isMatch) {
747 if (!isMatch) {
748 _cancelAndValue(subscription, future, false);
749 }
750 },
751 _cancelAndErrorClosure(subscription, future)
752 );
753 },
754 onError: future._completeError,
755 onDone: () {
756 future._complete(true);
757 },
758 cancelOnError: true);
759 return future;
760 }
761
762 /**
763 * Checks whether [test] accepts any element provided by this stream.
764 *
765 * Completes the [Future] when the answer is known.
766 *
767 * If this stream reports an error, the [Future] reports that error.
768 *
769 * Stops listening to the stream after the first matching element has been
770 * found.
771 *
772 * Internally the method cancels its subscription after this element. This
773 * means that single-subscription (non-broadcast) streams are closed and
774 * cannot be reused after a call to this method.
775 */
776 Future<bool> any(bool test(T element)) {
777 _Future<bool> future = new _Future<bool>();
778 StreamSubscription subscription;
779 subscription = this.listen(
780 (T element) {
781 _runUserCode(
782 () => test(element),
783 (bool isMatch) {
784 if (isMatch) {
785 _cancelAndValue(subscription, future, true);
786 }
787 },
788 _cancelAndErrorClosure(subscription, future)
789 );
790 },
791 onError: future._completeError,
792 onDone: () {
793 future._complete(false);
794 },
795 cancelOnError: true);
796 return future;
797 }
798
799
800 /** Counts the elements in the stream. */
801 Future<int> get length {
802 _Future<int> future = new _Future<int>();
803 int count = 0;
804 this.listen(
805 (_) { count++; },
806 onError: future._completeError,
807 onDone: () {
808 future._complete(count);
809 },
810 cancelOnError: true);
811 return future;
812 }
813
814 /**
815 * Reports whether this stream contains any elements.
816 *
817 * Stops listening to the stream after the first element has been received.
818 *
819 * Internally the method cancels its subscription after the first element.
820 * This means that single-subscription (non-broadcast) streams are closed and
821 * cannot be reused after a call to this getter.
822 */
823 Future<bool> get isEmpty {
824 _Future<bool> future = new _Future<bool>();
825 StreamSubscription subscription;
826 subscription = this.listen(
827 (_) {
828 _cancelAndValue(subscription, future, false);
829 },
830 onError: future._completeError,
831 onDone: () {
832 future._complete(true);
833 },
834 cancelOnError: true);
835 return future;
836 }
837
838 /** Collects the data of this stream in a [List]. */
839 Future<List<T>> toList() {
840 List<T> result = <T>[];
841 _Future<List<T>> future = new _Future<List<T>>();
842 this.listen(
843 (T data) {
844 result.add(data);
845 },
846 onError: future._completeError,
847 onDone: () {
848 future._complete(result);
849 },
850 cancelOnError: true);
851 return future;
852 }
853
854 /**
855 * Collects the data of this stream in a [Set].
856 *
857 * The returned set is the same type as returned by `new Set<T>()`.
858 * If another type of set is needed, either use [forEach] to add each
859 * element to the set, or use
860 * `toList().then((list) => new SomeOtherSet.from(list))`
861 * to create the set.
862 */
863 Future<Set<T>> toSet() {
864 Set<T> result = new Set<T>();
865 _Future<Set<T>> future = new _Future<Set<T>>();
866 this.listen(
867 (T data) {
868 result.add(data);
869 },
870 onError: future._completeError,
871 onDone: () {
872 future._complete(result);
873 },
874 cancelOnError: true);
875 return future;
876 }
877
878 /**
879 * Discards all data on the stream, but signals when it's done or an error
880 * occured.
881 *
882 * When subscribing using [drain], cancelOnError will be true. This means
883 * that the future will complete with the first error on the stream and then
884 * cancel the subscription.
885 *
886 * In case of a `done` event the future completes with the given
887 * [futureValue].
888 */
889 Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue])
890 => listen(null, cancelOnError: true).asFuture/*<E>*/(futureValue);
891
892 /**
893 * Provides at most the first [n] values of this stream.
894 *
895 * Forwards the first [n] data events of this stream, and all error
896 * events, to the returned stream, and ends with a done event.
897 *
898 * If this stream produces fewer than [count] values before it's done,
899 * so will the returned stream.
900 *
901 * Stops listening to the stream after the first [n] elements have been
902 * received.
903 *
904 * Internally the method cancels its subscription after these elements. This
905 * means that single-subscription (non-broadcast) streams are closed and
906 * cannot be reused after a call to this method.
907 *
908 * The returned stream is a broadcast stream if this stream is.
909 * For a broadcast stream, the events are only counted from the time
910 * the returned stream is listened to.
911 */
912 Stream<T> take(int count) {
913 return new _TakeStream<T>(this, count);
914 }
915
916 /**
917 * Forwards data events while [test] is successful.
918 *
919 * The returned stream provides the same events as this stream as long
920 * as [test] returns [:true:] for the event data. The stream is done
921 * when either this stream is done, or when this stream first provides
922 * a value that [test] doesn't accept.
923 *
924 * Stops listening to the stream after the accepted elements.
925 *
926 * Internally the method cancels its subscription after these elements. This
927 * means that single-subscription (non-broadcast) streams are closed and
928 * cannot be reused after a call to this method.
929 *
930 * The returned stream is a broadcast stream if this stream is.
931 * For a broadcast stream, the events are only tested from the time
932 * the returned stream is listened to.
933 */
934 Stream<T> takeWhile(bool test(T element)) {
935 return new _TakeWhileStream<T>(this, test);
936 }
937
938 /**
939 * Skips the first [count] data events from this stream.
940 *
941 * The returned stream is a broadcast stream if this stream is.
942 * For a broadcast stream, the events are only counted from the time
943 * the returned stream is listened to.
944 */
945 Stream<T> skip(int count) {
946 return new _SkipStream<T>(this, count);
947 }
948
949 /**
950 * Skip data events from this stream while they are matched by [test].
951 *
952 * Error and done events are provided by the returned stream unmodified.
953 *
954 * Starting with the first data event where [test] returns false for the
955 * event data, the returned stream will have the same events as this stream.
956 *
957 * The returned stream is a broadcast stream if this stream is.
958 * For a broadcast stream, the events are only tested from the time
959 * the returned stream is listened to.
960 */
961 Stream<T> skipWhile(bool test(T element)) {
962 return new _SkipWhileStream<T>(this, test);
963 }
964
965 /**
966 * Skips data events if they are equal to the previous data event.
967 *
968 * The returned stream provides the same events as this stream, except
969 * that it never provides two consecutive data events that are equal.
970 *
971 * Equality is determined by the provided [equals] method. If that is
972 * omitted, the '==' operator on the last provided data element is used.
973 *
974 * The returned stream is a broadcast stream if this stream is.
975 * If a broadcast stream is listened to more than once, each subscription
976 * will individually perform the `equals` test.
977 */
978 Stream<T> distinct([bool equals(T previous, T next)]) {
979 return new _DistinctStream<T>(this, equals);
980 }
981
982 /**
983 * Returns the first element of the stream.
984 *
985 * Stops listening to the stream after the first element has been received.
986 *
987 * Internally the method cancels its subscription after the first element.
988 * This means that single-subscription (non-broadcast) streams are closed
989 * and cannot be reused after a call to this getter.
990 *
991 * If an error event occurs before the first data event, the resulting future
992 * is completed with that error.
993 *
994 * If this stream is empty (a done event occurs before the first data event),
995 * the resulting future completes with a [StateError].
996 *
997 * Except for the type of the error, this method is equivalent to
998 * [:this.elementAt(0):].
999 */
1000 Future<T> get first {
1001 _Future<T> future = new _Future<T>();
1002 StreamSubscription subscription;
1003 subscription = this.listen(
1004 (T value) {
1005 _cancelAndValue(subscription, future, value);
1006 },
1007 onError: future._completeError,
1008 onDone: () {
1009 try {
1010 throw IterableElementError.noElement();
1011 } catch (e, s) {
1012 _completeWithErrorCallback(future, e, s);
1013 }
1014 },
1015 cancelOnError: true);
1016 return future;
1017 }
1018
1019 /**
1020 * Returns the last element of the stream.
1021 *
1022 * If an error event occurs before the first data event, the resulting future
1023 * is completed with that error.
1024 *
1025 * If this stream is empty (a done event occurs before the first data event),
1026 * the resulting future completes with a [StateError].
1027 */
1028 Future<T> get last {
1029 _Future<T> future = new _Future<T>();
1030 T result = null;
1031 bool foundResult = false;
1032 listen(
1033 (T value) {
1034 foundResult = true;
1035 result = value;
1036 },
1037 onError: future._completeError,
1038 onDone: () {
1039 if (foundResult) {
1040 future._complete(result);
1041 return;
1042 }
1043 try {
1044 throw IterableElementError.noElement();
1045 } catch (e, s) {
1046 _completeWithErrorCallback(future, e, s);
1047 }
1048 },
1049 cancelOnError: true);
1050 return future;
1051 }
1052
1053 /**
1054 * Returns the single element.
1055 *
1056 * If an error event occurs before or after the first data event, the
1057 * resulting future is completed with that error.
1058 *
1059 * If [this] is empty or has more than one element throws a [StateError].
1060 */
1061 Future<T> get single {
1062 _Future<T> future = new _Future<T>();
1063 T result = null;
1064 bool foundResult = false;
1065 StreamSubscription subscription;
1066 subscription = this.listen(
1067 (T value) {
1068 if (foundResult) {
1069 // This is the second element we get.
1070 try {
1071 throw IterableElementError.tooMany();
1072 } catch (e, s) {
1073 _cancelAndErrorWithReplacement(subscription, future, e, s);
1074 }
1075 return;
1076 }
1077 foundResult = true;
1078 result = value;
1079 },
1080 onError: future._completeError,
1081 onDone: () {
1082 if (foundResult) {
1083 future._complete(result);
1084 return;
1085 }
1086 try {
1087 throw IterableElementError.noElement();
1088 } catch (e, s) {
1089 _completeWithErrorCallback(future, e, s);
1090 }
1091 },
1092 cancelOnError: true);
1093 return future;
1094 }
1095
1096 /**
1097 * Finds the first element of this stream matching [test].
1098 *
1099 * Returns a future that is filled with the first element of this stream
1100 * that [test] returns true for.
1101 *
1102 * If no such element is found before this stream is done, and a
1103 * [defaultValue] function is provided, the result of calling [defaultValue]
1104 * becomes the value of the future.
1105 *
1106 * Stops listening to the stream after the first matching element has been
1107 * received.
1108 *
1109 * Internally the method cancels its subscription after the first element that
1110 * matches the predicate. This means that single-subscription (non-broadcast)
1111 * streams are closed and cannot be reused after a call to this method.
1112 *
1113 * If an error occurs, or if this stream ends without finding a match and
1114 * with no [defaultValue] function provided, the future will receive an
1115 * error.
1116 */
1117 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) {
1118 _Future<dynamic> future = new _Future();
1119 StreamSubscription subscription;
1120 subscription = this.listen(
1121 (T value) {
1122 _runUserCode(
1123 () => test(value),
1124 (bool isMatch) {
1125 if (isMatch) {
1126 _cancelAndValue(subscription, future, value);
1127 }
1128 },
1129 _cancelAndErrorClosure(subscription, future)
1130 );
1131 },
1132 onError: future._completeError,
1133 onDone: () {
1134 if (defaultValue != null) {
1135 _runUserCode(defaultValue, future._complete, future._completeError);
1136 return;
1137 }
1138 try {
1139 throw IterableElementError.noElement();
1140 } catch (e, s) {
1141 _completeWithErrorCallback(future, e, s);
1142 }
1143 },
1144 cancelOnError: true);
1145 return future;
1146 }
1147
1148 /**
1149 * Finds the last element in this stream matching [test].
1150 *
1151 * As [firstWhere], except that the last matching element is found.
1152 * That means that the result cannot be provided before this stream
1153 * is done.
1154 */
1155 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) {
1156 _Future<dynamic> future = new _Future();
1157 T result = null;
1158 bool foundResult = false;
1159 StreamSubscription subscription;
1160 subscription = this.listen(
1161 (T value) {
1162 _runUserCode(
1163 () => true == test(value),
1164 (bool isMatch) {
1165 if (isMatch) {
1166 foundResult = true;
1167 result = value;
1168 }
1169 },
1170 _cancelAndErrorClosure(subscription, future)
1171 );
1172 },
1173 onError: future._completeError,
1174 onDone: () {
1175 if (foundResult) {
1176 future._complete(result);
1177 return;
1178 }
1179 if (defaultValue != null) {
1180 _runUserCode(defaultValue, future._complete, future._completeError);
1181 return;
1182 }
1183 try {
1184 throw IterableElementError.noElement();
1185 } catch (e, s) {
1186 _completeWithErrorCallback(future, e, s);
1187 }
1188 },
1189 cancelOnError: true);
1190 return future;
1191 }
1192
1193 /**
1194 * Finds the single element in this stream matching [test].
1195 *
1196 * Like [lastMatch], except that it is an error if more than one
1197 * matching element occurs in the stream.
1198 */
1199 Future<T> singleWhere(bool test(T element)) {
1200 _Future<T> future = new _Future<T>();
1201 T result = null;
1202 bool foundResult = false;
1203 StreamSubscription subscription;
1204 subscription = this.listen(
1205 (T value) {
1206 _runUserCode(
1207 () => true == test(value),
1208 (bool isMatch) {
1209 if (isMatch) {
1210 if (foundResult) {
1211 try {
1212 throw IterableElementError.tooMany();
1213 } catch (e, s) {
1214 _cancelAndErrorWithReplacement(subscription, future, e, s);
1215 }
1216 return;
1217 }
1218 foundResult = true;
1219 result = value;
1220 }
1221 },
1222 _cancelAndErrorClosure(subscription, future)
1223 );
1224 },
1225 onError: future._completeError,
1226 onDone: () {
1227 if (foundResult) {
1228 future._complete(result);
1229 return;
1230 }
1231 try {
1232 throw IterableElementError.noElement();
1233 } catch (e, s) {
1234 _completeWithErrorCallback(future, e, s);
1235 }
1236 },
1237 cancelOnError: true);
1238 return future;
1239 }
1240
1241 /**
1242 * Returns the value of the [index]th data event of this stream.
1243 *
1244 * Stops listening to the stream after the [index]th data event has been
1245 * received.
1246 *
1247 * Internally the method cancels its subscription after these elements. This
1248 * means that single-subscription (non-broadcast) streams are closed and
1249 * cannot be reused after a call to this method.
1250 *
1251 * If an error event occurs before the value is found, the future completes
1252 * with this error.
1253 *
1254 * If a done event occurs before the value is found, the future completes
1255 * with a [RangeError].
1256 */
1257 Future<T> elementAt(int index) {
1258 if (index is! int || index < 0) throw new ArgumentError(index);
1259 _Future<T> future = new _Future<T>();
1260 StreamSubscription subscription;
1261 int elementIndex = 0;
1262 subscription = this.listen(
1263 (T value) {
1264 if (index == elementIndex) {
1265 _cancelAndValue(subscription, future, value);
1266 return;
1267 }
1268 elementIndex += 1;
1269 },
1270 onError: future._completeError,
1271 onDone: () {
1272 future._completeError(
1273 new RangeError.index(index, this, "index", null, elementIndex));
1274 },
1275 cancelOnError: true);
1276 return future;
1277 }
1278
1279 /**
1280 * Creates a new stream with the same events as this stream.
1281 *
1282 * Whenever more than [timeLimit] passes between two events from this stream,
1283 * the [onTimeout] function is called.
1284 *
1285 * The countdown doesn't start until the returned stream is listened to.
1286 * The countdown is reset every time an event is forwarded from this stream,
1287 * or when the stream is paused and resumed.
1288 *
1289 * The [onTimeout] function is called with one argument: an
1290 * [EventSink] that allows putting events into the returned stream.
1291 * This `EventSink` is only valid during the call to `onTimeout`.
1292 *
1293 * If `onTimeout` is omitted, a timeout will just put a [TimeoutException]
1294 * into the error channel of the returned stream.
1295 *
1296 * The returned stream is a broadcast stream if this stream is.
1297 * If a broadcast stream is listened to more than once, each subscription
1298 * will have its individually timer that starts counting on listen,
1299 * and the subscriptions' timers can be paused individually.
1300 */
1301 Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) {
1302 StreamController<T> controller;
1303 // The following variables are set on listen.
1304 StreamSubscription<T> subscription;
1305 Timer timer;
1306 Zone zone;
1307 _TimerCallback timeout;
1308
1309 void onData(T event) {
1310 timer.cancel();
1311 controller.add(event);
1312 timer = zone.createTimer(timeLimit, timeout);
1313 }
1314 void onError(error, StackTrace stackTrace) {
1315 timer.cancel();
1316 assert(controller is _StreamController ||
1317 controller is _BroadcastStreamController);
1318 dynamic eventSink = controller;
1319 eventSink._addError(error, stackTrace); // Avoid Zone error replacement.
1320 timer = zone.createTimer(timeLimit, timeout);
1321 }
1322 void onDone() {
1323 timer.cancel();
1324 controller.close();
1325 }
1326 void onListen() {
1327 // This is the onListen callback for of controller.
1328 // It runs in the same zone that the subscription was created in.
1329 // Use that zone for creating timers and running the onTimeout
1330 // callback.
1331 zone = Zone.current;
1332 if (onTimeout == null) {
1333 timeout = () {
1334 controller.addError(new TimeoutException("No stream event",
1335 timeLimit), null);
1336 };
1337 } else {
1338 // TODO(floitsch): the return type should be 'void', and the type
1339 // should be inferred.
1340 var registeredOnTimeout =
1341 zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout);
1342 _ControllerEventSinkWrapper wrapper =
1343 new _ControllerEventSinkWrapper(null);
1344 timeout = () {
1345 wrapper._sink = controller; // Only valid during call.
1346 zone.runUnaryGuarded(registeredOnTimeout, wrapper);
1347 wrapper._sink = null;
1348 };
1349 }
1350
1351 subscription = this.listen(onData, onError: onError, onDone: onDone);
1352 timer = zone.createTimer(timeLimit, timeout);
1353 }
1354 Future onCancel() {
1355 timer.cancel();
1356 Future result = subscription.cancel();
1357 subscription = null;
1358 return result;
1359 }
1360 controller = isBroadcast
1361 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
1362 : new _SyncStreamController<T>(
1363 onListen,
1364 () {
1365 // Don't null the timer, onCancel may call cancel again.
1366 timer.cancel();
1367 subscription.pause();
1368 },
1369 () {
1370 subscription.resume();
1371 timer = zone.createTimer(timeLimit, timeout);
1372 },
1373 onCancel);
1374 return controller.stream;
1375 }
1376 }
1377
1378 /**
1379 * A subscription on events from a [Stream].
1380 *
1381 * When you listen on a [Stream] using [Stream.listen],
1382 * a [StreamSubscription] object is returned.
1383 *
1384 * The subscription provides events to the listener,
1385 * and holds the callbacks used to handle the events.
1386 * The subscription can also be used to unsubscribe from the events,
1387 * or to temporarily pause the events from the stream.
1388 */
1389 abstract class StreamSubscription<T> {
1390 /**
1391 * Cancels this subscription.
1392 *
1393 * After this call, the subscription no longer receives events.
1394 *
1395 * The stream may need to shut down the source of events and clean up after
1396 * the subscription is canceled.
1397 *
1398 * Returns a future that is completed once the stream has finished
1399 * its cleanup. May also return `null` if no cleanup was necessary.
1400 *
1401 * Typically, futures are returned when the stream needs to release resources.
1402 * For example, a stream might need to close an open file (as an asynchronous
1403 * operation). If the listener wants to delete the file after having
1404 * canceled the subscription, it must wait for the cleanup future to complete.
1405 *
1406 * A returned future completes with a `null` value.
1407 * If the cleanup throws, which it really shouldn't, the returned future
1408 * completes with that error.
1409 */
1410 Future cancel();
1411
1412 /**
1413 * Set or override the data event handler of this subscription.
1414 *
1415 * This method overrides the handler that has been set at the invocation of
1416 * [Stream.listen].
1417 */
1418 void onData(void handleData(T data));
1419
1420 /**
1421 * Set or override the error event handler of this subscription.
1422 *
1423 * This method overrides the handler that has been set at the invocation of
1424 * [Stream.listen] or by calling [asFuture].
1425 */
1426 void onError(Function handleError);
1427
1428 /**
1429 * Set or override the done event handler of this subscription.
1430 *
1431 * This method overrides the handler that has been set at the invocation of
1432 * [Stream.listen] or by calling [asFuture].
1433 */
1434 void onDone(void handleDone());
1435
1436 /**
1437 * Request that the stream pauses events until further notice.
1438 *
1439 * While paused, the subscription will not fire any events.
1440 * If it receives events from its source, they will be buffered until
1441 * the subscription is resumed.
1442 * The underlying source is usually informed about the pause,
1443 * so it can stop generating events until the subscription is resumed.
1444 *
1445 * To avoid buffering events on a broadcast stream, it is better to
1446 * cancel this subscription, and start to listen again when events
1447 * are needed.
1448 *
1449 * If [resumeSignal] is provided, the stream will undo the pause
1450 * when the future completes. If the future completes with an error,
1451 * the stream will resume, but the error will not be handled!
1452 *
1453 * A call to [resume] will also undo a pause.
1454 *
1455 * If the subscription is paused more than once, an equal number
1456 * of resumes must be performed to resume the stream.
1457 *
1458 * Currently DOM streams silently drop events when the stream is paused. This
1459 * is a bug and will be fixed.
1460 */
1461 void pause([Future resumeSignal]);
1462
1463 /**
1464 * Resume after a pause.
1465 */
1466 void resume();
1467
1468 /**
1469 * Returns true if the [StreamSubscription] is paused.
1470 */
1471 bool get isPaused;
1472
1473 /**
1474 * Returns a future that handles the [onDone] and [onError] callbacks.
1475 *
1476 * This method *overwrites* the existing [onDone] and [onError] callbacks
1477 * with new ones that complete the returned future.
1478 *
1479 * In case of an error the subscription will automatically cancel (even
1480 * when it was listening with `cancelOnError` set to `false`).
1481 *
1482 * In case of a `done` event the future completes with the given
1483 * [futureValue].
1484 */
1485 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]);
1486 }
1487
1488
1489 /**
1490 * An interface that abstracts creation or handling of [Stream] events.
1491 */
1492 abstract class EventSink<T> implements Sink<T> {
1493 /** Send a data event to a stream. */
1494 void add(T event);
1495
1496 /** Send an async error to a stream. */
1497 void addError(errorEvent, [StackTrace stackTrace]);
1498
1499 /** Close the sink. No further events can be added after closing. */
1500 void close();
1501 }
1502
1503
1504 /** [Stream] wrapper that only exposes the [Stream] interface. */
1505 class StreamView<T> extends Stream<T> {
1506 final Stream<T> _stream;
1507
1508 const StreamView(Stream<T> stream) : _stream = stream, super._internal();
1509
1510 bool get isBroadcast => _stream.isBroadcast;
1511
1512 Stream<T> asBroadcastStream(
1513 {void onListen(StreamSubscription<T> subscription),
1514 void onCancel(StreamSubscription<T> subscription)})
1515 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
1516
1517 StreamSubscription<T> listen(void onData(T value),
1518 { Function onError,
1519 void onDone(),
1520 bool cancelOnError }) {
1521 return _stream.listen(onData, onError: onError, onDone: onDone,
1522 cancelOnError: cancelOnError);
1523 }
1524 }
1525
1526
1527 /**
1528 * Abstract interface for a "sink" accepting multiple entire streams.
1529 *
1530 * A consumer can accept a number of consecutive streams using [addStream],
1531 * and when no further data need to be added, the [close] method tells the
1532 * consumer to complete its work and shut down.
1533 *
1534 * This class is not just a [Sink<Stream>] because it is also combined with
1535 * other [Sink] classes, like it's combined with [EventSink] in the
1536 * [StreamSink] class.
1537 *
1538 * The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream
1539 * to the consumer's [addStream] method. When that completes, it will
1540 * call [close] and then complete its own returned future.
1541 */
1542 abstract class StreamConsumer<S> {
1543 /**
1544 * Consumes the elements of [stream].
1545 *
1546 * Listens on [stream] and does something for each event.
1547 *
1548 * Returns a future which is completed when the stream is done being added,
1549 * and the consumer is ready to accept a new stream.
1550 * No further calls to [addStream] or [close] should happen before the
1551 * returned future has completed.
1552 *
1553 * The consumer may stop listening to the stream after an error,
1554 * it may consume all the errors and only stop at a done event,
1555 * or it may be canceled early if the receiver don't want any further events.
1556 *
1557 * If the consumer stops listening because of some error preventing it
1558 * from continuing, it may report this error in the returned future,
1559 * otherwise it will just complete the future with `null`.
1560 */
1561 Future addStream(Stream<S> stream);
1562
1563 /**
1564 * Tells the consumer that no further streams will be added.
1565 *
1566 * This allows the consumer to complete any remaining work and release
1567 * resources that are no longer needed
1568 *
1569 * Returns a future which is completed when the consumer has shut down.
1570 * If cleaning up can fail, the error may be reported in the returned future,
1571 * otherwise it completes with `null`.
1572 */
1573 Future close();
1574 }
1575
1576
1577 /**
1578 * A object that accepts stream events both synchronously and asynchronously.
1579 *
1580 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and
1581 * the synchronous methods from [EventSink].
1582 *
1583 * The [EventSink] methods can't be used while the [addStream] is called.
1584 * As soon as the [addStream]'s [Future] completes with a value, the
1585 * [EventSink] methods can be used again.
1586 *
1587 * If [addStream] is called after any of the [EventSink] methods, it'll
1588 * be delayed until the underlying system has consumed the data added by the
1589 * [EventSink] methods.
1590 *
1591 * When [EventSink] methods are used, the [done] [Future] can be used to
1592 * catch any errors.
1593 *
1594 * When [close] is called, it will return the [done] [Future].
1595 */
1596 abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> {
1597 /**
1598 * Tells the stream sink that no further streams will be added.
1599 *
1600 * This allows the stream sink to complete any remaining work and release
1601 * resources that are no longer needed
1602 *
1603 * Returns a future which is completed when the stream sink has shut down.
1604 * If cleaning up can fail, the error may be reported in the returned future,
1605 * otherwise it completes with `null`.
1606 *
1607 * Returns the same future as [done].
1608 *
1609 * The stream sink may close before the [close] method is called, either due
1610 * to an error or because it is itself providing events to someone who has
1611 * stopped listening. In that case, the [done] future is completed first,
1612 * and the `close` method will return the `done` future when called.
1613 *
1614 * Unifies [StreamConsumer.close] and [EventSink.close] which both mark their
1615 * object as not expecting any further events.
1616 */
1617 Future close();
1618
1619 /**
1620 * Return a future which is completed when the [StreamSink] is finished.
1621 *
1622 * If the `StreamSink` fails with an error,
1623 * perhaps in response to adding events using [add], [addError] or [close],
1624 * the [done] future will complete with that error.
1625 *
1626 * Otherwise, the returned future will complete when either:
1627 *
1628 * * all events have been processed and the sink has been closed, or
1629 * * the sink has otherwise been stopped from handling more events
1630 * (for example by cancelling a stream subscription).
1631 */
1632 Future get done;
1633 }
1634
1635
1636 /**
1637 * The target of a [Stream.transform] call.
1638 *
1639 * The [Stream.transform] call will pass itself to this object and then return
1640 * the resulting stream.
1641 *
1642 * It is good practice to write transformers that can be used multiple times.
1643 */
1644 abstract class StreamTransformer<S, T> {
1645 /**
1646 * Creates a [StreamTransformer].
1647 *
1648 * The returned instance takes responsibility of implementing ([bind]).
1649 * When the user invokes `bind` it returns a new "bound" stream. Only when
1650 * the user starts listening to the bound stream, the `listen` method
1651 * invokes the given closure [transformer].
1652 *
1653 * The [transformer] closure receives the stream, that was bound, as argument
1654 * and returns a [StreamSubscription]. In almost all cases the closure
1655 * listens itself to the stream that is given as argument.
1656 *
1657 * The result of invoking the [transformer] closure is a [StreamSubscription].
1658 * The bound stream-transformer (created by the `bind` method above) then sets
1659 * the handlers it received as part of the `listen` call.
1660 *
1661 * Conceptually this can be summarized as follows:
1662 *
1663 * 1. `var transformer = new StreamTransformer(transformerClosure);`
1664 * creates a `StreamTransformer` that supports the `bind` method.
1665 * 2. `var boundStream = stream.transform(transformer);` binds the `stream`
1666 * and returns a bound stream that has a pointer to `stream`.
1667 * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)`
1668 * starts the listening and transformation. This is accomplished
1669 * in 2 steps: first the `boundStream` invokes the `transformerClosure` with
1670 * the `stream` it captured: `transformerClosure(stream, b)`.
1671 * The result `subscription`, a [StreamSubscription], is then
1672 * updated to receive its handlers: `subscription.onData(f1)`,
1673 * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription
1674 * is returned as result of the `listen` call.
1675 *
1676 * There are two common ways to create a StreamSubscription:
1677 *
1678 * 1. by creating a new class that implements [StreamSubscription].
1679 * Note that the subscription should run callbacks in the [Zone] the
1680 * stream was listened to.
1681 * 2. by allocating a [StreamController] and to return the result of
1682 * listening to its stream.
1683 *
1684 * Example use of a duplicating transformer:
1685 *
1686 * stringStream.transform(new StreamTransformer<String, String>(
1687 * (Stream<String> input, bool cancelOnError) {
1688 * StreamController<String> controller;
1689 * StreamSubscription<String> subscription;
1690 * controller = new StreamController<String>(
1691 * onListen: () {
1692 * subscription = input.listen((data) {
1693 * // Duplicate the data.
1694 * controller.add(data);
1695 * controller.add(data);
1696 * },
1697 * onError: controller.addError,
1698 * onDone: controller.close,
1699 * cancelOnError: cancelOnError);
1700 * },
1701 * onPause: () { subscription.pause(); },
1702 * onResume: () { subscription.resume(); },
1703 * onCancel: () { subscription.cancel(); },
1704 * sync: true);
1705 * return controller.stream.listen(null);
1706 * });
1707 */
1708 const factory StreamTransformer(
1709 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
1710 = _StreamSubscriptionTransformer<S, T>;
1711
1712 /**
1713 * Creates a [StreamTransformer] that delegates events to the given functions.
1714 *
1715 * Example use of a duplicating transformer:
1716 *
1717 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs(
1718 * handleData: (String value, EventSink<String> sink) {
1719 * sink.add(value);
1720 * sink.add(value); // Duplicate the incoming events.
1721 * }));
1722 */
1723 factory StreamTransformer.fromHandlers({
1724 void handleData(S data, EventSink<T> sink),
1725 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
1726 void handleDone(EventSink<T> sink)})
1727 = _StreamHandlerTransformer<S, T>;
1728
1729 /**
1730 * Transform the incoming [stream]'s events.
1731 *
1732 * Creates a new stream.
1733 * When this stream is listened to, it will start listening on [stream],
1734 * and generate events on the new stream based on the events from [stream].
1735 *
1736 * Subscriptions on the returned stream should propagate pause state
1737 * to the subscription on [stream].
1738 */
1739 Stream<T> bind(Stream<S> stream);
1740 }
1741
1742 /**
1743 * An [Iterator] like interface for the values of a [Stream].
1744 *
1745 * This wraps a [Stream] and a subscription on the stream. It listens
1746 * on the stream, and completes the future returned by [moveNext] when the
1747 * next value becomes available.
1748 */
1749 abstract class StreamIterator<T> {
1750
1751 /** Create a [StreamIterator] on [stream]. */
1752 factory StreamIterator(Stream<T> stream)
1753 // TODO(lrn): use redirecting factory constructor when type
1754 // arguments are supported.
1755 => new _StreamIteratorImpl<T>(stream);
1756
1757 /**
1758 * Wait for the next stream value to be available.
1759 *
1760 * Returns a future which will complete with either `true` or `false`.
1761 * Completing with `true` means that another event has been received and
1762 * can be read as [current].
1763 * Completing with `false` means that the stream itearation is done and
1764 * no further events will ever be available.
1765 * The future may complete with an error, if the stream produces an error,
1766 * which also ends iteration.
1767 *
1768 * The function must not be called again until the future returned by a
1769 * previous call is completed.
1770 */
1771 Future<bool> moveNext();
1772
1773 /**
1774 * The current value of the stream.
1775 *
1776 * Is `null` before the first call to [moveNext] and after a call to
1777 * `moveNext` completes with a `false` result or an error.
1778 *
1779 * When a `moveNext` call completes with `true`, the `current` field holds
1780 * the most recent event of the stream, and it stays like that until the next
1781 * call to `moveNext`.
1782 * Between a call to `moveNext` and when its returned future completes,
1783 * the value is unspecified.
1784 */
1785 T get current;
1786
1787 /**
1788 * Cancels the stream iterator (and the underlying stream subscription) early.
1789 *
1790 * The stream iterator is automatically canceled if the [moveNext] future
1791 * completes with either `false` or an error.
1792 *
1793 * If you need to stop listening for values before the stream iterator is
1794 * automatically closed, you must call [cancel] to ensure that the stream
1795 * is properly closed.
1796 *
1797 * If [moveNext] has been called when the iterator is cancelled,
1798 * its returned future will complete with `false` as value,
1799 * as will all further calls to [moveNext].
1800 *
1801 * Returns a future if the cancel-operation is not completed synchronously.
1802 * Otherwise returns `null`.
1803 */
1804 Future cancel();
1805 }
1806
1807
1808 /**
1809 * Wraps an [_EventSink] so it exposes only the [EventSink] interface.
1810 */
1811 class _ControllerEventSinkWrapper<T> implements EventSink<T> {
1812 EventSink _sink;
1813 _ControllerEventSinkWrapper(this._sink);
1814
1815 void add(T data) { _sink.add(data); }
1816 void addError(error, [StackTrace stackTrace]) {
1817 _sink.addError(error, stackTrace);
1818 }
1819 void close() { _sink.close(); }
1820 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698