OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 1587 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1598 * Otherwise, the returned future will complete when either: | 1598 * Otherwise, the returned future will complete when either: |
1599 * | 1599 * |
1600 * * all events have been processed and the sink has been closed, or | 1600 * * all events have been processed and the sink has been closed, or |
1601 * * the sink has otherwise been stopped from handling more events | 1601 * * the sink has otherwise been stopped from handling more events |
1602 * (for example by cancelling a stream subscription). | 1602 * (for example by cancelling a stream subscription). |
1603 */ | 1603 */ |
1604 Future get done; | 1604 Future get done; |
1605 } | 1605 } |
1606 | 1606 |
1607 /** | 1607 /** |
1608 * The target of a [Stream.transform] call. | 1608 * Transforms a Stream. |
1609 * | 1609 * |
1610 * The [Stream.transform] call will pass itself to this object and then return | 1610 * When a stream's [Stream.transform] method is invoked with a |
1611 * the resulting stream. | 1611 * [StreamTransformer], the stream calls the [bind] method on the provided |
1612 * transformer. The resulting stream is then returned from the | |
1613 * [Stream.transform] method. | |
1614 * | |
1615 * Conceptually, a transformer is simply a function from [Stream] to [Stream] | |
1616 * that is encapsulated into a class. | |
1612 * | 1617 * |
1613 * It is good practice to write transformers that can be used multiple times. | 1618 * It is good practice to write transformers that can be used multiple times. |
1619 * | |
1620 * All other transforming methods on [Stream], such as [Stream.map], | |
1621 * [Stream.where] or [Stream.expand] can be implemented using | |
1622 * [Stream.transform]. A [StreamTransformer] is thus very powerful but often | |
1623 * also a bit more complicated to use. | |
1614 */ | 1624 */ |
1615 abstract class StreamTransformer<S, T> { | 1625 abstract class StreamTransformer<S, T> { |
1616 /** | 1626 /** |
1617 * Creates a [StreamTransformer]. | 1627 * Creates a [StreamTransformer] based on the given [onListen] callback. |
1618 * | 1628 * |
1619 * The returned instance takes responsibility of implementing ([bind]). | 1629 * The returned stream transformer uses the provided [onListen] callback |
1620 * When the user invokes `bind` it returns a new "bound" stream. Only when | 1630 * when a transformed stream is listened to. At that time, the callback |
1621 * the user starts listening to the bound stream, the `listen` method | 1631 * receives the input stream (the one passed to [bind]) and a |
1622 * invokes the given closure [transformer]. | 1632 * boolean flag `cancelOnError` to create a [StreamSubscription]. |
1623 * | 1633 * |
1624 * The [transformer] closure receives the stream, that was bound, as argument | 1634 * The [onListen] callback does *not* receive the handlers that were passed |
1625 * and returns a [StreamSubscription]. In almost all cases the closure | 1635 * to [Stream.listen]. These are automatically set after the call to the |
1626 * listens itself to the stream that is given as argument. | 1636 * [onListen] callback (using [StreamSubscription.onData], |
1637 * [StreamSubscription.onError] and [StreamSubscription.onDone]). | |
1627 * | 1638 * |
1628 * The result of invoking the [transformer] closure is a [StreamSubscription]. | 1639 * Most commonly, an [onListen] callback will first call [Stream.listen] on |
1629 * The bound stream-transformer (created by the `bind` method above) then sets | 1640 * the provided stream (with the corresponding `cancelOnError` flag), and then |
1630 * the handlers it received as part of the `listen` call. | 1641 * return a new [StreamSubscription]. |
1631 * | |
1632 * Conceptually this can be summarized as follows: | |
1633 * | |
1634 * 1. `var transformer = new StreamTransformer(transformerClosure);` | |
1635 * creates a `StreamTransformer` that supports the `bind` method. | |
1636 * 2. `var boundStream = stream.transform(transformer);` binds the `stream` | |
1637 * and returns a bound stream that has a pointer to `stream`. | |
1638 * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)` | |
1639 * starts the listening and transformation. This is accomplished | |
1640 * in 2 steps: first the `boundStream` invokes the `transformerClosure` with | |
1641 * the `stream` it captured: `transformerClosure(stream, b)`. | |
1642 * The result `subscription`, a [StreamSubscription], is then | |
1643 * updated to receive its handlers: `subscription.onData(f1)`, | |
1644 * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription | |
1645 * is returned as result of the `listen` call. | |
1646 * | 1642 * |
1647 * There are two common ways to create a StreamSubscription: | 1643 * There are two common ways to create a StreamSubscription: |
1648 * | 1644 * |
1649 * 1. by creating a new class that implements [StreamSubscription]. | 1645 * 1. by allocating a [StreamController] and to return the result of |
1646 * listening to its stream. It's important to forward pause, resume and | |
1647 * cancel events (unless the transformer intentionally wants to change | |
1648 * this behavior). | |
1649 * 2. by creating a new class that implements [StreamSubscription]. | |
1650 * Note that the subscription should run callbacks in the [Zone] the | 1650 * Note that the subscription should run callbacks in the [Zone] the |
1651 * stream was listened to. | 1651 * stream was listened to (see [Zone] and [Zone.bindCallback]). |
1652 * 2. by allocating a [StreamController] and to return the result of | |
1653 * listening to its stream. | |
1654 * | 1652 * |
1655 * Example use of a duplicating transformer: | 1653 * Example use of a duplicating transformer: |
Lasse Reichstein Nielsen
2017/05/08 07:08:55
Example use of -> Example of use to create
floitsch
2017/05/08 09:29:50
slightly changed.
done.
| |
1656 * | 1654 * |
1657 * stringStream.transform(new StreamTransformer<String, String>( | 1655 * ``` |
1658 * (Stream<String> input, bool cancelOnError) { | 1656 * StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) { |
1659 * StreamController<String> controller; | 1657 * StreamSubscription<String> subscription; |
1660 * StreamSubscription<String> subscription; | 1658 * // Create controller that forwards pause, resume and cancel events. |
1661 * controller = new StreamController<String>( | 1659 * var controller = new StreamController<int>( |
1662 * onListen: () { | 1660 * onPause: () { subscription.pause(); }, |
1663 * subscription = input.listen((data) { | 1661 * onResume: () { subscription.resume(); }, |
1664 * // Duplicate the data. | 1662 * onCancel: () => subscription.cancel(), |
1665 * controller.add(data); | 1663 * sync: true); // "sync" is correct here, since events are forwarded. |
1666 * controller.add(data); | 1664 * |
1667 * }, | 1665 * // Listen to the provided stream using `cancelOnError`. |
1668 * onError: controller.addError, | 1666 * subscription = input.listen((data) { |
1669 * onDone: controller.close, | 1667 * // Duplicate the data. |
1670 * cancelOnError: cancelOnError); | 1668 * controller.add(data); |
1671 * }, | 1669 * controller.add(data); |
1672 * onPause: () { subscription.pause(); }, | 1670 * }, onError: controller.addError, |
1673 * onResume: () { subscription.resume(); }, | 1671 * onDone: controller.close, |
1674 * onCancel: () => subscription.cancel(), | 1672 * cancelOnError: cancelOnError); |
Lasse Reichstein Nielsen
2017/05/08 07:08:54
I find the formatting slightly confusing.
Try putt
floitsch
2017/05/08 09:29:50
Not much of a difference.
done.
| |
1675 * sync: true); | 1673 * |
1676 * return controller.stream.listen(null); | 1674 * // Return a new [StreamSubscription] by listening to the controller's |
1677 * }); | 1675 * // stream. |
1676 * return controller.stream.listen(null); | |
1677 * } | |
1678 * | |
1679 * // Instantiate a transformer: | |
1680 * var duplicator = new StreamTransformer<int, int>(_onListen); | |
Lasse Reichstein Nielsen
2017/05/08 07:08:54
Consider making it const, just because you can.
Lasse Reichstein Nielsen
2017/05/08 07:08:55
Consider making it const, just because you can.
floitsch
2017/05/08 09:29:50
Acknowledged.
floitsch
2017/05/08 09:29:51
Done.
| |
1681 * | |
1682 * // Use as follows: | |
1683 * intStream.transform(duplicator); | |
1684 * ``` | |
1678 */ | 1685 */ |
1679 const factory StreamTransformer( | 1686 const factory StreamTransformer( |
1680 StreamSubscription<T> transformer( | 1687 StreamSubscription<T> onListen( |
1681 Stream<S> stream, bool cancelOnError)) = | 1688 Stream<S> stream, bool cancelOnError)) = |
1682 _StreamSubscriptionTransformer<S, T>; | 1689 _StreamSubscriptionTransformer<S, T>; |
1683 | 1690 |
1684 /** | 1691 /** |
1685 * Creates a [StreamTransformer] that delegates events to the given functions. | 1692 * Creates a [StreamTransformer] that delegates events to the given functions. |
1686 * | 1693 * |
1687 * Example use of a duplicating transformer: | 1694 * Example use of a duplicating transformer: |
1688 * | 1695 * |
1689 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs( | 1696 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs( |
1690 * handleData: (String value, EventSink<String> sink) { | 1697 * handleData: (String value, EventSink<String> sink) { |
1691 * sink.add(value); | 1698 * sink.add(value); |
1692 * sink.add(value); // Duplicate the incoming events. | 1699 * sink.add(value); // Duplicate the incoming events. |
1693 * })); | 1700 * })); |
1694 */ | 1701 */ |
1695 factory StreamTransformer.fromHandlers( | 1702 factory StreamTransformer.fromHandlers( |
1696 {void handleData(S data, EventSink<T> sink), | 1703 {void handleData(S data, EventSink<T> sink), |
1697 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | 1704 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
1698 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; | 1705 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; |
1699 | 1706 |
1700 /** | 1707 /** |
1701 * Transform the incoming [stream]'s events. | 1708 * Transforms the provided [stream]. |
1702 * | 1709 * |
1703 * Creates a new stream. | 1710 * Returns a replacement stream for the provided input [stream]. |
Lasse Reichstein Nielsen
2017/05/08 07:08:55
Returns a new stream with events that are computed
floitsch
2017/05/08 09:29:50
Done.
| |
1704 * When this stream is listened to, it will start listening on [stream], | |
1705 * and generate events on the new stream based on the events from [stream]. | |
1706 * | 1711 * |
1707 * Subscriptions on the returned stream should propagate pause state | 1712 * Implementors of the [StreamTransformer] interface should document |
1708 * to the subscription on [stream]. | 1713 * differences from the following expected behavior: |
1714 * | |
1715 * * When the returned stream is listened to, it starts listening to the | |
1716 * input [stream]. | |
1717 * * Subscriptions of the returned stream forward (in a reasonable time) | |
1718 * a [StreamSubscription.pause] call to the subscription of the input | |
1719 * [stream]. | |
1720 * * Similarly, canceling a subscription of the returned stream eventually | |
1721 * (in reasonable time) cancels the subscription of the input [stream]. | |
1722 * | |
1723 * "Reasonable time" depends on the transformer and stream. Some transformers, | |
1724 * like a "timeout" transformer, might make these operations depend on a | |
1725 * duration. Others might delay them not at all, or just by a microtask. | |
Lasse Reichstein Nielsen
2017/05/08 07:08:55
delay them not at all -> not delay them at all
floitsch
2017/05/08 09:29:50
Done.
| |
1709 */ | 1726 */ |
1710 Stream<T> bind(Stream<S> stream); | 1727 Stream<T> bind(Stream<S> stream); |
1711 } | 1728 } |
1712 | 1729 |
1713 /** | 1730 /** |
1714 * An [Iterator] like interface for the values of a [Stream]. | 1731 * An [Iterator] like interface for the values of a [Stream]. |
1715 * | 1732 * |
1716 * This wraps a [Stream] and a subscription on the stream. It listens | 1733 * This wraps a [Stream] and a subscription on the stream. It listens |
1717 * on the stream, and completes the future returned by [moveNext] when the | 1734 * on the stream, and completes the future returned by [moveNext] when the |
1718 * next value becomes available. | 1735 * next value becomes available. |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1789 } | 1806 } |
1790 | 1807 |
1791 void addError(error, [StackTrace stackTrace]) { | 1808 void addError(error, [StackTrace stackTrace]) { |
1792 _sink.addError(error, stackTrace); | 1809 _sink.addError(error, stackTrace); |
1793 } | 1810 } |
1794 | 1811 |
1795 void close() { | 1812 void close() { |
1796 _sink.close(); | 1813 _sink.close(); |
1797 } | 1814 } |
1798 } | 1815 } |
OLD | NEW |