Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(147)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/async_sources.gypi ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after
140 }); 140 });
141 }, 141 },
142 onCancel: () { 142 onCancel: () {
143 if (timer != null) timer.cancel(); 143 if (timer != null) timer.cancel();
144 timer = null; 144 timer = null;
145 }); 145 });
146 return controller.stream; 146 return controller.stream;
147 } 147 }
148 148
149 /** 149 /**
150 * Creates a stream where all events of an existing stream are piped through
151 * a sink-transformation.
152 *
153 * The given [mapSink] closure is invoked when the returned stream is
154 * listened to. All events from the [source] are added into the event sink
155 * that is returned from the invocation. The transformation puts all
156 * transformed events into the sink the [mapSink] closure received during
157 * its invocation. Conceptually the [mapSink] creates a transformation pipe
158 * with the input sink being the returned [EventSink] and the output sink
159 * being the sink it received.
160 *
161 * This constructor is frequently used to build transformers.
162 *
163 * Example use for a duplicating transformer:
164 *
165 * class DuplicationSink implements EventSink<String> {
166 * final EventSink<String> _outputSink;
167 * DuplicationSink(this._outputSink);
168 *
169 * void add(String data) {
170 * _outputSink.add(data);
171 * _outputSink.add(data);
172 * }
173 *
174 * void addError(e, [st]) => _outputSink(e, st);
175 * void close() => _outputSink.close();
176 * }
177 *
178 * class DuplicationTransformer implements StreamTransformer<String, Strin g> {
179 * // Some generic types ommitted for brevety.
180 * Stream bind(Stream stream) => new Stream<String>.eventTransform(
181 * stream,
182 * (EventSink sink) => new DuplicationSink(sink));
183 * }
184 *
185 * stringStream.transform(new DuplicationTransformer());
186 */
187 factory Stream.eventTransformed(Stream source,
188 EventSink mapSink(EventSink<T> sink)) {
189 return new _BoundSinkStream(source, mapSink);
190 }
191
192 /**
150 * Reports whether this stream is a broadcast stream. 193 * Reports whether this stream is a broadcast stream.
151 */ 194 */
152 bool get isBroadcast => false; 195 bool get isBroadcast => false;
153 196
154 /** 197 /**
155 * Returns a multi-subscription stream that produces the same events as this. 198 * Returns a multi-subscription stream that produces the same events as this.
156 * 199 *
157 * If this stream is already a broadcast stream, it is returned unmodified. 200 * If this stream is already a broadcast stream, it is returned unmodified.
158 * 201 *
159 * If this stream is single-subscription, return a new stream that allows 202 * If this stream is single-subscription, return a new stream that allows
(...skipping 789 matching lines...) Expand 10 before | Expand all | Expand 10 after
949 } 992 }
950 993
951 994
952 /** 995 /**
953 * An interface that abstracts creation or handling of [Stream] events. 996 * An interface that abstracts creation or handling of [Stream] events.
954 */ 997 */
955 abstract class EventSink<T> { 998 abstract class EventSink<T> {
956 /** Create a data event */ 999 /** Create a data event */
957 void add(T event); 1000 void add(T event);
958 /** Create an async error. */ 1001 /** Create an async error. */
959 void addError(errorEvent); 1002 void addError(errorEvent, [StackTrace stackTrace]);
960 /** Request a stream to close. */ 1003 /** Request a stream to close. */
961 void close(); 1004 void close();
962 } 1005 }
963 1006
964 1007
965 /** [Stream] wrapper that only exposes the [Stream] interface. */ 1008 /** [Stream] wrapper that only exposes the [Stream] interface. */
966 class StreamView<T> extends Stream<T> { 1009 class StreamView<T> extends Stream<T> {
967 Stream<T> _stream; 1010 Stream<T> _stream;
968 1011
969 StreamView(this._stream); 1012 StreamView(this._stream);
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
1030 */ 1073 */
1031 Future get done; 1074 Future get done;
1032 } 1075 }
1033 1076
1034 1077
1035 /** 1078 /**
1036 * The target of a [Stream.transform] call. 1079 * The target of a [Stream.transform] call.
1037 * 1080 *
1038 * The [Stream.transform] call will pass itself to this object and then return 1081 * The [Stream.transform] call will pass itself to this object and then return
1039 * the resulting stream. 1082 * the resulting stream.
1083 *
1084 * It is good practice to write transformers that can be used multiple times.
1040 */ 1085 */
1041 abstract class StreamTransformer<S, T> { 1086 abstract class StreamTransformer<S, T> {
1087
1042 /** 1088 /**
1043 * Create a [StreamTransformer] that delegates events to the given functions. 1089 * Creates a [StreamTransformer].
1044 * 1090 *
1045 * This is actually a [StreamEventTransformer] where the event handling is 1091 * The returned instance takes responsibility of implementing ([bind]).
1046 * performed by the function arguments. 1092 * When the user invokes `bind` it returns a new "bound" stream. Only when
1047 * If an argument is omitted, it acts as the corresponding default method from 1093 * the user starts listening to the bound stream, the `listen` method
1048 * [StreamEventTransformer]. 1094 * invokes the given closure [transformer].
1049 * 1095 *
1050 * Example use: 1096 * The [transformer] closure receives the stream, that was bound, as argument
1097 * and returns a [StreamSubscription]. In almost all cases the closure
1098 * listens itself to the stream that is given as argument.
1099 *
1100 * The result of invoking the [transformer] closure is a [StreamSubscription].
1101 * The bound stream-transformer (created by the `bind` method above) then sets
1102 * the handlers it received as part of the `listen` call.
1103 *
1104 * Conceptually this can be summarized as follows:
1105 *
1106 * 1. `var transformer = new StreamTransformer(transformerClosure);`
1107 * creates a `StreamTransformer` that supports the `bind` method.
1108 * 2. `var boundStream = stream.transform(transformer);` binds the `stream`
1109 * and returns a bound stream that has a pointer to `stream`.
1110 * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)`
1111 * starts the listening and transformation. This is accomplished
1112 * in 2 steps: first the `boundStream` invokes the `transformerClosure` with
1113 * the `stream` it captured: `transformerClosure(stream, b)`.
1114 * The result `subscription`, a [StreamSubscription], is then
1115 * updated to receive its handlers: `subscription.onData(f1)`,
1116 * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription
1117 * is returned as result of the `listen` call.
1118 *
1119 * There are two common ways to create a StreamSubscription:
1120 *
1121 * 1. by creating a new class that implements [StreamSubscription].
1122 * Note that the subscription should run callbacks in the [Zone] the
1123 * stream was listened to.
1124 * 2. by allocating a [StreamController] and to return the result of
1125 * listening to its stream.
1126 *
1127 * Example use of a duplicating transformer:
1051 * 1128 *
1052 * stringStream.transform(new StreamTransformer<String, String>( 1129 * stringStream.transform(new StreamTransformer<String, String>(
1130 * (Stream<String> input, bool cancelOnError) {
1131 * StreamController<String> controller;
1132 * StreamSubscription<String> subscription;
1133 * controller = new StreamController<String>(
1134 * onListen: () {
1135 * subscription = input.listen((data) {
1136 * // Duplicate the data.
1137 * controller.add(data);
1138 * controller.add(data);
1139 * },
1140 * onError: controller.addError,
1141 * onDone: controller.close,
1142 * cancelOnError: cancelOnError);
1143 * },
1144 * onPause: subscription.pause,
1145 * onResume: subscription.resume,
1146 * onCancel: subscription.cancel,
1147 * sync: true);
1148 * return controller.stream.listen(null);
1149 * });
1150 */
1151 const factory StreamTransformer(
1152 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
1153 = _StreamSubscriptionTransformer;
1154
1155 /**
1156 * Creates a [StreamTransformer] that delegates events to the given functions.
1157 *
1158 * Example use of a duplicating transformer:
1159 *
1160 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs(
1053 * handleData: (String value, EventSink<String> sink) { 1161 * handleData: (String value, EventSink<String> sink) {
1054 * sink.add(value); 1162 * sink.add(value);
1055 * sink.add(value); // Duplicate the incoming events. 1163 * sink.add(value); // Duplicate the incoming events.
1056 * })); 1164 * }));
1057 *
1058 */ 1165 */
1059 factory StreamTransformer({ 1166 factory StreamTransformer.fromHandlers({
1060 void handleData(S data, EventSink<T> sink), 1167 void handleData(S data, EventSink<T> sink),
1061 Function handleError, 1168 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
1062 void handleDone(EventSink<T> sink)}) { 1169 void handleDone(EventSink<T> sink)})
1063 return new _StreamTransformerImpl<S, T>(handleData, 1170 = _StreamHandlerTransformer;
1064 handleError,
1065 handleDone);
1066 }
1067 1171
1068 Stream<T> bind(Stream<S> stream); 1172 Stream<T> bind(Stream<S> stream);
1069 } 1173 }
1070 1174
1071
1072 /**
1073 * Base class for transformers that modifies stream events.
1074 *
1075 * A [StreamEventTransformer] transforms incoming Stream
1076 * events of one kind into outgoing events of (possibly) another kind.
1077 *
1078 * Subscribing on the stream returned by [bind] is the same as subscribing on
1079 * the source stream, except that events are passed through the [transformer]
1080 * before being emitted. The transformer may generate any number and
1081 * types of events for each incoming event. Pauses on the returned
1082 * subscription are forwarded to this stream.
1083 *
1084 * An example that duplicates all data events:
1085 *
1086 * class DoubleTransformer<T> extends StreamEventTransformer<T, T> {
1087 * void handleData(T data, EventSink<T> sink) {
1088 * sink.add(value);
1089 * sink.add(value);
1090 * }
1091 * }
1092 * someTypeStream.transform(new DoubleTransformer<Type>());
1093 *
1094 * The default implementations of the "handle" methods forward
1095 * the events unmodified. If using the default [handleData] the generic type [T]
1096 * needs to be assignable to [S].
1097 */
1098 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
1099 const StreamEventTransformer();
1100
1101 Stream<T> bind(Stream<S> source) {
1102 return new EventTransformStream<S, T>(source, this);
1103 }
1104
1105 /**
1106 * Act on incoming data event.
1107 *
1108 * The method may generate any number of events on the sink, but should
1109 * not throw.
1110 */
1111 void handleData(S event, EventSink<T> sink) {
1112 var data = event;
1113 sink.add(data);
1114 }
1115
1116 /**
1117 * Act on incoming error event.
1118 *
1119 * The method may generate any number of events on the sink, but should
1120 * not throw.
1121 */
1122 void handleError(error, EventSink<T> sink) {
1123 sink.addError(error);
1124 }
1125
1126 /**
1127 * Act on incoming done event.
1128 *
1129 * The method may generate any number of events on the sink, but should
1130 * not throw.
1131 */
1132 void handleDone(EventSink<T> sink){
1133 sink.close();
1134 }
1135 }
1136
1137
1138 /**
1139 * Stream that transforms another stream by intercepting and replacing events.
1140 *
1141 * This [Stream] is a transformation of a source stream. Listening on this
1142 * stream is the same as listening on the source stream, except that events
1143 * are intercepted and modified by a [StreamEventTransformer] before becoming
1144 * events on this stream.
1145 */
1146 class EventTransformStream<S, T> extends Stream<T> {
1147 final Stream<S> _source;
1148 final StreamEventTransformer _transformer;
1149 EventTransformStream(Stream<S> source,
1150 StreamEventTransformer<S, T> transformer)
1151 : _source = source, _transformer = transformer;
1152
1153 StreamSubscription<T> listen(void onData(T data),
1154 { Function onError,
1155 void onDone(),
1156 bool cancelOnError }) {
1157 if (onData == null) onData = _nullDataHandler;
1158 if (onError == null) onError = _nullErrorHandler;
1159 if (onDone == null) onDone = _nullDoneHandler;
1160 cancelOnError = identical(true, cancelOnError);
1161 return new _EventTransformStreamSubscription(_source, _transformer,
1162 onData, onError, onDone,
1163 cancelOnError);
1164 }
1165 }
1166
1167 class _EventTransformStreamSubscription<S, T>
1168 extends _BufferingStreamSubscription<T> {
1169 /** The transformer used to transform events. */
1170 final StreamEventTransformer<S, T> _transformer;
1171
1172 /** Whether this stream has sent a done event. */
1173 bool _isClosed = false;
1174
1175 /** Source of incoming events. */
1176 StreamSubscription<S> _subscription;
1177
1178 /** Cached EventSink wrapper for this class. */
1179 EventSink<T> _sink;
1180
1181 _EventTransformStreamSubscription(Stream<S> source,
1182 this._transformer,
1183 void onData(T data),
1184 Function onError,
1185 void onDone(),
1186 bool cancelOnError)
1187 : super(onData, onError, onDone, cancelOnError) {
1188 _sink = new _EventSinkAdapter<T>(this);
1189 _subscription = source.listen(_handleData,
1190 onError: _handleError,
1191 onDone: _handleDone);
1192 }
1193
1194 /** Whether this subscription is still subscribed to its source. */
1195 bool get _isSubscribed => _subscription != null;
1196
1197 void _onPause() {
1198 if (_isSubscribed) _subscription.pause();
1199 }
1200
1201 void _onResume() {
1202 if (_isSubscribed) _subscription.resume();
1203 }
1204
1205 void _onCancel() {
1206 if (_isSubscribed) {
1207 StreamSubscription subscription = _subscription;
1208 _subscription = null;
1209 subscription.cancel();
1210 }
1211 _isClosed = true;
1212 }
1213
1214 void _handleData(S data) {
1215 try {
1216 _transformer.handleData(data, _sink);
1217 } catch (e, s) {
1218 _addError(_asyncError(e, s), s);
1219 }
1220 }
1221
1222 void _handleError(error, [stackTrace]) {
1223 try {
1224 _transformer.handleError(error, _sink);
1225 } catch (e, s) {
1226 if (identical(e, error)) {
1227 _addError(error, stackTrace);
1228 } else {
1229 _addError(_asyncError(e, s), s);
1230 }
1231 }
1232 }
1233
1234 void _handleDone() {
1235 try {
1236 _subscription = null;
1237 _transformer.handleDone(_sink);
1238 } catch (e, s) {
1239 _addError(_asyncError(e, s), s);
1240 }
1241 }
1242 }
1243
1244 class _EventSinkAdapter<T> implements EventSink<T> {
1245 _EventSink _sink;
1246 _EventSinkAdapter(this._sink);
1247
1248 void add(T data) { _sink._add(data); }
1249 void addError(error, [StackTrace stackTrace]) {
1250 _sink._addError(error, stackTrace);
1251 }
1252 void close() { _sink._close(); }
1253 }
1254
1255
1256 /** 1175 /**
1257 * An [Iterable] like interface for the values of a [Stream]. 1176 * An [Iterable] like interface for the values of a [Stream].
1258 * 1177 *
1259 * This wraps a [Stream] and a subscription on the stream. It listens 1178 * This wraps a [Stream] and a subscription on the stream. It listens
1260 * on the stream, and completes the future returned by [moveNext] when the 1179 * on the stream, and completes the future returned by [moveNext] when the
1261 * next value becomes available. 1180 * next value becomes available.
1262 * 1181 *
1263 * NOTICE: This is a tentative design. This class may change. 1182 * NOTICE: This is a tentative design. This class may change.
1264 */ 1183 */
1265 abstract class StreamIterator<T> { 1184 abstract class StreamIterator<T> {
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
1297 * 1216 *
1298 * If a [moveNext] call has been made, it will complete with `false` as value, 1217 * If a [moveNext] call has been made, it will complete with `false` as value,
1299 * as will all further calls to [moveNext]. 1218 * as will all further calls to [moveNext].
1300 * 1219 *
1301 * If you need to stop listening for values before the stream iterator is 1220 * If you need to stop listening for values before the stream iterator is
1302 * automatically closed, you must call [cancel] to ensure that the stream 1221 * automatically closed, you must call [cancel] to ensure that the stream
1303 * is properly closed. 1222 * is properly closed.
1304 */ 1223 */
1305 void cancel(); 1224 void cancel();
1306 } 1225 }
OLDNEW
« no previous file with comments | « sdk/lib/async/async_sources.gypi ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698