Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(457)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2864863002: Update StreamTransformer documentation. (Closed)
Patch Set: Address comments. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 1587 matching lines...) Expand 10 before | Expand all | Expand 10 after
1598 * Otherwise, the returned future will complete when either: 1598 * Otherwise, the returned future will complete when either:
1599 * 1599 *
1600 * * all events have been processed and the sink has been closed, or 1600 * * all events have been processed and the sink has been closed, or
1601 * * the sink has otherwise been stopped from handling more events 1601 * * the sink has otherwise been stopped from handling more events
1602 * (for example by cancelling a stream subscription). 1602 * (for example by cancelling a stream subscription).
1603 */ 1603 */
1604 Future get done; 1604 Future get done;
1605 } 1605 }
1606 1606
1607 /** 1607 /**
1608 * The target of a [Stream.transform] call. 1608 * Transforms a Stream.
1609 * 1609 *
1610 * The [Stream.transform] call will pass itself to this object and then return 1610 * When a stream's [Stream.transform] method is invoked with a
1611 * the resulting stream. 1611 * [StreamTransformer], the stream calls the [bind] method on the provided
1612 * transformer. The resulting stream is then returned from the
1613 * [Stream.transform] method.
1614 *
1615 * Conceptually, a transformer is simply a function from [Stream] to [Stream]
1616 * that is encapsulated into a class.
1612 * 1617 *
1613 * It is good practice to write transformers that can be used multiple times. 1618 * It is good practice to write transformers that can be used multiple times.
1619 *
1620 * All other transforming methods on [Stream], such as [Stream.map],
1621 * [Stream.where] or [Stream.expand] can be implemented using
1622 * [Stream.transform]. A [StreamTransformer] is thus very powerful but often
1623 * also a bit more complicated to use.
1614 */ 1624 */
1615 abstract class StreamTransformer<S, T> { 1625 abstract class StreamTransformer<S, T> {
1616 /** 1626 /**
1617 * Creates a [StreamTransformer]. 1627 * Creates a [StreamTransformer] based on the given [onListen] callback.
1618 * 1628 *
1619 * The returned instance takes responsibility of implementing ([bind]). 1629 * The returned stream transformer uses the provided [onListen] callback
1620 * When the user invokes `bind` it returns a new "bound" stream. Only when 1630 * when a transformed stream is listened to. At that time, the callback
1621 * the user starts listening to the bound stream, the `listen` method 1631 * receives the input stream (the one passed to [bind]) and a
1622 * invokes the given closure [transformer]. 1632 * boolean flag `cancelOnError` to create a [StreamSubscription].
1623 * 1633 *
1624 * The [transformer] closure receives the stream, that was bound, as argument 1634 * The [onListen] callback does *not* receive the handlers that were passed
1625 * and returns a [StreamSubscription]. In almost all cases the closure 1635 * to [Stream.listen]. These are automatically set after the call to the
1626 * listens itself to the stream that is given as argument. 1636 * [onListen] callback (using [StreamSubscription.onData],
1637 * [StreamSubscription.onError] and [StreamSubscription.onDone]).
1627 * 1638 *
1628 * The result of invoking the [transformer] closure is a [StreamSubscription]. 1639 * Most commonly, an [onListen] callback will first call [Stream.listen] on
1629 * The bound stream-transformer (created by the `bind` method above) then sets 1640 * the provided stream (with the corresponding `cancelOnError` flag), and then
1630 * the handlers it received as part of the `listen` call. 1641 * return a new [StreamSubscription].
1631 *
1632 * Conceptually this can be summarized as follows:
1633 *
1634 * 1. `var transformer = new StreamTransformer(transformerClosure);`
1635 * creates a `StreamTransformer` that supports the `bind` method.
1636 * 2. `var boundStream = stream.transform(transformer);` binds the `stream`
1637 * and returns a bound stream that has a pointer to `stream`.
1638 * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)`
1639 * starts the listening and transformation. This is accomplished
1640 * in 2 steps: first the `boundStream` invokes the `transformerClosure` with
1641 * the `stream` it captured: `transformerClosure(stream, b)`.
1642 * The result `subscription`, a [StreamSubscription], is then
1643 * updated to receive its handlers: `subscription.onData(f1)`,
1644 * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription
1645 * is returned as result of the `listen` call.
1646 * 1642 *
1647 * There are two common ways to create a StreamSubscription: 1643 * There are two common ways to create a StreamSubscription:
1648 * 1644 *
1649 * 1. by creating a new class that implements [StreamSubscription]. 1645 * 1. by allocating a [StreamController] and to return the result of
1646 * listening to its stream. It's important to forward pause, resume and
1647 * cancel events (unless the transformer intentionally wants to change
1648 * this behavior).
1649 * 2. by creating a new class that implements [StreamSubscription].
1650 * Note that the subscription should run callbacks in the [Zone] the 1650 * Note that the subscription should run callbacks in the [Zone] the
1651 * stream was listened to. 1651 * stream was listened to (see [Zone] and [Zone.bindCallback]).
1652 * 2. by allocating a [StreamController] and to return the result of
1653 * listening to its stream.
1654 * 1652 *
1655 * Example use of a duplicating transformer: 1653 * Example:
1656 * 1654 *
1657 * stringStream.transform(new StreamTransformer<String, String>( 1655 * ```
1658 * (Stream<String> input, bool cancelOnError) { 1656 * /// Starts listening to [input] and duplicates all non-error events.
1659 * StreamController<String> controller; 1657 * StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) {
1660 * StreamSubscription<String> subscription; 1658 * StreamSubscription<String> subscription;
1661 * controller = new StreamController<String>( 1659 * // Create controller that forwards pause, resume and cancel events.
1662 * onListen: () { 1660 * var controller = new StreamController<String>(
1663 * subscription = input.listen((data) { 1661 * onPause: () {
1664 * // Duplicate the data. 1662 * subscription.pause();
1665 * controller.add(data); 1663 * },
1666 * controller.add(data); 1664 * onResume: () {
1667 * }, 1665 * subscription.resume();
1668 * onError: controller.addError, 1666 * },
1669 * onDone: controller.close, 1667 * onCancel: () => subscription.cancel(),
1670 * cancelOnError: cancelOnError); 1668 * sync: true); // "sync" is correct here, since events are forwarded.
1671 * }, 1669 *
1672 * onPause: () { subscription.pause(); }, 1670 * // Listen to the provided stream using `cancelOnError`.
1673 * onResume: () { subscription.resume(); }, 1671 * subscription = input.listen((data) {
1674 * onCancel: () => subscription.cancel(), 1672 * // Duplicate the data.
1675 * sync: true); 1673 * controller.add(data);
1676 * return controller.stream.listen(null); 1674 * controller.add(data);
1677 * }); 1675 * },
1676 * onError: controller.addError,
1677 * onDone: controller.close,
1678 * cancelOnError: cancelOnError);
1679 *
1680 * // Return a new [StreamSubscription] by listening to the controller's
1681 * // stream.
1682 * return controller.stream.listen(null);
1683 * }
1684 *
1685 * // Instantiate a transformer:
1686 * var duplicator = const StreamTransformer<int, int>(_onListen);
1687 *
1688 * // Use as follows:
1689 * intStream.transform(duplicator);
1690 * ```
1678 */ 1691 */
1679 const factory StreamTransformer( 1692 const factory StreamTransformer(
1680 StreamSubscription<T> transformer( 1693 StreamSubscription<T> onListen(
1681 Stream<S> stream, bool cancelOnError)) = 1694 Stream<S> stream, bool cancelOnError)) =
1682 _StreamSubscriptionTransformer<S, T>; 1695 _StreamSubscriptionTransformer<S, T>;
1683 1696
1684 /** 1697 /**
1685 * Creates a [StreamTransformer] that delegates events to the given functions. 1698 * Creates a [StreamTransformer] that delegates events to the given functions.
1686 * 1699 *
1687 * Example use of a duplicating transformer: 1700 * Example use of a duplicating transformer:
1688 * 1701 *
1689 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs( 1702 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs(
1690 * handleData: (String value, EventSink<String> sink) { 1703 * handleData: (String value, EventSink<String> sink) {
1691 * sink.add(value); 1704 * sink.add(value);
1692 * sink.add(value); // Duplicate the incoming events. 1705 * sink.add(value); // Duplicate the incoming events.
1693 * })); 1706 * }));
1694 */ 1707 */
1695 factory StreamTransformer.fromHandlers( 1708 factory StreamTransformer.fromHandlers(
1696 {void handleData(S data, EventSink<T> sink), 1709 {void handleData(S data, EventSink<T> sink),
1697 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), 1710 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
1698 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; 1711 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>;
1699 1712
1700 /** 1713 /**
1701 * Transform the incoming [stream]'s events. 1714 * Transforms the provided [stream].
1702 * 1715 *
1703 * Creates a new stream. 1716 * Returns a new stream with events that are computed from events of the
1704 * When this stream is listened to, it will start listening on [stream], 1717 * provided [stream].
1705 * and generate events on the new stream based on the events from [stream].
1706 * 1718 *
1707 * Subscriptions on the returned stream should propagate pause state 1719 * Implementors of the [StreamTransformer] interface should document
1708 * to the subscription on [stream]. 1720 * differences from the following expected behavior:
1721 *
1722 * * When the returned stream is listened to, it starts listening to the
1723 * input [stream].
1724 * * Subscriptions of the returned stream forward (in a reasonable time)
1725 * a [StreamSubscription.pause] call to the subscription of the input
1726 * [stream].
1727 * * Similarly, canceling a subscription of the returned stream eventually
1728 * (in reasonable time) cancels the subscription of the input [stream].
1729 *
1730 * "Reasonable time" depends on the transformer and stream. Some transformers,
1731 * like a "timeout" transformer, might make these operations depend on a
1732 * duration. Others might not delay them at all, or just by a microtask.
1709 */ 1733 */
1710 Stream<T> bind(Stream<S> stream); 1734 Stream<T> bind(Stream<S> stream);
1711 } 1735 }
1712 1736
1713 /** 1737 /**
1714 * An [Iterator] like interface for the values of a [Stream]. 1738 * An [Iterator] like interface for the values of a [Stream].
1715 * 1739 *
1716 * This wraps a [Stream] and a subscription on the stream. It listens 1740 * This wraps a [Stream] and a subscription on the stream. It listens
1717 * on the stream, and completes the future returned by [moveNext] when the 1741 * on the stream, and completes the future returned by [moveNext] when the
1718 * next value becomes available. 1742 * next value becomes available.
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
1789 } 1813 }
1790 1814
1791 void addError(error, [StackTrace stackTrace]) { 1815 void addError(error, [StackTrace stackTrace]) {
1792 _sink.addError(error, stackTrace); 1816 _sink.addError(error, stackTrace);
1793 } 1817 }
1794 1818
1795 void close() { 1819 void close() {
1796 _sink.close(); 1820 _sink.close();
1797 } 1821 }
1798 } 1822 }
OLDNEW
« no previous file with comments | « no previous file | sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698