Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(532)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Upload Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after
184 184
185 /** 185 /**
186 * Adds a subscription to this stream. 186 * Adds a subscription to this stream.
187 * 187 *
188 * On each data event from this stream, the subscriber's [onData] handler 188 * On each data event from this stream, the subscriber's [onData] handler
189 * is called. If [onData] is null, nothing happens. 189 * is called. If [onData] is null, nothing happens.
190 * 190 *
191 * On errors from this stream, the [onError] handler is given a 191 * On errors from this stream, the [onError] handler is given a
192 * object describing the error. 192 * object describing the error.
193 * 193 *
194 * The [onError] callback must be of type `void onError(error)` or
195 * `void onError(error, StackTrace stackTrace)`. Depending on the function
196 * type the the stream either invokes [onError] with or without a stack
197 * trace. The stack trace argument might be `null` if the stream itself
198 * received an error without stack trace.
199 *
194 * If this stream closes, the [onDone] handler is called. 200 * If this stream closes, the [onDone] handler is called.
195 * 201 *
196 * If [cancelOnError] is true, the subscription is ended when 202 * If [cancelOnError] is true, the subscription is ended when
197 * the first error is reported. The default is false. 203 * the first error is reported. The default is false.
198 */ 204 */
199 StreamSubscription<T> listen(void onData(T event), 205 StreamSubscription<T> listen(void onData(T event),
200 { void onError(error), 206 { Function onError,
201 void onDone(), 207 void onDone(),
202 bool cancelOnError}); 208 bool cancelOnError});
203 209
204 /** 210 /**
205 * Creates a new stream from this stream that discards some data events. 211 * Creates a new stream from this stream that discards some data events.
206 * 212 *
207 * The new stream sends the same error and done events as this stream, 213 * The new stream sends the same error and done events as this stream,
208 * but it only sends the data events that satisfy the [test]. 214 * but it only sends the data events that satisfy the [test].
209 */ 215 */
210 Stream<T> where(bool test(T event)) { 216 Stream<T> where(bool test(T event)) {
211 return new _WhereStream<T>(this, test); 217 return new _WhereStream<T>(this, test);
212 } 218 }
213 219
214 /** 220 /**
215 * Creates a new stream that converts each element of this stream 221 * Creates a new stream that converts each element of this stream
216 * to a new value using the [convert] function. 222 * to a new value using the [convert] function.
217 */ 223 */
218 Stream map(convert(T event)) { 224 Stream map(convert(T event)) {
219 return new _MapStream<T, dynamic>(this, convert); 225 return new _MapStream<T, dynamic>(this, convert);
220 } 226 }
221 227
222 /** 228 /**
223 * Creates a wrapper Stream that intercepts some errors from this stream. 229 * Creates a wrapper Stream that intercepts some errors from this stream.
224 * 230 *
225 * If this stream sends an error that matches [test], then it is intercepted 231 * If this stream sends an error that matches [test], then it is intercepted
226 * by the [handle] function. 232 * by the [handle] function.
227 * 233 *
234 * The [onError] callback must be of type `void onError(error)` or
235 * `void onError(error, StackTrace stackTrace)`. Depending on the function
236 * type the the stream either invokes [onError] with or without a stack
237 * trace. The stack trace argument might be `null` if the stream itself
238 * received an error without stack trace.
Lasse Reichstein Nielsen 2013/10/04 08:45:17 "without a stack trace". What happens if onError
floitsch 2013/10/05 18:11:48 That's an error ;)
Lasse Reichstein Nielsen 2013/10/07 11:55:48 Ah, it's an error, but we don't say which error it
239 *
228 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns 240 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns
229 * true. If [test] is omitted, every error is considered matching. 241 * true. If [test] is omitted, every error is considered matching.
230 * 242 *
231 * If the error is intercepted, the [handle] function can decide what to do 243 * If the error is intercepted, the [handle] function can decide what to do
232 * with it. It can throw if it wants to raise a new (or the same) error, 244 * with it. It can throw if it wants to raise a new (or the same) error,
233 * or simply return to make the stream forget the error. 245 * or simply return to make the stream forget the error.
234 * 246 *
235 * If you need to transform an error into a data event, use the more generic 247 * If you need to transform an error into a data event, use the more generic
236 * [Stream.transform] to handle the event by writing a data event to 248 * [Stream.transform] to handle the event by writing a data event to
237 * the output sink 249 * the output sink
238 */ 250 */
239 Stream<T> handleError(void handle( error), { bool test(error) }) { 251 Stream<T> handleError(Function onError, { bool test(error) }) {
240 return new _HandleErrorStream<T>(this, handle, test); 252 return new _HandleErrorStream<T>(this, onError, test);
241 } 253 }
242 254
243 /** 255 /**
244 * Creates a new stream from this stream that converts each element 256 * Creates a new stream from this stream that converts each element
245 * into zero or more events. 257 * into zero or more events.
246 * 258 *
247 * Each incoming event is converted to an [Iterable] of new events, 259 * Each incoming event is converted to an [Iterable] of new events,
248 * and each of these new events are then sent by the returned stream 260 * and each of these new events are then sent by the returned stream
249 * in order. 261 * in order.
250 */ 262 */
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
306 var value = initialValue; 318 var value = initialValue;
307 StreamSubscription subscription; 319 StreamSubscription subscription;
308 subscription = this.listen( 320 subscription = this.listen(
309 (T element) { 321 (T element) {
310 _runUserCode( 322 _runUserCode(
311 () => combine(value, element), 323 () => combine(value, element),
312 (newValue) { value = newValue; }, 324 (newValue) { value = newValue; },
313 _cancelAndError(subscription, result) 325 _cancelAndError(subscription, result)
314 ); 326 );
315 }, 327 },
316 onError: (e) { 328 onError: (e, st) {
317 result._completeError(e); 329 result._completeError(e, st);
318 }, 330 },
319 onDone: () { 331 onDone: () {
320 result._complete(value); 332 result._complete(value);
321 }, 333 },
322 cancelOnError: true); 334 cancelOnError: true);
323 return result; 335 return result;
324 } 336 }
325 337
326 /** 338 /**
327 * Collects string of data events' string representations. 339 * Collects string of data events' string representations.
(...skipping 552 matching lines...) Expand 10 before | Expand all | Expand 10 after
880 * 892 *
881 * If an event is currently firing, this unsubscription will only 893 * If an event is currently firing, this unsubscription will only
882 * take effect after all subscribers have received the current event. 894 * take effect after all subscribers have received the current event.
883 */ 895 */
884 void cancel(); 896 void cancel();
885 897
886 /** Set or override the data event handler of this subscription. */ 898 /** Set or override the data event handler of this subscription. */
887 void onData(void handleData(T data)); 899 void onData(void handleData(T data));
888 900
889 /** Set or override the error event handler of this subscription. */ 901 /** Set or override the error event handler of this subscription. */
890 void onError(void handleError(error)); 902 void onError(Function handleError);
891 903
892 /** Set or override the done event handler of this subscription. */ 904 /** Set or override the done event handler of this subscription. */
893 void onDone(void handleDone()); 905 void onDone(void handleDone());
894 906
895 /** 907 /**
896 * Request that the stream pauses events until further notice. 908 * Request that the stream pauses events until further notice.
897 * 909 *
898 * If [resumeSignal] is provided, the stream will undo the pause 910 * If [resumeSignal] is provided, the stream will undo the pause
899 * when the future completes. If the future completes with an error, 911 * when the future completes. If the future completes with an error,
900 * it will not be handled! 912 * it will not be handled!
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
951 963
952 StreamView(this._stream); 964 StreamView(this._stream);
953 965
954 bool get isBroadcast => _stream.isBroadcast; 966 bool get isBroadcast => _stream.isBroadcast;
955 967
956 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription), 968 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription),
957 void onCancel(StreamSubscription subscription)}) 969 void onCancel(StreamSubscription subscription)})
958 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); 970 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
959 971
960 StreamSubscription<T> listen(void onData(T value), 972 StreamSubscription<T> listen(void onData(T value),
961 { void onError(error), 973 { Function onError,
962 void onDone(), 974 void onDone(),
963 bool cancelOnError }) { 975 bool cancelOnError }) {
964 return _stream.listen(onData, onError: onError, onDone: onDone, 976 return _stream.listen(onData, onError: onError, onDone: onDone,
965 cancelOnError: cancelOnError); 977 cancelOnError: cancelOnError);
966 } 978 }
967 } 979 }
968 980
969 981
970 /** 982 /**
971 * The target of a [Stream.pipe] call. 983 * The target of a [Stream.pipe] call.
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
1034 * 1046 *
1035 * stringStream.transform(new StreamTransformer<String, String>( 1047 * stringStream.transform(new StreamTransformer<String, String>(
1036 * handleData: (String value, EventSink<String> sink) { 1048 * handleData: (String value, EventSink<String> sink) {
1037 * sink.add(value); 1049 * sink.add(value);
1038 * sink.add(value); // Duplicate the incoming events. 1050 * sink.add(value); // Duplicate the incoming events.
1039 * })); 1051 * }));
1040 * 1052 *
1041 */ 1053 */
1042 factory StreamTransformer({ 1054 factory StreamTransformer({
1043 void handleData(S data, EventSink<T> sink), 1055 void handleData(S data, EventSink<T> sink),
1044 void handleError(error, EventSink<T> sink), 1056 Function handleError,
1045 void handleDone(EventSink<T> sink)}) { 1057 void handleDone(EventSink<T> sink)}) {
1046 return new _StreamTransformerImpl<S, T>(handleData, 1058 return new _StreamTransformerImpl<S, T>(handleData,
1047 handleError, 1059 handleError,
1048 handleDone); 1060 handleDone);
1049 } 1061 }
1050 1062
1051 Stream<T> bind(Stream<S> stream); 1063 Stream<T> bind(Stream<S> stream);
1052 } 1064 }
1053 1065
1054 1066
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
1127 * events on this stream. 1139 * events on this stream.
1128 */ 1140 */
1129 class EventTransformStream<S, T> extends Stream<T> { 1141 class EventTransformStream<S, T> extends Stream<T> {
1130 final Stream<S> _source; 1142 final Stream<S> _source;
1131 final StreamEventTransformer _transformer; 1143 final StreamEventTransformer _transformer;
1132 EventTransformStream(Stream<S> source, 1144 EventTransformStream(Stream<S> source,
1133 StreamEventTransformer<S, T> transformer) 1145 StreamEventTransformer<S, T> transformer)
1134 : _source = source, _transformer = transformer; 1146 : _source = source, _transformer = transformer;
1135 1147
1136 StreamSubscription<T> listen(void onData(T data), 1148 StreamSubscription<T> listen(void onData(T data),
1137 { void onError(error), 1149 { Function onError,
1138 void onDone(), 1150 void onDone(),
1139 bool cancelOnError }) { 1151 bool cancelOnError }) {
1140 if (onData == null) onData = _nullDataHandler; 1152 if (onData == null) onData = _nullDataHandler;
1141 if (onError == null) onError = _nullErrorHandler; 1153 if (onError == null) onError = _nullErrorHandler;
1142 if (onDone == null) onDone = _nullDoneHandler; 1154 if (onDone == null) onDone = _nullDoneHandler;
1143 cancelOnError = identical(true, cancelOnError); 1155 cancelOnError = identical(true, cancelOnError);
1144 return new _EventTransformStreamSubscription(_source, _transformer, 1156 return new _EventTransformStreamSubscription(_source, _transformer,
1145 onData, onError, onDone, 1157 onData, onError, onDone,
1146 cancelOnError); 1158 cancelOnError);
1147 } 1159 }
1148 } 1160 }
1149 1161
1150 class _EventTransformStreamSubscription<S, T> 1162 class _EventTransformStreamSubscription<S, T>
1151 extends _BufferingStreamSubscription<T> { 1163 extends _BufferingStreamSubscription<T> {
1152 /** The transformer used to transform events. */ 1164 /** The transformer used to transform events. */
1153 final StreamEventTransformer<S, T> _transformer; 1165 final StreamEventTransformer<S, T> _transformer;
1154 1166
1155 /** Whether this stream has sent a done event. */ 1167 /** Whether this stream has sent a done event. */
1156 bool _isClosed = false; 1168 bool _isClosed = false;
1157 1169
1158 /** Source of incoming events. */ 1170 /** Source of incoming events. */
1159 StreamSubscription<S> _subscription; 1171 StreamSubscription<S> _subscription;
1160 1172
1161 /** Cached EventSink wrapper for this class. */ 1173 /** Cached EventSink wrapper for this class. */
1162 EventSink<T> _sink; 1174 EventSink<T> _sink;
1163 1175
1164 _EventTransformStreamSubscription(Stream<S> source, 1176 _EventTransformStreamSubscription(Stream<S> source,
1165 this._transformer, 1177 this._transformer,
1166 void onData(T data), 1178 void onData(T data),
1167 void onError(error), 1179 Function onError,
1168 void onDone(), 1180 void onDone(),
1169 bool cancelOnError) 1181 bool cancelOnError)
1170 : super(onData, onError, onDone, cancelOnError) { 1182 : super(onData, onError, onDone, cancelOnError) {
1171 _sink = new _EventSinkAdapter<T>(this); 1183 _sink = new _EventSinkAdapter<T>(this);
1172 _subscription = source.listen(_handleData, 1184 _subscription = source.listen(_handleData,
1173 onError: _handleError, 1185 onError: _handleError,
1174 onDone: _handleDone); 1186 onDone: _handleDone);
1175 } 1187 }
1176 1188
1177 /** Whether this subscription is still subscribed to its source. */ 1189 /** Whether this subscription is still subscribed to its source. */
(...skipping 13 matching lines...) Expand all
1191 _subscription = null; 1203 _subscription = null;
1192 subscription.cancel(); 1204 subscription.cancel();
1193 } 1205 }
1194 _isClosed = true; 1206 _isClosed = true;
1195 } 1207 }
1196 1208
1197 void _handleData(S data) { 1209 void _handleData(S data) {
1198 try { 1210 try {
1199 _transformer.handleData(data, _sink); 1211 _transformer.handleData(data, _sink);
1200 } catch (e, s) { 1212 } catch (e, s) {
1201 _addError(_asyncError(e, s)); 1213 _addError(_asyncError(e, s), s);
1202 } 1214 }
1203 } 1215 }
1204 1216
1205 void _handleError(error) { 1217 void _handleError(error, [stackTrace]) {
1206 try { 1218 try {
1207 _transformer.handleError(error, _sink); 1219 _transformer.handleError(error, _sink);
1208 } catch (e, s) { 1220 } catch (e, s) {
1209 _addError(_asyncError(e, s)); 1221 if (identical(e, error)) {
1222 _addError(error, stackTrace);
1223 } else {
1224 _addError(_asyncError(e, s), s);
1225 }
1210 } 1226 }
1211 } 1227 }
1212 1228
1213 void _handleDone() { 1229 void _handleDone() {
1214 try { 1230 try {
1215 _subscription = null; 1231 _subscription = null;
1216 _transformer.handleDone(_sink); 1232 _transformer.handleDone(_sink);
1217 } catch (e, s) { 1233 } catch (e, s) {
1218 _addError(_asyncError(e, s)); 1234 _addError(_asyncError(e, s), s);
1219 } 1235 }
1220 } 1236 }
1221 } 1237 }
1222 1238
1223 class _EventSinkAdapter<T> implements EventSink<T> { 1239 class _EventSinkAdapter<T> implements EventSink<T> {
1224 _EventSink _sink; 1240 _EventSink _sink;
1225 _EventSinkAdapter(this._sink); 1241 _EventSinkAdapter(this._sink);
1226 1242
1227 void add(T data) { _sink._add(data); } 1243 void add(T data) { _sink._add(data); }
1228 void addError(error) { _sink._addError(error); } 1244 void addError(error, [StackTrace stackTrace]) {
1245 _sink._addError(error, stackTrace);
1246 }
1229 void close() { _sink._close(); } 1247 void close() { _sink._close(); }
1230 } 1248 }
1231 1249
1232 1250
1233 /** 1251 /**
1234 * An [Iterable] like interface for the values of a [Stream]. 1252 * An [Iterable] like interface for the values of a [Stream].
1235 * 1253 *
1236 * This wraps a [Stream] and a subscription on the stream. It listens 1254 * This wraps a [Stream] and a subscription on the stream. It listens
1237 * on the stream, and completes the future returned by [moveNext] when the 1255 * on the stream, and completes the future returned by [moveNext] when the
1238 * next value becomes available. 1256 * next value becomes available.
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
1274 * 1292 *
1275 * If a [moveNext] call has been made, it will complete with `false` as value, 1293 * If a [moveNext] call has been made, it will complete with `false` as value,
1276 * as will all further calls to [moveNext]. 1294 * as will all further calls to [moveNext].
1277 * 1295 *
1278 * If you need to stop listening for values before the stream iterator is 1296 * If you need to stop listening for values before the stream iterator is
1279 * automatically closed, you must call [cancel] to ensure that the stream 1297 * automatically closed, you must call [cancel] to ensure that the stream
1280 * is properly closed. 1298 * is properly closed.
1281 */ 1299 */
1282 void cancel(); 1300 void cancel();
1283 } 1301 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698