Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2501)

Unified Diff: sdk/lib/async/stream.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/async_sources.gypi ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index 659360ffd2c094de2fc9bee69614512955e72abb..ab31d16a5dbf6837bf9d648fa6500a7a0ef1bfde 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -147,6 +147,49 @@ abstract class Stream<T> {
}
/**
+ * Creates a stream where all events of an existing stream are piped through
+ * a sink-transformation.
+ *
+ * The given [mapSink] closure is invoked when the returned stream is
+ * listened to. All events from the [source] are added into the event sink
+ * that is returned from the invocation. The transformation puts all
+ * transformed events into the sink the [mapSink] closure received during
+ * its invocation. Conceptually the [mapSink] creates a transformation pipe
+ * with the input sink being the returned [EventSink] and the output sink
+ * being the sink it received.
+ *
+ * This constructor is frequently used to build transformers.
+ *
+ * Example use for a duplicating transformer:
+ *
+ * class DuplicationSink implements EventSink<String> {
+ * final EventSink<String> _outputSink;
+ * DuplicationSink(this._outputSink);
+ *
+ * void add(String data) {
+ * _outputSink.add(data);
+ * _outputSink.add(data);
+ * }
+ *
+ * void addError(e, [st]) => _outputSink(e, st);
+ * void close() => _outputSink.close();
+ * }
+ *
+ * class DuplicationTransformer implements StreamTransformer<String, String> {
+ * // Some generic types ommitted for brevety.
+ * Stream bind(Stream stream) => new Stream<String>.eventTransform(
+ * stream,
+ * (EventSink sink) => new DuplicationSink(sink));
+ * }
+ *
+ * stringStream.transform(new DuplicationTransformer());
+ */
+ factory Stream.eventTransformed(Stream source,
+ EventSink mapSink(EventSink<T> sink)) {
+ return new _BoundSinkStream(source, mapSink);
+ }
+
+ /**
* Reports whether this stream is a broadcast stream.
*/
bool get isBroadcast => false;
@@ -956,7 +999,7 @@ abstract class EventSink<T> {
/** Create a data event */
void add(T event);
/** Create an async error. */
- void addError(errorEvent);
+ void addError(errorEvent, [StackTrace stackTrace]);
/** Request a stream to close. */
void close();
}
@@ -1037,222 +1080,98 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> {
*
* The [Stream.transform] call will pass itself to this object and then return
* the resulting stream.
+ *
+ * It is good practice to write transformers that can be used multiple times.
*/
abstract class StreamTransformer<S, T> {
+
/**
- * Create a [StreamTransformer] that delegates events to the given functions.
+ * Creates a [StreamTransformer].
+ *
+ * The returned instance takes responsibility of implementing ([bind]).
+ * When the user invokes `bind` it returns a new "bound" stream. Only when
+ * the user starts listening to the bound stream, the `listen` method
+ * invokes the given closure [transformer].
+ *
+ * The [transformer] closure receives the stream, that was bound, as argument
+ * and returns a [StreamSubscription]. In almost all cases the closure
+ * listens itself to the stream that is given as argument.
+ *
+ * The result of invoking the [transformer] closure is a [StreamSubscription].
+ * The bound stream-transformer (created by the `bind` method above) then sets
+ * the handlers it received as part of the `listen` call.
+ *
+ * Conceptually this can be summarized as follows:
+ *
+ * 1. `var transformer = new StreamTransformer(transformerClosure);`
+ * creates a `StreamTransformer` that supports the `bind` method.
+ * 2. `var boundStream = stream.transform(transformer);` binds the `stream`
+ * and returns a bound stream that has a pointer to `stream`.
+ * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)`
+ * starts the listening and transformation. This is accomplished
+ * in 2 steps: first the `boundStream` invokes the `transformerClosure` with
+ * the `stream` it captured: `transformerClosure(stream, b)`.
+ * The result `subscription`, a [StreamSubscription], is then
+ * updated to receive its handlers: `subscription.onData(f1)`,
+ * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription
+ * is returned as result of the `listen` call.
+ *
+ * There are two common ways to create a StreamSubscription:
+ *
+ * 1. by creating a new class that implements [StreamSubscription].
+ * Note that the subscription should run callbacks in the [Zone] the
+ * stream was listened to.
+ * 2. by allocating a [StreamController] and to return the result of
+ * listening to its stream.
+ *
+ * Example use of a duplicating transformer:
*
- * This is actually a [StreamEventTransformer] where the event handling is
- * performed by the function arguments.
- * If an argument is omitted, it acts as the corresponding default method from
- * [StreamEventTransformer].
+ * stringStream.transform(new StreamTransformer<String, String>(
+ * (Stream<String> input, bool cancelOnError) {
+ * StreamController<String> controller;
+ * StreamSubscription<String> subscription;
+ * controller = new StreamController<String>(
+ * onListen: () {
+ * subscription = input.listen((data) {
+ * // Duplicate the data.
+ * controller.add(data);
+ * controller.add(data);
+ * },
+ * onError: controller.addError,
+ * onDone: controller.close,
+ * cancelOnError: cancelOnError);
+ * },
+ * onPause: subscription.pause,
+ * onResume: subscription.resume,
+ * onCancel: subscription.cancel,
+ * sync: true);
+ * return controller.stream.listen(null);
+ * });
+ */
+ const factory StreamTransformer(
+ StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
+ = _StreamSubscriptionTransformer;
+
+ /**
+ * Creates a [StreamTransformer] that delegates events to the given functions.
*
- * Example use:
+ * Example use of a duplicating transformer:
*
- * stringStream.transform(new StreamTransformer<String, String>(
+ * stringStream.transform(new StreamTransformer<String, String>.fromHandlers(
* handleData: (String value, EventSink<String> sink) {
* sink.add(value);
* sink.add(value); // Duplicate the incoming events.
* }));
- *
*/
- factory StreamTransformer({
+ factory StreamTransformer.fromHandlers({
void handleData(S data, EventSink<T> sink),
- Function handleError,
- void handleDone(EventSink<T> sink)}) {
- return new _StreamTransformerImpl<S, T>(handleData,
- handleError,
- handleDone);
- }
+ void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
+ void handleDone(EventSink<T> sink)})
+ = _StreamHandlerTransformer;
Stream<T> bind(Stream<S> stream);
}
-
-/**
- * Base class for transformers that modifies stream events.
- *
- * A [StreamEventTransformer] transforms incoming Stream
- * events of one kind into outgoing events of (possibly) another kind.
- *
- * Subscribing on the stream returned by [bind] is the same as subscribing on
- * the source stream, except that events are passed through the [transformer]
- * before being emitted. The transformer may generate any number and
- * types of events for each incoming event. Pauses on the returned
- * subscription are forwarded to this stream.
- *
- * An example that duplicates all data events:
- *
- * class DoubleTransformer<T> extends StreamEventTransformer<T, T> {
- * void handleData(T data, EventSink<T> sink) {
- * sink.add(value);
- * sink.add(value);
- * }
- * }
- * someTypeStream.transform(new DoubleTransformer<Type>());
- *
- * The default implementations of the "handle" methods forward
- * the events unmodified. If using the default [handleData] the generic type [T]
- * needs to be assignable to [S].
- */
-abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
- const StreamEventTransformer();
-
- Stream<T> bind(Stream<S> source) {
- return new EventTransformStream<S, T>(source, this);
- }
-
- /**
- * Act on incoming data event.
- *
- * The method may generate any number of events on the sink, but should
- * not throw.
- */
- void handleData(S event, EventSink<T> sink) {
- var data = event;
- sink.add(data);
- }
-
- /**
- * Act on incoming error event.
- *
- * The method may generate any number of events on the sink, but should
- * not throw.
- */
- void handleError(error, EventSink<T> sink) {
- sink.addError(error);
- }
-
- /**
- * Act on incoming done event.
- *
- * The method may generate any number of events on the sink, but should
- * not throw.
- */
- void handleDone(EventSink<T> sink){
- sink.close();
- }
-}
-
-
-/**
- * Stream that transforms another stream by intercepting and replacing events.
- *
- * This [Stream] is a transformation of a source stream. Listening on this
- * stream is the same as listening on the source stream, except that events
- * are intercepted and modified by a [StreamEventTransformer] before becoming
- * events on this stream.
- */
-class EventTransformStream<S, T> extends Stream<T> {
- final Stream<S> _source;
- final StreamEventTransformer _transformer;
- EventTransformStream(Stream<S> source,
- StreamEventTransformer<S, T> transformer)
- : _source = source, _transformer = transformer;
-
- StreamSubscription<T> listen(void onData(T data),
- { Function onError,
- void onDone(),
- bool cancelOnError }) {
- if (onData == null) onData = _nullDataHandler;
- if (onError == null) onError = _nullErrorHandler;
- if (onDone == null) onDone = _nullDoneHandler;
- cancelOnError = identical(true, cancelOnError);
- return new _EventTransformStreamSubscription(_source, _transformer,
- onData, onError, onDone,
- cancelOnError);
- }
-}
-
-class _EventTransformStreamSubscription<S, T>
- extends _BufferingStreamSubscription<T> {
- /** The transformer used to transform events. */
- final StreamEventTransformer<S, T> _transformer;
-
- /** Whether this stream has sent a done event. */
- bool _isClosed = false;
-
- /** Source of incoming events. */
- StreamSubscription<S> _subscription;
-
- /** Cached EventSink wrapper for this class. */
- EventSink<T> _sink;
-
- _EventTransformStreamSubscription(Stream<S> source,
- this._transformer,
- void onData(T data),
- Function onError,
- void onDone(),
- bool cancelOnError)
- : super(onData, onError, onDone, cancelOnError) {
- _sink = new _EventSinkAdapter<T>(this);
- _subscription = source.listen(_handleData,
- onError: _handleError,
- onDone: _handleDone);
- }
-
- /** Whether this subscription is still subscribed to its source. */
- bool get _isSubscribed => _subscription != null;
-
- void _onPause() {
- if (_isSubscribed) _subscription.pause();
- }
-
- void _onResume() {
- if (_isSubscribed) _subscription.resume();
- }
-
- void _onCancel() {
- if (_isSubscribed) {
- StreamSubscription subscription = _subscription;
- _subscription = null;
- subscription.cancel();
- }
- _isClosed = true;
- }
-
- void _handleData(S data) {
- try {
- _transformer.handleData(data, _sink);
- } catch (e, s) {
- _addError(_asyncError(e, s), s);
- }
- }
-
- void _handleError(error, [stackTrace]) {
- try {
- _transformer.handleError(error, _sink);
- } catch (e, s) {
- if (identical(e, error)) {
- _addError(error, stackTrace);
- } else {
- _addError(_asyncError(e, s), s);
- }
- }
- }
-
- void _handleDone() {
- try {
- _subscription = null;
- _transformer.handleDone(_sink);
- } catch (e, s) {
- _addError(_asyncError(e, s), s);
- }
- }
-}
-
-class _EventSinkAdapter<T> implements EventSink<T> {
- _EventSink _sink;
- _EventSinkAdapter(this._sink);
-
- void add(T data) { _sink._add(data); }
- void addError(error, [StackTrace stackTrace]) {
- _sink._addError(error, stackTrace);
- }
- void close() { _sink._close(); }
-}
-
-
/**
* An [Iterable] like interface for the values of a [Stream].
*
« no previous file with comments | « sdk/lib/async/async_sources.gypi ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698