OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 514 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
525 * into zero or more events. | 525 * into zero or more events. |
526 * | 526 * |
527 * Each incoming event is converted to an [Iterable] of new events, | 527 * Each incoming event is converted to an [Iterable] of new events, |
528 * and each of these new events are then sent by the returned stream | 528 * and each of these new events are then sent by the returned stream |
529 * in order. | 529 * in order. |
530 * | 530 * |
531 * The returned stream is a broadcast stream if this stream is. | 531 * The returned stream is a broadcast stream if this stream is. |
532 * If a broadcast stream is listened to more than once, each subscription | 532 * If a broadcast stream is listened to more than once, each subscription |
533 * will individually call `convert` and expand the events. | 533 * will individually call `convert` and expand the events. |
534 */ | 534 */ |
535 Stream/*<S>*/ expand(Iterable/*<S>*/ convert(T value)) { | 535 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) { |
536 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); | 536 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); |
537 } | 537 } |
538 | 538 |
539 /** | 539 /** |
540 * Pipe the events of this stream into [streamConsumer]. | 540 * Pipe the events of this stream into [streamConsumer]. |
541 * | 541 * |
542 * The events of this stream are added to `streamConsumer` using | 542 * The events of this stream are added to `streamConsumer` using |
543 * [StreamConsumer.addStream]. | 543 * [StreamConsumer.addStream]. |
544 * The `streamConsumer` is closed when this stream has been successfully added | 544 * The `streamConsumer` is closed when this stream has been successfully added |
545 * to it - when the future returned by `addStream` completes without an error. | 545 * to it - when the future returned by `addStream` completes without an error. |
(...skipping 13 matching lines...) Expand all Loading... |
559 } | 559 } |
560 | 560 |
561 /** | 561 /** |
562 * Chains this stream as the input of the provided [StreamTransformer]. | 562 * Chains this stream as the input of the provided [StreamTransformer]. |
563 * | 563 * |
564 * Returns the result of [:streamTransformer.bind:] itself. | 564 * Returns the result of [:streamTransformer.bind:] itself. |
565 * | 565 * |
566 * The `streamTransformer` can decide whether it wants to return a | 566 * The `streamTransformer` can decide whether it wants to return a |
567 * broadcast stream or not. | 567 * broadcast stream or not. |
568 */ | 568 */ |
569 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { | 569 Stream/*<S>*/ transform/*<S>*/( |
| 570 StreamTransformer<T, dynamic/*=S*/ > streamTransformer) { |
570 return streamTransformer.bind(this); | 571 return streamTransformer.bind(this); |
571 } | 572 } |
572 | 573 |
573 /** | 574 /** |
574 * Reduces a sequence of values by repeatedly applying [combine]. | 575 * Reduces a sequence of values by repeatedly applying [combine]. |
575 */ | 576 */ |
576 Future<T> reduce(T combine(T previous, T element)) { | 577 Future<T> reduce(T combine(T previous, T element)) { |
577 _Future<T> result = new _Future<T>(); | 578 _Future<T> result = new _Future<T>(); |
578 bool seenFirst = false; | 579 bool seenFirst = false; |
579 T value; | 580 T value; |
(...skipping 1109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1689 * }, | 1690 * }, |
1690 * onPause: () { subscription.pause(); }, | 1691 * onPause: () { subscription.pause(); }, |
1691 * onResume: () { subscription.resume(); }, | 1692 * onResume: () { subscription.resume(); }, |
1692 * onCancel: () { subscription.cancel(); }, | 1693 * onCancel: () { subscription.cancel(); }, |
1693 * sync: true); | 1694 * sync: true); |
1694 * return controller.stream.listen(null); | 1695 * return controller.stream.listen(null); |
1695 * }); | 1696 * }); |
1696 */ | 1697 */ |
1697 const factory StreamTransformer( | 1698 const factory StreamTransformer( |
1698 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) | 1699 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) |
1699 = _StreamSubscriptionTransformer; | 1700 = _StreamSubscriptionTransformer<S, T>; |
1700 | 1701 |
1701 /** | 1702 /** |
1702 * Creates a [StreamTransformer] that delegates events to the given functions. | 1703 * Creates a [StreamTransformer] that delegates events to the given functions. |
1703 * | 1704 * |
1704 * Example use of a duplicating transformer: | 1705 * Example use of a duplicating transformer: |
1705 * | 1706 * |
1706 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( | 1707 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( |
1707 * handleData: (String value, EventSink<String> sink) { | 1708 * handleData: (String value, EventSink<String> sink) { |
1708 * sink.add(value); | 1709 * sink.add(value); |
1709 * sink.add(value); // Duplicate the incoming events. | 1710 * sink.add(value); // Duplicate the incoming events. |
1710 * })); | 1711 * })); |
1711 */ | 1712 */ |
1712 factory StreamTransformer.fromHandlers({ | 1713 factory StreamTransformer.fromHandlers({ |
1713 void handleData(S data, EventSink<T> sink), | 1714 void handleData(S data, EventSink<T> sink), |
1714 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | 1715 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
1715 void handleDone(EventSink<T> sink)}) | 1716 void handleDone(EventSink<T> sink)}) |
1716 = _StreamHandlerTransformer; | 1717 = _StreamHandlerTransformer<S, T>; |
1717 | 1718 |
1718 /** | 1719 /** |
1719 * Transform the incoming [stream]'s events. | 1720 * Transform the incoming [stream]'s events. |
1720 * | 1721 * |
1721 * Creates a new stream. | 1722 * Creates a new stream. |
1722 * When this stream is listened to, it will start listening on [stream], | 1723 * When this stream is listened to, it will start listening on [stream], |
1723 * and generate events on the new stream based on the events from [stream]. | 1724 * and generate events on the new stream based on the events from [stream]. |
1724 * | 1725 * |
1725 * Subscriptions on the returned stream should propagate pause state | 1726 * Subscriptions on the returned stream should propagate pause state |
1726 * to the subscription on [stream]. | 1727 * to the subscription on [stream]. |
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1800 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1801 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1801 EventSink _sink; | 1802 EventSink _sink; |
1802 _ControllerEventSinkWrapper(this._sink); | 1803 _ControllerEventSinkWrapper(this._sink); |
1803 | 1804 |
1804 void add(T data) { _sink.add(data); } | 1805 void add(T data) { _sink.add(data); } |
1805 void addError(error, [StackTrace stackTrace]) { | 1806 void addError(error, [StackTrace stackTrace]) { |
1806 _sink.addError(error, stackTrace); | 1807 _sink.addError(error, stackTrace); |
1807 } | 1808 } |
1808 void close() { _sink.close(); } | 1809 void close() { _sink.close(); } |
1809 } | 1810 } |
OLD | NEW |