Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(7)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Fix two more tests. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after
140 }); 140 });
141 }, 141 },
142 onCancel: () { 142 onCancel: () {
143 if (timer != null) timer.cancel(); 143 if (timer != null) timer.cancel();
144 timer = null; 144 timer = null;
145 }); 145 });
146 return controller.stream; 146 return controller.stream;
147 } 147 }
148 148
149 /** 149 /**
150 * Creates a stream where all events of an existing stream are piped through
151 * a sink-transformation.
152 *
153 * The given [mapSink] closure is invoked when the returned stream is
154 * listened to. All events from the [source] are added into the event sink
155 * that is returned from the invocation. The transformation puts all
156 * transformed events into the sink the [mapSink] closure received during
157 * its invocation. Conceptually the [mapSink] creates a transformation pipe
158 * with the input sink being the returned [EventSink] and the output sink
159 * being the sink it received.
160 *
161 * This constructor is frequently used to build transformers.
162 *
163 * Example use for a duplicating transformer:
164 *
165 * class DuplicationSink implements EventSink<String> {
166 * final EventSink<String> _outputSink;
167 * DuplicationSink(this._outputSink);
168 *
169 * void add(String data) {
170 * _outputSink.add(data);
171 * _outputSink.add(data);
172 * }
173 *
174 * void addError(e, [st]) => _outputSink(e, st);
175 * void close() => _outputSink.close();
176 * }
177 *
178 * class DuplicationTransformer implements StreamTransformer<String, Strin g> {
Lasse Reichstein Nielsen 2013/10/04 14:35:53 Long line.
floitsch 2013/10/05 18:47:40 I know, but since the actual code is less than 80
179 * // Some generic types ommitted for brevety.
180 * Stream bind(Stream stream) => new Stream<String>.eventTransform(
181 * stream,
182 * (EventSink sink) => new DuplicationSink(sink));
183 * }
184 *
185 * stringStream.transform(new DuplicationTransformer());
186 */
187 factory Stream.eventTransformed(Stream source,
188 EventSink mapSink(EventSink<T> sink)) {
189 return new _BoundSinkStream(source, mapSink);
190 }
191
192 /**
150 * Reports whether this stream is a broadcast stream. 193 * Reports whether this stream is a broadcast stream.
151 */ 194 */
152 bool get isBroadcast => false; 195 bool get isBroadcast => false;
153 196
154 /** 197 /**
155 * Returns a multi-subscription stream that produces the same events as this. 198 * Returns a multi-subscription stream that produces the same events as this.
156 * 199 *
157 * If this stream is already a broadcast stream, it is returned unmodified. 200 * If this stream is already a broadcast stream, it is returned unmodified.
158 * 201 *
159 * If this stream is single-subscription, return a new stream that allows 202 * If this stream is single-subscription, return a new stream that allows
(...skipping 784 matching lines...) Expand 10 before | Expand all | Expand 10 after
944 } 987 }
945 988
946 989
947 /** 990 /**
948 * An interface that abstracts creation or handling of [Stream] events. 991 * An interface that abstracts creation or handling of [Stream] events.
949 */ 992 */
950 abstract class EventSink<T> { 993 abstract class EventSink<T> {
951 /** Create a data event */ 994 /** Create a data event */
952 void add(T event); 995 void add(T event);
953 /** Create an async error. */ 996 /** Create an async error. */
954 void addError(errorEvent); 997 void addError(errorEvent, [StackTrace stackTrace]);
955 /** Request a stream to close. */ 998 /** Request a stream to close. */
956 void close(); 999 void close();
957 } 1000 }
958 1001
959 1002
960 /** [Stream] wrapper that only exposes the [Stream] interface. */ 1003 /** [Stream] wrapper that only exposes the [Stream] interface. */
961 class StreamView<T> extends Stream<T> { 1004 class StreamView<T> extends Stream<T> {
962 Stream<T> _stream; 1005 Stream<T> _stream;
963 1006
964 StreamView(this._stream); 1007 StreamView(this._stream);
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
1025 */ 1068 */
1026 Future get done; 1069 Future get done;
1027 } 1070 }
1028 1071
1029 1072
1030 /** 1073 /**
1031 * The target of a [Stream.transform] call. 1074 * The target of a [Stream.transform] call.
1032 * 1075 *
1033 * The [Stream.transform] call will pass itself to this object and then return 1076 * The [Stream.transform] call will pass itself to this object and then return
1034 * the resulting stream. 1077 * the resulting stream.
1078 *
1079 * It is good practice to create transformers that can be used multiple times.
1035 */ 1080 */
1036 abstract class StreamTransformer<S, T> { 1081 abstract class StreamTransformer<S, T> {
1082
1037 /** 1083 /**
1038 * Create a [StreamTransformer] that delegates events to the given functions. 1084 * Creates a [StreamTransformer].
1039 * 1085 *
1040 * This is actually a [StreamEventTransformer] where the event handling is 1086 * The returned instance takes responsibility of binding ([bind]) and only
1041 * performed by the function arguments. 1087 * invokes the given closure [transformer] when a user starts listening on
1042 * If an argument is omitted, it acts as the corresponding default method from 1088 * the bound stream. At that point the transformer should start listening
1043 * [StreamEventTransformer]. 1089 * to the bound stream (which is given as argument) and return a
1090 * [StreamSubscription]. The bound stream-transformer then sets the handlers
1091 * it received as part of the `listen` call.
1044 * 1092 *
1045 * Example use: 1093 * There are two common ways to create a StreamSubscription:
1094 *
1095 * 1. by creating a new class that implements [StreamSubscription].
1096 * Note that the subscription should run callbacks in the [Zone] the
1097 * stream was listened to.
1098 * 2. by allocating a [StreamController] and to return the result of
1099 * listening to its stream.
1100 *
1101 * Example use of a duplicating transformer:
1046 * 1102 *
1047 * stringStream.transform(new StreamTransformer<String, String>( 1103 * stringStream.transform(new StreamTransformer<String, String>(
1104 * (Stream<String> input, bool cancelOnError) {
1105 * StreamController<String> controller;
1106 * StreamSubscription<String> subscription;
1107 * controller = new StreamController<String>(
1108 * onListen: () {
1109 * subscription = input.listen((data) {
1110 * // Duplicate the data.
1111 * controller.add(data);
1112 * controller.add(data);
1113 * },
1114 * onError: controller.addError,
1115 * onDone: controller.close,
1116 * cancelOnError: cancelOnError);
1117 * },
1118 * onPause: subscription.pause,
1119 * onResume: subscription.resume,
1120 * onCancel: subscription.cancel,
1121 * sync: true); // One of the few places, where sync is correct.
1122 * return controller.stream.listen(null);
1123 * });
1124 */
1125 const factory StreamTransformer(
1126 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
1127 = _StreamSubscriptionTransformer;
1128
1129 /**
1130 * Creates a [StreamTransformer] that delegates events to the given functions.
1131 *
1132 * This constructor returns a transformer that can only be used once.
1133 *
1134 * Example use of a duplicating transformer:
1135 *
1136 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs(
1048 * handleData: (String value, EventSink<String> sink) { 1137 * handleData: (String value, EventSink<String> sink) {
1049 * sink.add(value); 1138 * sink.add(value);
1050 * sink.add(value); // Duplicate the incoming events. 1139 * sink.add(value); // Duplicate the incoming events.
1051 * })); 1140 * }));
1052 *
1053 */ 1141 */
1054 factory StreamTransformer({ 1142 factory StreamTransformer.fromHandlers({
1055 void handleData(S data, EventSink<T> sink), 1143 void handleData(S data, EventSink<T> sink),
1056 Function handleError, 1144 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
1057 void handleDone(EventSink<T> sink)}) { 1145 void handleDone(EventSink<T> sink)})
1058 return new _StreamTransformerImpl<S, T>(handleData, 1146 = _StreamHandlerTransformer;
1059 handleError,
1060 handleDone);
1061 }
1062 1147
1063 Stream<T> bind(Stream<S> stream); 1148 Stream<T> bind(Stream<S> stream);
1064 } 1149 }
1065 1150
1066
1067 /**
1068 * Base class for transformers that modifies stream events.
1069 *
1070 * A [StreamEventTransformer] transforms incoming Stream
1071 * events of one kind into outgoing events of (possibly) another kind.
1072 *
1073 * Subscribing on the stream returned by [bind] is the same as subscribing on
1074 * the source stream, except that events are passed through the [transformer]
1075 * before being emitted. The transformer may generate any number and
1076 * types of events for each incoming event. Pauses on the returned
1077 * subscription are forwarded to this stream.
1078 *
1079 * An example that duplicates all data events:
1080 *
1081 * class DoubleTransformer<T> extends StreamEventTransformer<T, T> {
1082 * void handleData(T data, EventSink<T> sink) {
1083 * sink.add(value);
1084 * sink.add(value);
1085 * }
1086 * }
1087 * someTypeStream.transform(new DoubleTransformer<Type>());
1088 *
1089 * The default implementations of the "handle" methods forward
1090 * the events unmodified. If using the default [handleData] the generic type [T]
1091 * needs to be assignable to [S].
1092 */
1093 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
1094 const StreamEventTransformer();
1095
1096 Stream<T> bind(Stream<S> source) {
1097 return new EventTransformStream<S, T>(source, this);
1098 }
1099
1100 /**
1101 * Act on incoming data event.
1102 *
1103 * The method may generate any number of events on the sink, but should
1104 * not throw.
1105 */
1106 void handleData(S event, EventSink<T> sink) {
1107 var data = event;
1108 sink.add(data);
1109 }
1110
1111 /**
1112 * Act on incoming error event.
1113 *
1114 * The method may generate any number of events on the sink, but should
1115 * not throw.
1116 */
1117 void handleError(error, EventSink<T> sink) {
1118 sink.addError(error);
1119 }
1120
1121 /**
1122 * Act on incoming done event.
1123 *
1124 * The method may generate any number of events on the sink, but should
1125 * not throw.
1126 */
1127 void handleDone(EventSink<T> sink){
1128 sink.close();
1129 }
1130 }
1131
1132
1133 /**
1134 * Stream that transforms another stream by intercepting and replacing events.
1135 *
1136 * This [Stream] is a transformation of a source stream. Listening on this
1137 * stream is the same as listening on the source stream, except that events
1138 * are intercepted and modified by a [StreamEventTransformer] before becoming
1139 * events on this stream.
1140 */
1141 class EventTransformStream<S, T> extends Stream<T> {
1142 final Stream<S> _source;
1143 final StreamEventTransformer _transformer;
1144 EventTransformStream(Stream<S> source,
1145 StreamEventTransformer<S, T> transformer)
1146 : _source = source, _transformer = transformer;
1147
1148 StreamSubscription<T> listen(void onData(T data),
1149 { Function onError,
1150 void onDone(),
1151 bool cancelOnError }) {
1152 if (onData == null) onData = _nullDataHandler;
1153 if (onError == null) onError = _nullErrorHandler;
1154 if (onDone == null) onDone = _nullDoneHandler;
1155 cancelOnError = identical(true, cancelOnError);
1156 return new _EventTransformStreamSubscription(_source, _transformer,
1157 onData, onError, onDone,
1158 cancelOnError);
1159 }
1160 }
1161
1162 class _EventTransformStreamSubscription<S, T>
1163 extends _BufferingStreamSubscription<T> {
1164 /** The transformer used to transform events. */
1165 final StreamEventTransformer<S, T> _transformer;
1166
1167 /** Whether this stream has sent a done event. */
1168 bool _isClosed = false;
1169
1170 /** Source of incoming events. */
1171 StreamSubscription<S> _subscription;
1172
1173 /** Cached EventSink wrapper for this class. */
1174 EventSink<T> _sink;
1175
1176 _EventTransformStreamSubscription(Stream<S> source,
1177 this._transformer,
1178 void onData(T data),
1179 Function onError,
1180 void onDone(),
1181 bool cancelOnError)
1182 : super(onData, onError, onDone, cancelOnError) {
1183 _sink = new _EventSinkAdapter<T>(this);
1184 _subscription = source.listen(_handleData,
1185 onError: _handleError,
1186 onDone: _handleDone);
1187 }
1188
1189 /** Whether this subscription is still subscribed to its source. */
1190 bool get _isSubscribed => _subscription != null;
1191
1192 void _onPause() {
1193 if (_isSubscribed) _subscription.pause();
1194 }
1195
1196 void _onResume() {
1197 if (_isSubscribed) _subscription.resume();
1198 }
1199
1200 void _onCancel() {
1201 if (_isSubscribed) {
1202 StreamSubscription subscription = _subscription;
1203 _subscription = null;
1204 subscription.cancel();
1205 }
1206 _isClosed = true;
1207 }
1208
1209 void _handleData(S data) {
1210 try {
1211 _transformer.handleData(data, _sink);
1212 } catch (e, s) {
1213 _addError(_asyncError(e, s), s);
1214 }
1215 }
1216
1217 void _handleError(error, [stackTrace]) {
1218 try {
1219 _transformer.handleError(error, _sink);
1220 } catch (e, s) {
1221 if (identical(e, error)) {
1222 _addError(error, stackTrace);
1223 } else {
1224 _addError(_asyncError(e, s), s);
1225 }
1226 }
1227 }
1228
1229 void _handleDone() {
1230 try {
1231 _subscription = null;
1232 _transformer.handleDone(_sink);
1233 } catch (e, s) {
1234 _addError(_asyncError(e, s), s);
1235 }
1236 }
1237 }
1238
1239 class _EventSinkAdapter<T> implements EventSink<T> {
1240 _EventSink _sink;
1241 _EventSinkAdapter(this._sink);
1242
1243 void add(T data) { _sink._add(data); }
1244 void addError(error, [StackTrace stackTrace]) {
1245 _sink._addError(error, stackTrace);
1246 }
1247 void close() { _sink._close(); }
1248 }
1249
1250
1251 /** 1151 /**
1252 * An [Iterable] like interface for the values of a [Stream]. 1152 * An [Iterable] like interface for the values of a [Stream].
1253 * 1153 *
1254 * This wraps a [Stream] and a subscription on the stream. It listens 1154 * This wraps a [Stream] and a subscription on the stream. It listens
1255 * on the stream, and completes the future returned by [moveNext] when the 1155 * on the stream, and completes the future returned by [moveNext] when the
1256 * next value becomes available. 1156 * next value becomes available.
1257 * 1157 *
1258 * NOTICE: This is a tentative design. This class may change. 1158 * NOTICE: This is a tentative design. This class may change.
1259 */ 1159 */
1260 abstract class StreamIterator<T> { 1160 abstract class StreamIterator<T> {
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
1292 * 1192 *
1293 * If a [moveNext] call has been made, it will complete with `false` as value, 1193 * If a [moveNext] call has been made, it will complete with `false` as value,
1294 * as will all further calls to [moveNext]. 1194 * as will all further calls to [moveNext].
1295 * 1195 *
1296 * If you need to stop listening for values before the stream iterator is 1196 * If you need to stop listening for values before the stream iterator is
1297 * automatically closed, you must call [cancel] to ensure that the stream 1197 * automatically closed, you must call [cancel] to ensure that the stream
1298 * is properly closed. 1198 * is properly closed.
1299 */ 1199 */
1300 void cancel(); 1200 void cancel();
1301 } 1201 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698