Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(331)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/signal.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // part of dart.async;
6
7 // -------------------------------------------------------------------
8 // Core Stream types
9 // -------------------------------------------------------------------
10
11 abstract class Stream<T> {
12 Stream();
13
14 factory Stream.fromFuture(Future<T> future) {
15 var controller = new StreamController<T>();
16 future.then((value) {
17 controller.add(value);
18 controller.close();
19 },
20 onError: (error) {
21 controller.signalError(error);
22 controller.close();
23 });
24 return controller.stream;
25 }
26
27 /**
28 * Stream that outputs events from the [sources] in cyclic order.
29 *
30 * The merged streams are paused and resumed in order to ensure the proper
31 * order of output events.
32 */
33 factory Stream.cyclic(Iterable<Stream> sources) = CyclicScheduleStream<T>;
34
35 /**
36 * Create a stream that forwards data from the highest priority active source.
37 *
38 * Sources are provided in order of increasing priority, and only data from
39 * the highest priority source stream that has provided data are output
40 * on the created stream.
41 *
42 * Errors from the most recent active stream, and any higher priority stream,
43 * are forwarded to the created stream.
44 *
45 * If a higher priority source stream completes without providing data,
46 * it will have no effect on lower priority streams.
47 */
48 factory Stream.superceding(Iterable<Stream<T>> sources) = SupercedeStream<T>;
49
50 /**
51 * Add a subscription to this stream.
52 *
53 * On each data event from this stream, the subscribers [onData] handler
54 * is called. If [onData] is null, nothing happens.
55 *
56 * On errors from this stream, the [onError] handler is given a
57 * [AsyncError] object describing the error.
58 *
59 * If this stream closes, the [onDone] handler is called.
60 *
61 * If [unsubscribeOnError] is true, the subscription is ended when
62 * the first error is reported. The default is false.
63 */
64 StreamSubscription<T> listen(void onData(T event),
65 { void onError(AsyncError error),
66 void onDone(),
67 bool unsubscribeOnError});
68
69 /**
70 * Creates a new stream from this stream that discards some data events.
71 *
72 * The new stream sends the same error and done events as this stream,
73 * but it only sends the data events that satisfy the [test].
74 */
75 Stream<T> where(bool test(T event)) {
76 return this.transform(new WhereStream<T>(test));
77 }
78
79 /**
80 * Create a new stream that converts each element of this stream
81 * to a new value using the [convert] function.
82 */
83 Stream mappedBy(convert(T event)) {
84 return this.transform(new MapStream<T, dynamic>(convert));
85 }
86
87 /**
88 * Create a wrapper Stream that intercepts some errors from this stream.
89 *
90 * If the handler returns null, the error is considered handled.
91 * Otherwise the returned [AsyncError] is passed to the subscribers
92 * of the stream.
93 */
94 Stream handleError(AsyncError handle(AsyncError error)) {
95 return this.transform(new HandleErrorStream<T>(handle));
96 }
97
98 /**
99 * Create a new stream from this stream that converts each element
100 * into zero or more events.
101 *
102 * Each incoming event is converted to an [Iterable] of new events,
103 * and each of these new events are then sent by the returned stream
104 * in order.
105 */
106 Stream expand(Iterable convert(T value)) {
107 return this.transform(new ExpandStream<T, dynamic>(convert));
108 }
109
110 /**
111 * Bind this stream as the input of the provided [StreamConsumer].
112 */
113 Future pipe(StreamConsumer<dynamic, T> streamConsumer) {
114 return streamConsumer.consume(this);
115 }
116
117 /**
118 * Chain this stream as the input of the provided [StreamTransformer].
119 *
120 * Returns the result of [:streamTransformer.bind:] itself.
121 */
122 Stream transform(StreamTransformer<T, dynamic> streamTransformer) {
123 return streamTransformer.bind(this);
124 }
125
126
127 /** Reduces a sequence of values by repeatedly applying [combine]. */
128 Future reduce(var initialValue, combine(var previous, T element)) {
129 Completer completer = new Completer();
130 var value = initialValue;
131 StreamSubscription subscription;
132 subscription = this.listen(
133 (T element) {
134 try {
135 value = combine(value, element);
136 } catch (e, s) {
137 subscription.cancel();
138 completer.completeError(e, s);
139 }
140 },
141 onError: (AsyncError e) {
142 completer.completeError(e.error, e.stackTrace);
143 },
144 onDone: () {
145 completer.complete(value);
146 },
147 unsubscribeOnError: true);
148 return completer.future;
149 }
150
151 // Deprecated method, previously called 'pipe', retained for compatibility.
152 Signal pipeInto(Sink<T> sink,
153 {void onError(AsyncError error),
154 bool unsubscribeOnError}) {
155 SignalCompleter completer = new SignalCompleter();
156 this.listen(
157 sink.add,
158 onError: onError,
159 onDone: () {
160 sink.close();
161 completer.complete();
162 },
163 unsubscribeOnError: unsubscribeOnError);
164 return completer.signal;
165 }
166
167
168 /**
169 * Check whether [match] occurs in the elements provided by this stream.
170 *
171 * Completes the [Future] when the answer is known.
172 * If this stream reports an error, the [Future] will report that error.
173 */
174 Future<bool> contains(T match) {
175 _FutureImpl<bool> future = new _FutureImpl<bool>();
176 StreamSubscription subscription;
177 subscription = this.listen(
178 (T element) {
179 if (element == match) {
180 subscription.cancel();
181 future._setValue(true);
182 }
183 },
184 onError: future._setError,
185 onDone: () {
186 future._setValue(false);
187 },
188 unsubscribeOnError: true);
189 return future;
190 }
191
192 /**
193 * Check whether [test] accepts all elements provided by this stream.
194 *
195 * Completes the [Future] when the answer is known.
196 * If this stream reports an error, the [Future] will report that error.
197 */
198 Future<bool> every(bool test(T element)) {
199 _FutureImpl<bool> future = new _FutureImpl<bool>();
200 StreamSubscription subscription;
201 subscription = this.listen(
202 (T element) {
203 if (!test(element)) {
204 subscription.cancel();
205 future._setValue(false);
206 }
207 },
208 onError: future._setError,
209 onDone: () {
210 future._setValue(true);
211 },
212 unsubscribeOnError: true);
213 return future;
214 }
215
216 /**
217 * Check whether [test] accepts any element provided by this stream.
218 *
219 * Completes the [Future] when the answer is known.
220 * If this stream reports an error, the [Future] will report that error.
221 */
222 Future<bool> any(bool test(T element)) {
223 _FutureImpl<bool> future = new _FutureImpl<bool>();
224 StreamSubscription subscription;
225 subscription = this.listen(
226 (T element) {
227 if (test(element)) {
228 subscription.cancel();
229 future._setValue(true);
230 }
231 },
232 onError: future._setError,
233 onDone: () {
234 future._setValue(false);
235 },
236 unsubscribeOnError: true);
237 return future;
238 }
239
240
241 /** Counts the elements in the stream. */
242 Future<int> get length {
243 _FutureImpl<int> future = new _FutureImpl<int>();
244 int count = 0;
245 this.listen(
246 (_) { count++; },
247 onError: future._setError,
248 onDone: () {
249 future._setValue(count);
250 },
251 unsubscribeOnError: true);
252 return future;
253 }
254
255 /**
256 * Finds the least element in the stream.
257 *
258 * If the stream is empty, the result is [:null:].
259 * Otherwise the result is a value from the stream that is not greater
260 * than any other value from the stream (according to [compare], which must
261 * be a [Comparator]).
262 *
263 * If [compare] is omitted, it defaults to [Comparable.compare].
264 */
265 Future<T> min([int compare(T a, T b)]) {
266 if (compare == null) compare = Comparable.compare;
267 _FutureImpl<T> future = new _FutureImpl<T>();
268 StreamSubscription subscription;
269 T min = null;
270 subscription = this.listen(
271 (T value) {
272 min = value;
273 subscription.onData((T value) {
274 if (compare(min, value) > 0) min = value;
275 });
276 },
277 onError: future._setError,
278 onDone: () {
279 future._setValue(min);
280 },
281 unsubscribeOnError: true
282 );
283 return future;
284 }
285
286 /**
287 * Finds the least element in the stream.
288 *
289 * If the stream is empty, the result is [:null:].
290 * Otherwise the result is an value from the stream that is not greater
291 * than any other value from the stream (according to [compare], which must
292 * be a [Comparator]).
293 *
294 * If [compare] is omitted, it defaults to [Comparable.compare].
295 */
296 Future<T> max([int compare(T a, T b)]) {
297 if (compare == null) compare = Comparable.compare;
298 _FutureImpl<T> future = new _FutureImpl<T>();
299 StreamSubscription subscription;
300 T max = null;
301 subscription = this.listen(
302 (T value) {
303 max = value;
304 subscription.onData((T value) {
305 if (compare(max, value) < 0) max = value;
306 });
307 },
308 onError: future._setError,
309 onDone: () {
310 future._setValue(max);
311 },
312 unsubscribeOnError: true
313 );
314 return future;
315 }
316
317 /** Reports whether this stream contains any elements. */
318 Future<bool> get isEmpty {
319 _FutureImpl<bool> future = new _FutureImpl<bool>();
320 StreamSubscription subscription;
321 subscription = this.listen(
322 (_) {
323 subscription.cancel();
324 future._setValue(false);
325 },
326 onError: future._setError,
327 onDone: () {
328 future._setValue(true);
329 },
330 unsubscribeOnError: true);
331 return future;
332 }
333
334 /** Collect the data of this stream in a [List]. */
335 Future<List<T>> toList() {
336 List<T> result = <T>[];
337 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>();
338 this.listen(
339 (T data) {
340 result.add(data);
341 },
342 onError: future._setError,
343 onDone: () {
344 future._setValue(result);
345 },
346 unsubscribeOnError: true);
347 return future;
348 }
349
350 /** Collect the data of this stream in a [Set]. */
351 Future<Set<T>> toSet() {
352 Set<T> result = new Set<T>();
353 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>();
354 this.listen(
355 (T data) {
356 result.add(data);
357 },
358 onError: future._setError,
359 onDone: () {
360 future._setValue(result);
361 },
362 unsubscribeOnError: true);
363 return future;
364 }
365
366 /**
367 * Provide at most the first [n] values of this stream.
368 *
369 * Forwards the first [n] data events of this stream, and all error
370 * events, to the returned stream, and ends with a done event.
371 *
372 * If this stream produces fewer than [count] values before it's done,
373 * so will the returned stream.
374 */
375 Stream<T> take(int count) {
376 return this.transform(new TakeStream(count));
377 }
378
379 /**
380 * Forwards data events while [test] is successful.
381 *
382 * The returned stream provides the same events as this stream as long
383 * as [test] returns [:true:] for the event data. The stream is done
384 * when either this stream is done, or when this stream first provides
385 * a value that [test] doesn't accept.
386 */
387 Stream<T> takeWhile(bool test(T value)) {
388 return this.transform(new TakeWhileStream(test));
389 }
390
391 /**
392 * Skips the first [count] data events from this stream.
393 */
394 Stream<T> skip(int count) {
395 return this.transform(new SkipStream(count));
396 }
397
398 /**
399 * Skip data events from this stream while they are matched by [test].
400 *
401 * Error and done events are provided by the returned stream unmodified.
402 *
403 * Starting with the first data event where [test] returns true for the
404 * event data, the returned stream will have the same events as this stream.
405 */
406 Stream<T> skipWhile(bool test(T value)) {
407 return this.transform(new SkipWhileStream(test));
408 }
409
410 /**
411 * Skip data events if they are equal to the previous data event.
412 *
413 * The returned stream provides the same events as this stream, except
414 * that it never provides two consequtive data events that are equal.
415 *
416 * Equality is determined by the provided [equals] method. If that is
417 * omitted, the '==' operator on the last provided data element is used.
418 */
419 Stream<T> distinct([bool equals(T previous, T next)]) {
420 return this.transform(new DistinctStream(equals));
421 }
422
423 /**
424 * Returns the first element.
425 *
426 * If [this] is empty throws a [StateError]. Otherwise this method is
427 * equivalent to [:this.elementAt(0):]
428 */
429 Future<T> get first {
430 _FutureImpl<T> future = new _FutureImpl();
431 StreamSubscription subscription;
432 subscription = this.listen(
433 (T value) {
434 future._setValue(value);
435 subscription.cancel();
436 return;
437 },
438 onError: future._setError,
439 onDone: () {
440 future._setError(new AsyncError(new StateError("No elements")));
441 },
442 unsubscribeOnError: true);
443 return future;
444 }
445
446 /**
447 * Returns the last element.
448 *
449 * If [this] is empty throws a [StateError].
450 */
451 Future<T> get last {
452 _FutureImpl<T> future = new _FutureImpl<T>();
453 T result = null;
454 bool foundResult = false;
455 StreamSubscription subscription;
456 subscription = this.listen(
457 (T value) {
458 foundResult = true;
459 result = value;
460 },
461 onError: future._setError,
462 onDone: () {
463 if (foundResult) {
464 future._setValue(result);
465 return;
466 }
467 future._setError(new AsyncError(new StateError("No elements")));
468 },
469 unsubscribeOnError: true);
470 return future;
471 }
472
473 /**
474 * Returns the single element.
475 *
476 * If [this] is empty or has more than one element throws a [StateError].
477 */
478 Future<T> get single {
479 _FutureImpl<T> future = new _FutureImpl<T>();
480 T result = null;
481 bool foundResult = false;
482 StreamSubscription subscription;
483 subscription = this.listen(
484 (T value) {
485 if (foundResult) {
486 // This is the second element we get.
487 Error error = new StateError("More than one element");
488 future._setError(new AsyncError(error));
489 subscription.cancel();
490 return;
491 }
492 foundResult = true;
493 result = value;
494 },
495 onError: future._setError,
496 onDone: () {
497 if (foundResult) {
498 future._setValue(result);
499 return;
500 }
501 future._setError(new AsyncError(new StateError("No elements")));
502 },
503 unsubscribeOnError: true);
504 return future;
505 }
506
507 /**
508 * Find the first element of this stream matching [test].
509 *
510 * Returns a future that is filled with the first element of this stream
511 * that [test] returns true for.
512 *
513 * If no such element is found before this stream is done, and a
514 * [defaultValue] function is provided, the result of calling [defaultValue]
515 * becomes the value of the future.
516 *
517 * If an error occurs, or if this stream ends without finding a match and
518 * with no [defaultValue] function provided, the future will receive an
519 * error.
520 */
521 Future<T> firstMatching(bool test(T value), {T defaultValue()}) {
522 _FutureImpl<T> future = new _FutureImpl<T>();
523 StreamSubscription subscription;
524 subscription = this.listen(
525 (T value) {
526 bool matches;
527 try {
528 matches = (true == test(value));
529 } catch (e, s) {
530 future._setError(new AsyncError(e, s));
531 subscription.cancel();
532 return;
533 }
534 if (matches) {
535 future._setValue(value);
536 subscription.cancel();
537 }
538 },
539 onError: future._setError,
540 onDone: () {
541 if (defaultValue != null) {
542 T value;
543 try {
544 value = defaultValue();
545 } catch (e, s) {
546 future._setError(new AsyncError(e, s));
547 return;
548 }
549 future._setValue(value);
550 return;
551 }
552 future._setError(
553 new AsyncError(new StateError("firstMatch ended without match")));
554 },
555 unsubscribeOnError: true);
556 return future;
557 }
558
559 /**
560 * Finds the last element in this stream matching [test].
561 *
562 * As [firstMatching], except that the last matching element is found.
563 * That means that the result cannot be provided before this stream
564 * is done.
565 */
566 Future<T> lastMatching(bool test(T value), {T defaultValue()}) {
567 _FutureImpl<T> future = new _FutureImpl<T>();
568 T result = null;
569 bool foundResult = false;
570 StreamSubscription subscription;
571 subscription = this.listen(
572 (T value) {
573 bool matches;
574 try {
575 matches = (true == test(value));
576 } catch (e, s) {
577 future._setError(new AsyncError(e, s));
578 subscription.cancel();
579 return;
580 }
581 if (matches) {
582 foundResult = true;
583 result = value;
584 }
585 },
586 onError: future._setError,
587 onDone: () {
588 if (foundResult) {
589 future._setValue(result);
590 return;
591 }
592 if (defaultValue != null) {
593 T value;
594 try {
595 value = defaultValue();
596 } catch (e, s) {
597 future._setError(new AsyncError(e, s));
598 return;
599 }
600 future._setValue(value);
601 return;
602 }
603 future._setError(
604 new AsyncError(new StateError("lastMatch ended without match")));
605 },
606 unsubscribeOnError: true);
607 return future;
608 }
609
610 /**
611 * Finds the single element in this stream matching [test].
612 *
613 * Like [lastMatch], except that it is an error if more than one
614 * matching element occurs in the stream.
615 */
616 Future<T> singleMatching(bool test(T value)) {
617 _FutureImpl<T> future = new _FutureImpl<T>();
618 T result = null;
619 bool foundResult = false;
620 StreamSubscription subscription;
621 subscription = this.listen(
622 (T value) {
623 bool matches;
624 try {
625 matches = (true == test(value));
626 } catch (e, s) {
627 future._setError(new AsyncError(e, s));
628 subscription.cancel();
629 return;
630 }
631 if (matches) {
632 if (foundResult) {
633 future._setError(new AsyncError(
634 new StateError('Multiple matches for "single"')));
635 subscription.cancel();
636 return;
637 }
638 foundResult = true;
639 result = value;
640 }
641 },
642 onError: future._setError,
643 onDone: () {
644 if (foundResult) {
645 future._setValue(result);
646 return;
647 }
648 future._setError(
649 new AsyncError(new StateError("single ended without match")));
650 },
651 unsubscribeOnError: true);
652 return future;
653 }
654
655 /**
656 * Returns the value of the [index]th data event of this stream.
657 *
658 * If an error event occurs, the future will end with this error.
659 *
660 * If this stream provides fewer than [index] elements before closing,
661 * an error is reported.
662 */
663 Future<T> elementAt(int index) {
664 _FutureImpl<T> future = new _FutureImpl();
665 StreamSubscription subscription;
666 subscription = this.listen(
667 (T value) {
668 if (index == 0) {
669 future._setValue(value);
670 subscription.cancel();
671 return;
672 }
673 index -= 1;
674 },
675 onError: future._setError,
676 onDone: () {
677 future._setError(new AsyncError(
678 new StateError("Not enough elements for elementAt")));
679 },
680 unsubscribeOnError: true);
681 return future;
682 }
683 }
684
685 /**
686 * A control object for the subscription on a [Stream].
687 *
688 * When you subscribe on a [Stream] using [Stream.subscribe],
689 * a [StreamSubscription] object is returned. This object
690 * is used to later unsubscribe again, or to temporarily pause
691 * the stream's events.
692 */
693 abstract class StreamSubscription<T> {
694 /**
695 * Cancels this subscription. It will no longer receive events.
696 *
697 * If an event is currently firing, this unsubscription will only
698 * take effect after all subscribers have received the current event.
699 */
700 void cancel();
701
702 /** Set or override the data event handler of this subscription. */
703 void onData(void handleData(T data));
704
705 /** Set or override the error event handler of this subscription. */
706 void onError(void handleError(AsyncError error));
707
708 /** Set or override the done event handler of this subscription. */
709 void onDone(void handleDone());
710
711 /**
712 * Request that the stream pauses events until further notice.
713 *
714 * If [resumeSignal] is provided, the stream will undo the pause
715 * when the signal completes.
716 * A call to [resume] will also undo a pause.
717 *
718 * If the subscription is paused more than once, an equal number
719 * of resumes must be performed to resume the stream.
720 */
721 void pause([Signal resumeSignal]);
722
723 /**
724 * Resume after a pause.
725 */
726 void resume();
727 }
728
729
730 /**
731 * An interface that abstracts sending events into a [Stream].
732 */
733 abstract class StreamSink<T> implements Sink<T> {
734 void add(T event);
735 /** Signal an async error to the receivers of this sink's values. */
736 void signalError(AsyncError errorEvent);
737 void close();
738 }
739
740 /** [Stream] wrapper that only exposes the [Stream] interface. */
741 class StreamView<T> extends Stream<T> {
742 Stream<T> _stream;
743
744 StreamView(this._stream);
745
746 StreamSubscription<T> listen(void onData(T value),
747 { void onError(AsyncError error),
748 void onDone(),
749 bool unsubscribeOnError }) {
750 return _stream.listen(onData, onError: onError, onDone: onDone,
751 unsubscribeOnError: unsubscribeOnError);
752 }
753 }
754
755 /**
756 * [StreamSink] wrapper that only exposes the [StreamSink] interface.
757 */
758 class StreamSinkView<T> implements StreamSink<T> {
759 final StreamSink<T> _sink;
760
761 StreamSinkView(this._sink);
762
763 void add(T value) { _sink.add(value); }
764 void signalError(AsyncError error) { _sink.signalError(error); }
765 void close() { _sink.close(); }
766 }
767
768
769 /**
770 * The target of a [Stream.pipe] call.
771 *
772 * The [Stream.pipe] call will pass itself to this object, and then return
773 * the resulting [Future]. The pipe should complete the future when it's
774 * done.
775 */
776 abstract class StreamConsumer<S, T> {
777 Future<T> consume(Stream<S> stream);
778 }
779
780 /**
781 * The target of a [Stream.transform] call.
782 *
783 * The [Stream.transform] call will pass itself to this object and then return
784 * the resulting stream.
785 */
786 abstract class StreamTransformer<S, T> {
787 /**
788 * Create a [StreamTransformer] that delegates events to the given functions.
789 *
790 * If a parameter is omitted, a default handler is used that forwards the
791 * event directly to the sink.
792 *
793 * Pauses on the are forwarded to the input stream as well.
794 */
795 factory StreamTransformer.from({
796 void onData(S data, StreamSink<T> sink),
797 void onError(AsyncError error, StreamSink<T> sink),
798 void onDone(StreamSink<T> sink)}) = _StreamTransformerFunctionWrapper;
799
800 Stream<T> bind(Stream<S> stream);
801 }
802
803
804 // TODO(lrn): Remove this class.
805 /**
806 * A base class for configuration objects for [TransformStream].
807 *
808 * A default implementation forwards all incoming events to the output sink.
809 */
810 abstract class _StreamTransformer<S, T> implements StreamTransformer<S, T> {
811 const _StreamTransformer();
812
813 Stream<T> bind(Stream<S> input) {
814 return input.transform(new TransformStream<S, T>(this));
815 }
816
817 /**
818 * Handle an incoming data event.
819 */
820 void handleData(S data, StreamSink<T> sink) {
821 var outData = data;
822 return sink.add(outData);
823 }
824
825 /**
826 * Handle an incoming error event.
827 */
828 void handleError(AsyncError error, StreamSink<T> sink) {
829 sink.signalError(error);
830 }
831
832 /**
833 * Handle an incoming done event.
834 */
835 void handleDone(StreamSink<T> sink) {
836 sink.close();
837 }
838 }
OLDNEW
« no previous file with comments | « sdk/lib/async/signal.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698