Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 9a2eb69f9390aa370fc7373353837076cfa41d0f..ca5c97b1d2f6be6ffc3871bd84355b232a46d4fc 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -147,6 +147,49 @@ abstract class Stream<T> { |
} |
/** |
+ * Creates a stream where all events of an existing stream are piped through |
+ * a sink-transformation. |
+ * |
+ * The given [mapSink] closure is invoked when the returned stream is |
+ * listened to. All events from the [source] are added into the event sink |
+ * that is returned from the invocation. The transformation puts all |
+ * transformed events into the sink the [mapSink] closure received during |
+ * its invocation. Conceptually the [mapSink] creates a transformation pipe |
+ * with the input sink being the returned [EventSink] and the output sink |
+ * being the sink it received. |
+ * |
+ * This constructor is frequently used to build transformers. |
+ * |
+ * Example use for a duplicating transformer: |
+ * |
+ * class DuplicationSink implements EventSink<String> { |
+ * final EventSink<String> _outputSink; |
+ * DuplicationSink(this._outputSink); |
+ * |
+ * void add(String data) { |
+ * _outputSink.add(data); |
+ * _outputSink.add(data); |
+ * } |
+ * |
+ * void addError(e, [st]) => _outputSink(e, st); |
+ * void close() => _outputSink.close(); |
+ * } |
+ * |
+ * class DuplicationTransformer implements StreamTransformer<String, String> { |
+ * // Some generic types ommitted for brevety. |
+ * Stream bind(Stream stream) => new Stream<String>.eventTransform( |
+ * stream, |
+ * (EventSink sink) => new DuplicationSink(sink)); |
+ * } |
+ * |
+ * stringStream.transform(new DuplicationTransformer()); |
+ */ |
+ factory Stream.eventTransformed(Stream source, |
+ EventSink mapSink(EventSink<T> sink)) { |
+ return new _BoundSinkStream(source, mapSink); |
+ } |
+ |
+ /** |
* Reports whether this stream is a broadcast stream. |
*/ |
bool get isBroadcast => false; |
@@ -950,7 +993,7 @@ abstract class EventSink<T> { |
/** Create a data event */ |
void add(T event); |
/** Create an async error. */ |
- void addError(errorEvent); |
+ void addError(errorEvent, [StackTrace stackTrace]); |
/** Request a stream to close. */ |
void close(); |
} |
@@ -1031,222 +1074,80 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { |
* |
* The [Stream.transform] call will pass itself to this object and then return |
* the resulting stream. |
+ * |
+ * It is good practice to create transformers that can be used multiple times. |
Lasse Reichstein Nielsen
2013/10/07 11:47:00
By "create", do you mean that it is good practice
floitsch
2013/10/10 15:39:57
Done.
|
*/ |
abstract class StreamTransformer<S, T> { |
+ |
/** |
- * Create a [StreamTransformer] that delegates events to the given functions. |
+ * Creates a [StreamTransformer]. |
* |
- * This is actually a [StreamEventTransformer] where the event handling is |
- * performed by the function arguments. |
- * If an argument is omitted, it acts as the corresponding default method from |
- * [StreamEventTransformer]. |
+ * The returned instance takes responsibility of binding ([bind]) and only |
+ * invokes the given closure [transformer] when a user starts listening on |
+ * the bound stream. At that point the transformer should start listening |
Lasse Reichstein Nielsen
2013/10/07 11:47:00
User listens on the returned stream, not the bound
floitsch
2013/10/10 15:39:57
rewritten.
|
+ * to the bound stream (which is given as argument) and return a |
+ * [StreamSubscription]. The bound stream-transformer then sets the handlers |
Lasse Reichstein Nielsen
2013/10/07 11:47:00
How is the stream-transformer bound, and to what?
floitsch
2013/10/10 15:39:57
rewritten.
|
+ * it received as part of the `listen` call. Missing handlers (or the ones |
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Sounds like the handlers, which were received as p
floitsch
2013/10/10 15:39:57
rewritten.
|
+ * that are `null`) are also set (with `null`). |
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Last sentence seems superflous.
floitsch
2013/10/10 15:39:57
It was meant to say that the Subscription can rely
|
* |
- * Example use: |
+ * There are two common ways to create a StreamSubscription: |
* |
- * stringStream.transform(new StreamTransformer<String, String>( |
- * handleData: (String value, EventSink<String> sink) { |
- * sink.add(value); |
- * sink.add(value); // Duplicate the incoming events. |
- * })); |
+ * 1. by creating a new class that implements [StreamSubscription]. |
+ * Note that the subscription should run callbacks in the [Zone] the |
+ * stream was listened to. |
+ * 2. by allocating a [StreamController] and to return the result of |
+ * listening to its stream. |
* |
- */ |
- factory StreamTransformer({ |
- void handleData(S data, EventSink<T> sink), |
- Function handleError, |
- void handleDone(EventSink<T> sink)}) { |
- return new _StreamTransformerImpl<S, T>(handleData, |
- handleError, |
- handleDone); |
- } |
- |
- Stream<T> bind(Stream<S> stream); |
-} |
- |
- |
-/** |
- * Base class for transformers that modifies stream events. |
- * |
- * A [StreamEventTransformer] transforms incoming Stream |
- * events of one kind into outgoing events of (possibly) another kind. |
- * |
- * Subscribing on the stream returned by [bind] is the same as subscribing on |
- * the source stream, except that events are passed through the [transformer] |
- * before being emitted. The transformer may generate any number and |
- * types of events for each incoming event. Pauses on the returned |
- * subscription are forwarded to this stream. |
- * |
- * An example that duplicates all data events: |
- * |
- * class DoubleTransformer<T> extends StreamEventTransformer<T, T> { |
- * void handleData(T data, EventSink<T> sink) { |
- * sink.add(value); |
- * sink.add(value); |
- * } |
- * } |
- * someTypeStream.transform(new DoubleTransformer<Type>()); |
- * |
- * The default implementations of the "handle" methods forward |
- * the events unmodified. If using the default [handleData] the generic type [T] |
- * needs to be assignable to [S]. |
- */ |
-abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
- const StreamEventTransformer(); |
- |
- Stream<T> bind(Stream<S> source) { |
- return new EventTransformStream<S, T>(source, this); |
- } |
- |
- /** |
- * Act on incoming data event. |
+ * Example use of a duplicating transformer: |
* |
- * The method may generate any number of events on the sink, but should |
- * not throw. |
+ * stringStream.transform(new StreamTransformer<String, String>( |
+ * (Stream<String> input, bool cancelOnError) { |
+ * StreamController<String> controller; |
+ * StreamSubscription<String> subscription; |
+ * controller = new StreamController<String>( |
+ * onListen: () { |
+ * subscription = input.listen((data) { |
+ * // Duplicate the data. |
+ * controller.add(data); |
+ * controller.add(data); |
+ * }, |
+ * onError: controller.addError, |
+ * onDone: controller.close, |
+ * cancelOnError: cancelOnError); |
+ * }, |
+ * onPause: subscription.pause, |
+ * onResume: subscription.resume, |
+ * onCancel: subscription.cancel, |
+ * sync: true); // One of the few places, where sync is correct. |
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Either remove the comment or elaborate on why it i
floitsch
2013/10/10 15:39:57
Removed.
|
+ * return controller.stream.listen(null); |
+ * }); |
*/ |
- void handleData(S event, EventSink<T> sink) { |
- var data = event; |
- sink.add(data); |
- } |
+ const factory StreamTransformer( |
+ StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) |
+ = _StreamSubscriptionTransformer; |
/** |
- * Act on incoming error event. |
+ * Creates a [StreamTransformer] that delegates events to the given functions. |
* |
- * The method may generate any number of events on the sink, but should |
- * not throw. |
- */ |
- void handleError(error, EventSink<T> sink) { |
- sink.addError(error); |
- } |
- |
- /** |
- * Act on incoming done event. |
+ * This constructor returns a transformer that can only be used once. |
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Why can it only be used once? If the handlers them
floitsch
2013/10/10 15:39:57
Done.
|
+ * |
+ * Example use of a duplicating transformer: |
* |
- * The method may generate any number of events on the sink, but should |
- * not throw. |
+ * stringStream.transform(new StreamTransformer<String, String>.fromHandlers( |
+ * handleData: (String value, EventSink<String> sink) { |
+ * sink.add(value); |
+ * sink.add(value); // Duplicate the incoming events. |
+ * })); |
*/ |
- void handleDone(EventSink<T> sink){ |
- sink.close(); |
- } |
-} |
- |
- |
-/** |
- * Stream that transforms another stream by intercepting and replacing events. |
- * |
- * This [Stream] is a transformation of a source stream. Listening on this |
- * stream is the same as listening on the source stream, except that events |
- * are intercepted and modified by a [StreamEventTransformer] before becoming |
- * events on this stream. |
- */ |
-class EventTransformStream<S, T> extends Stream<T> { |
- final Stream<S> _source; |
- final StreamEventTransformer _transformer; |
- EventTransformStream(Stream<S> source, |
- StreamEventTransformer<S, T> transformer) |
- : _source = source, _transformer = transformer; |
- |
- StreamSubscription<T> listen(void onData(T data), |
- { Function onError, |
- void onDone(), |
- bool cancelOnError }) { |
- if (onData == null) onData = _nullDataHandler; |
- if (onError == null) onError = _nullErrorHandler; |
- if (onDone == null) onDone = _nullDoneHandler; |
- cancelOnError = identical(true, cancelOnError); |
- return new _EventTransformStreamSubscription(_source, _transformer, |
- onData, onError, onDone, |
- cancelOnError); |
- } |
-} |
- |
-class _EventTransformStreamSubscription<S, T> |
- extends _BufferingStreamSubscription<T> { |
- /** The transformer used to transform events. */ |
- final StreamEventTransformer<S, T> _transformer; |
- |
- /** Whether this stream has sent a done event. */ |
- bool _isClosed = false; |
- |
- /** Source of incoming events. */ |
- StreamSubscription<S> _subscription; |
- |
- /** Cached EventSink wrapper for this class. */ |
- EventSink<T> _sink; |
- |
- _EventTransformStreamSubscription(Stream<S> source, |
- this._transformer, |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) |
- : super(onData, onError, onDone, cancelOnError) { |
- _sink = new _EventSinkAdapter<T>(this); |
- _subscription = source.listen(_handleData, |
- onError: _handleError, |
- onDone: _handleDone); |
- } |
- |
- /** Whether this subscription is still subscribed to its source. */ |
- bool get _isSubscribed => _subscription != null; |
- |
- void _onPause() { |
- if (_isSubscribed) _subscription.pause(); |
- } |
- |
- void _onResume() { |
- if (_isSubscribed) _subscription.resume(); |
- } |
- |
- void _onCancel() { |
- if (_isSubscribed) { |
- StreamSubscription subscription = _subscription; |
- _subscription = null; |
- subscription.cancel(); |
- } |
- _isClosed = true; |
- } |
- |
- void _handleData(S data) { |
- try { |
- _transformer.handleData(data, _sink); |
- } catch (e, s) { |
- _addError(_asyncError(e, s), s); |
- } |
- } |
- |
- void _handleError(error, [stackTrace]) { |
- try { |
- _transformer.handleError(error, _sink); |
- } catch (e, s) { |
- if (identical(e, error)) { |
- _addError(error, stackTrace); |
- } else { |
- _addError(_asyncError(e, s), s); |
- } |
- } |
- } |
- |
- void _handleDone() { |
- try { |
- _subscription = null; |
- _transformer.handleDone(_sink); |
- } catch (e, s) { |
- _addError(_asyncError(e, s), s); |
- } |
- } |
-} |
- |
-class _EventSinkAdapter<T> implements EventSink<T> { |
- _EventSink _sink; |
- _EventSinkAdapter(this._sink); |
+ factory StreamTransformer.fromHandlers({ |
+ void handleData(S data, EventSink<T> sink), |
+ void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
+ void handleDone(EventSink<T> sink)}) |
+ = _StreamHandlerTransformer; |
- void add(T data) { _sink._add(data); } |
- void addError(error, [StackTrace stackTrace]) { |
- _sink._addError(error, stackTrace); |
- } |
- void close() { _sink._close(); } |
+ Stream<T> bind(Stream<S> stream); |
} |
- |
/** |
* An [Iterable] like interface for the values of a [Stream]. |
* |