OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 1587 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1598 * Otherwise, the returned future will complete when either: | 1598 * Otherwise, the returned future will complete when either: |
1599 * | 1599 * |
1600 * * all events have been processed and the sink has been closed, or | 1600 * * all events have been processed and the sink has been closed, or |
1601 * * the sink has otherwise been stopped from handling more events | 1601 * * the sink has otherwise been stopped from handling more events |
1602 * (for example by cancelling a stream subscription). | 1602 * (for example by cancelling a stream subscription). |
1603 */ | 1603 */ |
1604 Future get done; | 1604 Future get done; |
1605 } | 1605 } |
1606 | 1606 |
1607 /** | 1607 /** |
1608 * The target of a [Stream.transform] call. | 1608 * Transforms a Stream. |
1609 * | 1609 * |
1610 * The [Stream.transform] call will pass itself to this object and then return | 1610 * When a stream's [Stream.transform] method is invoked with a |
1611 * the resulting stream. | 1611 * [StreamTransformer], the stream calls the [bind] method on the provided |
| 1612 * transformer. The resulting stream is then returned from the |
| 1613 * [Stream.transform] method. |
| 1614 * |
| 1615 * Conceptually, a transformer is simply a function from [Stream] to [Stream] |
| 1616 * that is encapsulated into a class. |
1612 * | 1617 * |
1613 * It is good practice to write transformers that can be used multiple times. | 1618 * It is good practice to write transformers that can be used multiple times. |
| 1619 * |
| 1620 * All other transforming methods on [Stream], such as [Stream.map], |
| 1621 * [Stream.where] or [Stream.expand] can be implemented using |
| 1622 * [Stream.transform]. A [StreamTransformer] is thus very powerful but often |
| 1623 * also a bit more complicated to use. |
1614 */ | 1624 */ |
1615 abstract class StreamTransformer<S, T> { | 1625 abstract class StreamTransformer<S, T> { |
1616 /** | 1626 /** |
1617 * Creates a [StreamTransformer]. | 1627 * Creates a [StreamTransformer] based on the given [onListen] callback. |
1618 * | 1628 * |
1619 * The returned instance takes responsibility of implementing ([bind]). | 1629 * The returned stream transformer uses the provided [onListen] callback |
1620 * When the user invokes `bind` it returns a new "bound" stream. Only when | 1630 * when a transformed stream is listened to. At that time, the callback |
1621 * the user starts listening to the bound stream, the `listen` method | 1631 * receives the input stream (the one passed to [bind]) and a |
1622 * invokes the given closure [transformer]. | 1632 * boolean flag `cancelOnError` to create a [StreamSubscription]. |
1623 * | 1633 * |
1624 * The [transformer] closure receives the stream, that was bound, as argument | 1634 * The [onListen] callback does *not* receive the handlers that were passed |
1625 * and returns a [StreamSubscription]. In almost all cases the closure | 1635 * to [Stream.listen]. These are automatically set after the call to the |
1626 * listens itself to the stream that is given as argument. | 1636 * [onListen] callback (using [StreamSubscription.onData], |
| 1637 * [StreamSubscription.onError] and [StreamSubscription.onDone]). |
1627 * | 1638 * |
1628 * The result of invoking the [transformer] closure is a [StreamSubscription]. | 1639 * Most commonly, an [onListen] callback will first call [Stream.listen] on |
1629 * The bound stream-transformer (created by the `bind` method above) then sets | 1640 * the provided stream (with the corresponding `cancelOnError` flag), and then |
1630 * the handlers it received as part of the `listen` call. | 1641 * return a new [StreamSubscription]. |
1631 * | |
1632 * Conceptually this can be summarized as follows: | |
1633 * | |
1634 * 1. `var transformer = new StreamTransformer(transformerClosure);` | |
1635 * creates a `StreamTransformer` that supports the `bind` method. | |
1636 * 2. `var boundStream = stream.transform(transformer);` binds the `stream` | |
1637 * and returns a bound stream that has a pointer to `stream`. | |
1638 * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)` | |
1639 * starts the listening and transformation. This is accomplished | |
1640 * in 2 steps: first the `boundStream` invokes the `transformerClosure` with | |
1641 * the `stream` it captured: `transformerClosure(stream, b)`. | |
1642 * The result `subscription`, a [StreamSubscription], is then | |
1643 * updated to receive its handlers: `subscription.onData(f1)`, | |
1644 * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription | |
1645 * is returned as result of the `listen` call. | |
1646 * | 1642 * |
1647 * There are two common ways to create a StreamSubscription: | 1643 * There are two common ways to create a StreamSubscription: |
1648 * | 1644 * |
1649 * 1. by creating a new class that implements [StreamSubscription]. | 1645 * 1. by allocating a [StreamController] and to return the result of |
| 1646 * listening to its stream. It's important to forward pause, resume and |
| 1647 * cancel events (unless the transformer intentionally wants to change |
| 1648 * this behavior). |
| 1649 * 2. by creating a new class that implements [StreamSubscription]. |
1650 * Note that the subscription should run callbacks in the [Zone] the | 1650 * Note that the subscription should run callbacks in the [Zone] the |
1651 * stream was listened to. | 1651 * stream was listened to (see [Zone] and [Zone.bindCallback]). |
1652 * 2. by allocating a [StreamController] and to return the result of | |
1653 * listening to its stream. | |
1654 * | 1652 * |
1655 * Example use of a duplicating transformer: | 1653 * Example: |
1656 * | 1654 * |
1657 * stringStream.transform(new StreamTransformer<String, String>( | 1655 * ``` |
1658 * (Stream<String> input, bool cancelOnError) { | 1656 * /// Starts listening to [input] and duplicates all non-error events. |
1659 * StreamController<String> controller; | 1657 * StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) { |
1660 * StreamSubscription<String> subscription; | 1658 * StreamSubscription<String> subscription; |
1661 * controller = new StreamController<String>( | 1659 * // Create controller that forwards pause, resume and cancel events. |
1662 * onListen: () { | 1660 * var controller = new StreamController<String>( |
1663 * subscription = input.listen((data) { | 1661 * onPause: () { |
1664 * // Duplicate the data. | 1662 * subscription.pause(); |
1665 * controller.add(data); | 1663 * }, |
1666 * controller.add(data); | 1664 * onResume: () { |
1667 * }, | 1665 * subscription.resume(); |
1668 * onError: controller.addError, | 1666 * }, |
1669 * onDone: controller.close, | 1667 * onCancel: () => subscription.cancel(), |
1670 * cancelOnError: cancelOnError); | 1668 * sync: true); // "sync" is correct here, since events are forwarded. |
1671 * }, | 1669 * |
1672 * onPause: () { subscription.pause(); }, | 1670 * // Listen to the provided stream using `cancelOnError`. |
1673 * onResume: () { subscription.resume(); }, | 1671 * subscription = input.listen((data) { |
1674 * onCancel: () => subscription.cancel(), | 1672 * // Duplicate the data. |
1675 * sync: true); | 1673 * controller.add(data); |
1676 * return controller.stream.listen(null); | 1674 * controller.add(data); |
1677 * }); | 1675 * }, |
| 1676 * onError: controller.addError, |
| 1677 * onDone: controller.close, |
| 1678 * cancelOnError: cancelOnError); |
| 1679 * |
| 1680 * // Return a new [StreamSubscription] by listening to the controller's |
| 1681 * // stream. |
| 1682 * return controller.stream.listen(null); |
| 1683 * } |
| 1684 * |
| 1685 * // Instantiate a transformer: |
| 1686 * var duplicator = const StreamTransformer<int, int>(_onListen); |
| 1687 * |
| 1688 * // Use as follows: |
| 1689 * intStream.transform(duplicator); |
| 1690 * ``` |
1678 */ | 1691 */ |
1679 const factory StreamTransformer( | 1692 const factory StreamTransformer( |
1680 StreamSubscription<T> transformer( | 1693 StreamSubscription<T> onListen( |
1681 Stream<S> stream, bool cancelOnError)) = | 1694 Stream<S> stream, bool cancelOnError)) = |
1682 _StreamSubscriptionTransformer<S, T>; | 1695 _StreamSubscriptionTransformer<S, T>; |
1683 | 1696 |
1684 /** | 1697 /** |
1685 * Creates a [StreamTransformer] that delegates events to the given functions. | 1698 * Creates a [StreamTransformer] that delegates events to the given functions. |
1686 * | 1699 * |
1687 * Example use of a duplicating transformer: | 1700 * Example use of a duplicating transformer: |
1688 * | 1701 * |
1689 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( | 1702 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( |
1690 * handleData: (String value, EventSink<String> sink) { | 1703 * handleData: (String value, EventSink<String> sink) { |
1691 * sink.add(value); | 1704 * sink.add(value); |
1692 * sink.add(value); // Duplicate the incoming events. | 1705 * sink.add(value); // Duplicate the incoming events. |
1693 * })); | 1706 * })); |
1694 */ | 1707 */ |
1695 factory StreamTransformer.fromHandlers( | 1708 factory StreamTransformer.fromHandlers( |
1696 {void handleData(S data, EventSink<T> sink), | 1709 {void handleData(S data, EventSink<T> sink), |
1697 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | 1710 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
1698 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; | 1711 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; |
1699 | 1712 |
1700 /** | 1713 /** |
1701 * Transform the incoming [stream]'s events. | 1714 * Transforms the provided [stream]. |
1702 * | 1715 * |
1703 * Creates a new stream. | 1716 * Returns a new stream with events that are computed from events of the |
1704 * When this stream is listened to, it will start listening on [stream], | 1717 * provided [stream]. |
1705 * and generate events on the new stream based on the events from [stream]. | |
1706 * | 1718 * |
1707 * Subscriptions on the returned stream should propagate pause state | 1719 * Implementors of the [StreamTransformer] interface should document |
1708 * to the subscription on [stream]. | 1720 * differences from the following expected behavior: |
| 1721 * |
| 1722 * * When the returned stream is listened to, it starts listening to the |
| 1723 * input [stream]. |
| 1724 * * Subscriptions of the returned stream forward (in a reasonable time) |
| 1725 * a [StreamSubscription.pause] call to the subscription of the input |
| 1726 * [stream]. |
| 1727 * * Similarly, canceling a subscription of the returned stream eventually |
| 1728 * (in reasonable time) cancels the subscription of the input [stream]. |
| 1729 * |
| 1730 * "Reasonable time" depends on the transformer and stream. Some transformers, |
| 1731 * like a "timeout" transformer, might make these operations depend on a |
| 1732 * duration. Others might not delay them at all, or just by a microtask. |
1709 */ | 1733 */ |
1710 Stream<T> bind(Stream<S> stream); | 1734 Stream<T> bind(Stream<S> stream); |
1711 } | 1735 } |
1712 | 1736 |
1713 /** | 1737 /** |
1714 * An [Iterator] like interface for the values of a [Stream]. | 1738 * An [Iterator] like interface for the values of a [Stream]. |
1715 * | 1739 * |
1716 * This wraps a [Stream] and a subscription on the stream. It listens | 1740 * This wraps a [Stream] and a subscription on the stream. It listens |
1717 * on the stream, and completes the future returned by [moveNext] when the | 1741 * on the stream, and completes the future returned by [moveNext] when the |
1718 * next value becomes available. | 1742 * next value becomes available. |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1789 } | 1813 } |
1790 | 1814 |
1791 void addError(error, [StackTrace stackTrace]) { | 1815 void addError(error, [StackTrace stackTrace]) { |
1792 _sink.addError(error, stackTrace); | 1816 _sink.addError(error, stackTrace); |
1793 } | 1817 } |
1794 | 1818 |
1795 void close() { | 1819 void close() { |
1796 _sink.close(); | 1820 _sink.close(); |
1797 } | 1821 } |
1798 } | 1822 } |
OLD | NEW |