Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1008)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove types in closures. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/broadcast_stream_controller.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after
184 184
185 /** 185 /**
186 * Adds a subscription to this stream. 186 * Adds a subscription to this stream.
187 * 187 *
188 * On each data event from this stream, the subscriber's [onData] handler 188 * On each data event from this stream, the subscriber's [onData] handler
189 * is called. If [onData] is null, nothing happens. 189 * is called. If [onData] is null, nothing happens.
190 * 190 *
191 * On errors from this stream, the [onError] handler is given a 191 * On errors from this stream, the [onError] handler is given a
192 * object describing the error. 192 * object describing the error.
193 * 193 *
194 * The [onError] callback must be of type `void onError(error)` or
195 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts
196 * two arguments it is called with the stack trace (which could be `null` if
197 * the stream itself received an error without stack trace).
198 * Otherwise it is called with just the error object.
199 *
194 * If this stream closes, the [onDone] handler is called. 200 * If this stream closes, the [onDone] handler is called.
195 * 201 *
196 * If [cancelOnError] is true, the subscription is ended when 202 * If [cancelOnError] is true, the subscription is ended when
197 * the first error is reported. The default is false. 203 * the first error is reported. The default is false.
198 */ 204 */
199 StreamSubscription<T> listen(void onData(T event), 205 StreamSubscription<T> listen(void onData(T event),
200 { void onError(error), 206 { Function onError,
201 void onDone(), 207 void onDone(),
202 bool cancelOnError}); 208 bool cancelOnError});
203 209
204 /** 210 /**
205 * Creates a new stream from this stream that discards some data events. 211 * Creates a new stream from this stream that discards some data events.
206 * 212 *
207 * The new stream sends the same error and done events as this stream, 213 * The new stream sends the same error and done events as this stream,
208 * but it only sends the data events that satisfy the [test]. 214 * but it only sends the data events that satisfy the [test].
209 */ 215 */
210 Stream<T> where(bool test(T event)) { 216 Stream<T> where(bool test(T event)) {
211 return new _WhereStream<T>(this, test); 217 return new _WhereStream<T>(this, test);
212 } 218 }
213 219
214 /** 220 /**
215 * Creates a new stream that converts each element of this stream 221 * Creates a new stream that converts each element of this stream
216 * to a new value using the [convert] function. 222 * to a new value using the [convert] function.
217 */ 223 */
218 Stream map(convert(T event)) { 224 Stream map(convert(T event)) {
219 return new _MapStream<T, dynamic>(this, convert); 225 return new _MapStream<T, dynamic>(this, convert);
220 } 226 }
221 227
222 /** 228 /**
223 * Creates a wrapper Stream that intercepts some errors from this stream. 229 * Creates a wrapper Stream that intercepts some errors from this stream.
224 * 230 *
225 * If this stream sends an error that matches [test], then it is intercepted 231 * If this stream sends an error that matches [test], then it is intercepted
226 * by the [handle] function. 232 * by the [handle] function.
227 * 233 *
234 * The [onError] callback must be of type `void onError(error)` or
235 * `void onError(error, StackTrace stackTrace)`. Depending on the function
236 * type the the stream either invokes [onError] with or without a stack
237 * trace. The stack trace argument might be `null` if the stream itself
238 * received an error without stack trace.
239 *
228 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns 240 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns
229 * true. If [test] is omitted, every error is considered matching. 241 * true. If [test] is omitted, every error is considered matching.
230 * 242 *
231 * If the error is intercepted, the [handle] function can decide what to do 243 * If the error is intercepted, the [handle] function can decide what to do
232 * with it. It can throw if it wants to raise a new (or the same) error, 244 * with it. It can throw if it wants to raise a new (or the same) error,
233 * or simply return to make the stream forget the error. 245 * or simply return to make the stream forget the error.
234 * 246 *
235 * If you need to transform an error into a data event, use the more generic 247 * If you need to transform an error into a data event, use the more generic
236 * [Stream.transform] to handle the event by writing a data event to 248 * [Stream.transform] to handle the event by writing a data event to
237 * the output sink 249 * the output sink
238 */ 250 */
239 Stream<T> handleError(void handle( error), { bool test(error) }) { 251 Stream<T> handleError(Function onError, { bool test(error) }) {
240 return new _HandleErrorStream<T>(this, handle, test); 252 return new _HandleErrorStream<T>(this, onError, test);
241 } 253 }
242 254
243 /** 255 /**
244 * Creates a new stream from this stream that converts each element 256 * Creates a new stream from this stream that converts each element
245 * into zero or more events. 257 * into zero or more events.
246 * 258 *
247 * Each incoming event is converted to an [Iterable] of new events, 259 * Each incoming event is converted to an [Iterable] of new events,
248 * and each of these new events are then sent by the returned stream 260 * and each of these new events are then sent by the returned stream
249 * in order. 261 * in order.
250 */ 262 */
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
306 var value = initialValue; 318 var value = initialValue;
307 StreamSubscription subscription; 319 StreamSubscription subscription;
308 subscription = this.listen( 320 subscription = this.listen(
309 (T element) { 321 (T element) {
310 _runUserCode( 322 _runUserCode(
311 () => combine(value, element), 323 () => combine(value, element),
312 (newValue) { value = newValue; }, 324 (newValue) { value = newValue; },
313 _cancelAndError(subscription, result) 325 _cancelAndError(subscription, result)
314 ); 326 );
315 }, 327 },
316 onError: (e) { 328 onError: (e, st) {
317 result._completeError(e); 329 result._completeError(e, st);
318 }, 330 },
319 onDone: () { 331 onDone: () {
320 result._complete(value); 332 result._complete(value);
321 }, 333 },
322 cancelOnError: true); 334 cancelOnError: true);
323 return result; 335 return result;
324 } 336 }
325 337
326 /** 338 /**
327 * Collects string of data events' string representations. 339 * Collects string of data events' string representations.
(...skipping 551 matching lines...) Expand 10 before | Expand all | Expand 10 after
879 * Cancels this subscription. It will no longer receive events. 891 * Cancels this subscription. It will no longer receive events.
880 * 892 *
881 * If an event is currently firing, this unsubscription will only 893 * If an event is currently firing, this unsubscription will only
882 * take effect after all subscribers have received the current event. 894 * take effect after all subscribers have received the current event.
883 */ 895 */
884 void cancel(); 896 void cancel();
885 897
886 /** Set or override the data event handler of this subscription. */ 898 /** Set or override the data event handler of this subscription. */
887 void onData(void handleData(T data)); 899 void onData(void handleData(T data));
888 900
889 /** Set or override the error event handler of this subscription. */ 901 /**
890 void onError(void handleError(error)); 902 * Set or override the error event handler of this subscription.
903 *
904 * This method overrides the handler that has been set at the invocation of
905 * [Stream.listen].
906 */
907 void onError(Function handleError);
891 908
892 /** Set or override the done event handler of this subscription. */ 909 /** Set or override the done event handler of this subscription. */
893 void onDone(void handleDone()); 910 void onDone(void handleDone());
894 911
895 /** 912 /**
896 * Request that the stream pauses events until further notice. 913 * Request that the stream pauses events until further notice.
897 * 914 *
898 * If [resumeSignal] is provided, the stream will undo the pause 915 * If [resumeSignal] is provided, the stream will undo the pause
899 * when the future completes. If the future completes with an error, 916 * when the future completes. If the future completes with an error,
900 * it will not be handled! 917 * it will not be handled!
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
951 968
952 StreamView(this._stream); 969 StreamView(this._stream);
953 970
954 bool get isBroadcast => _stream.isBroadcast; 971 bool get isBroadcast => _stream.isBroadcast;
955 972
956 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription), 973 Stream<T> asBroadcastStream({void onListen(StreamSubscription subscription),
957 void onCancel(StreamSubscription subscription)}) 974 void onCancel(StreamSubscription subscription)})
958 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); 975 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
959 976
960 StreamSubscription<T> listen(void onData(T value), 977 StreamSubscription<T> listen(void onData(T value),
961 { void onError(error), 978 { Function onError,
962 void onDone(), 979 void onDone(),
963 bool cancelOnError }) { 980 bool cancelOnError }) {
964 return _stream.listen(onData, onError: onError, onDone: onDone, 981 return _stream.listen(onData, onError: onError, onDone: onDone,
965 cancelOnError: cancelOnError); 982 cancelOnError: cancelOnError);
966 } 983 }
967 } 984 }
968 985
969 986
970 /** 987 /**
971 * The target of a [Stream.pipe] call. 988 * The target of a [Stream.pipe] call.
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
1034 * 1051 *
1035 * stringStream.transform(new StreamTransformer<String, String>( 1052 * stringStream.transform(new StreamTransformer<String, String>(
1036 * handleData: (String value, EventSink<String> sink) { 1053 * handleData: (String value, EventSink<String> sink) {
1037 * sink.add(value); 1054 * sink.add(value);
1038 * sink.add(value); // Duplicate the incoming events. 1055 * sink.add(value); // Duplicate the incoming events.
1039 * })); 1056 * }));
1040 * 1057 *
1041 */ 1058 */
1042 factory StreamTransformer({ 1059 factory StreamTransformer({
1043 void handleData(S data, EventSink<T> sink), 1060 void handleData(S data, EventSink<T> sink),
1044 void handleError(error, EventSink<T> sink), 1061 Function handleError,
1045 void handleDone(EventSink<T> sink)}) { 1062 void handleDone(EventSink<T> sink)}) {
1046 return new _StreamTransformerImpl<S, T>(handleData, 1063 return new _StreamTransformerImpl<S, T>(handleData,
1047 handleError, 1064 handleError,
1048 handleDone); 1065 handleDone);
1049 } 1066 }
1050 1067
1051 Stream<T> bind(Stream<S> stream); 1068 Stream<T> bind(Stream<S> stream);
1052 } 1069 }
1053 1070
1054 1071
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
1127 * events on this stream. 1144 * events on this stream.
1128 */ 1145 */
1129 class EventTransformStream<S, T> extends Stream<T> { 1146 class EventTransformStream<S, T> extends Stream<T> {
1130 final Stream<S> _source; 1147 final Stream<S> _source;
1131 final StreamEventTransformer _transformer; 1148 final StreamEventTransformer _transformer;
1132 EventTransformStream(Stream<S> source, 1149 EventTransformStream(Stream<S> source,
1133 StreamEventTransformer<S, T> transformer) 1150 StreamEventTransformer<S, T> transformer)
1134 : _source = source, _transformer = transformer; 1151 : _source = source, _transformer = transformer;
1135 1152
1136 StreamSubscription<T> listen(void onData(T data), 1153 StreamSubscription<T> listen(void onData(T data),
1137 { void onError(error), 1154 { Function onError,
1138 void onDone(), 1155 void onDone(),
1139 bool cancelOnError }) { 1156 bool cancelOnError }) {
1140 if (onData == null) onData = _nullDataHandler; 1157 if (onData == null) onData = _nullDataHandler;
1141 if (onError == null) onError = _nullErrorHandler; 1158 if (onError == null) onError = _nullErrorHandler;
1142 if (onDone == null) onDone = _nullDoneHandler; 1159 if (onDone == null) onDone = _nullDoneHandler;
1143 cancelOnError = identical(true, cancelOnError); 1160 cancelOnError = identical(true, cancelOnError);
1144 return new _EventTransformStreamSubscription(_source, _transformer, 1161 return new _EventTransformStreamSubscription(_source, _transformer,
1145 onData, onError, onDone, 1162 onData, onError, onDone,
1146 cancelOnError); 1163 cancelOnError);
1147 } 1164 }
1148 } 1165 }
1149 1166
1150 class _EventTransformStreamSubscription<S, T> 1167 class _EventTransformStreamSubscription<S, T>
1151 extends _BufferingStreamSubscription<T> { 1168 extends _BufferingStreamSubscription<T> {
1152 /** The transformer used to transform events. */ 1169 /** The transformer used to transform events. */
1153 final StreamEventTransformer<S, T> _transformer; 1170 final StreamEventTransformer<S, T> _transformer;
1154 1171
1155 /** Whether this stream has sent a done event. */ 1172 /** Whether this stream has sent a done event. */
1156 bool _isClosed = false; 1173 bool _isClosed = false;
1157 1174
1158 /** Source of incoming events. */ 1175 /** Source of incoming events. */
1159 StreamSubscription<S> _subscription; 1176 StreamSubscription<S> _subscription;
1160 1177
1161 /** Cached EventSink wrapper for this class. */ 1178 /** Cached EventSink wrapper for this class. */
1162 EventSink<T> _sink; 1179 EventSink<T> _sink;
1163 1180
1164 _EventTransformStreamSubscription(Stream<S> source, 1181 _EventTransformStreamSubscription(Stream<S> source,
1165 this._transformer, 1182 this._transformer,
1166 void onData(T data), 1183 void onData(T data),
1167 void onError(error), 1184 Function onError,
1168 void onDone(), 1185 void onDone(),
1169 bool cancelOnError) 1186 bool cancelOnError)
1170 : super(onData, onError, onDone, cancelOnError) { 1187 : super(onData, onError, onDone, cancelOnError) {
1171 _sink = new _EventSinkAdapter<T>(this); 1188 _sink = new _EventSinkAdapter<T>(this);
1172 _subscription = source.listen(_handleData, 1189 _subscription = source.listen(_handleData,
1173 onError: _handleError, 1190 onError: _handleError,
1174 onDone: _handleDone); 1191 onDone: _handleDone);
1175 } 1192 }
1176 1193
1177 /** Whether this subscription is still subscribed to its source. */ 1194 /** Whether this subscription is still subscribed to its source. */
(...skipping 13 matching lines...) Expand all
1191 _subscription = null; 1208 _subscription = null;
1192 subscription.cancel(); 1209 subscription.cancel();
1193 } 1210 }
1194 _isClosed = true; 1211 _isClosed = true;
1195 } 1212 }
1196 1213
1197 void _handleData(S data) { 1214 void _handleData(S data) {
1198 try { 1215 try {
1199 _transformer.handleData(data, _sink); 1216 _transformer.handleData(data, _sink);
1200 } catch (e, s) { 1217 } catch (e, s) {
1201 _addError(_asyncError(e, s)); 1218 _addError(_asyncError(e, s), s);
1202 } 1219 }
1203 } 1220 }
1204 1221
1205 void _handleError(error) { 1222 void _handleError(error, [stackTrace]) {
1206 try { 1223 try {
1207 _transformer.handleError(error, _sink); 1224 _transformer.handleError(error, _sink);
1208 } catch (e, s) { 1225 } catch (e, s) {
1209 _addError(_asyncError(e, s)); 1226 if (identical(e, error)) {
1227 _addError(error, stackTrace);
1228 } else {
1229 _addError(_asyncError(e, s), s);
1230 }
1210 } 1231 }
1211 } 1232 }
1212 1233
1213 void _handleDone() { 1234 void _handleDone() {
1214 try { 1235 try {
1215 _subscription = null; 1236 _subscription = null;
1216 _transformer.handleDone(_sink); 1237 _transformer.handleDone(_sink);
1217 } catch (e, s) { 1238 } catch (e, s) {
1218 _addError(_asyncError(e, s)); 1239 _addError(_asyncError(e, s), s);
1219 } 1240 }
1220 } 1241 }
1221 } 1242 }
1222 1243
1223 class _EventSinkAdapter<T> implements EventSink<T> { 1244 class _EventSinkAdapter<T> implements EventSink<T> {
1224 _EventSink _sink; 1245 _EventSink _sink;
1225 _EventSinkAdapter(this._sink); 1246 _EventSinkAdapter(this._sink);
1226 1247
1227 void add(T data) { _sink._add(data); } 1248 void add(T data) { _sink._add(data); }
1228 void addError(error) { _sink._addError(error); } 1249 void addError(error, [StackTrace stackTrace]) {
1250 _sink._addError(error, stackTrace);
1251 }
1229 void close() { _sink._close(); } 1252 void close() { _sink._close(); }
1230 } 1253 }
1231 1254
1232 1255
1233 /** 1256 /**
1234 * An [Iterable] like interface for the values of a [Stream]. 1257 * An [Iterable] like interface for the values of a [Stream].
1235 * 1258 *
1236 * This wraps a [Stream] and a subscription on the stream. It listens 1259 * This wraps a [Stream] and a subscription on the stream. It listens
1237 * on the stream, and completes the future returned by [moveNext] when the 1260 * on the stream, and completes the future returned by [moveNext] when the
1238 * next value becomes available. 1261 * next value becomes available.
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
1274 * 1297 *
1275 * If a [moveNext] call has been made, it will complete with `false` as value, 1298 * If a [moveNext] call has been made, it will complete with `false` as value,
1276 * as will all further calls to [moveNext]. 1299 * as will all further calls to [moveNext].
1277 * 1300 *
1278 * If you need to stop listening for values before the stream iterator is 1301 * If you need to stop listening for values before the stream iterator is
1279 * automatically closed, you must call [cancel] to ensure that the stream 1302 * automatically closed, you must call [cancel] to ensure that the stream
1280 * is properly closed. 1303 * is properly closed.
1281 */ 1304 */
1282 void cancel(); 1305 void cancel();
1283 } 1306 }
OLDNEW
« no previous file with comments | « sdk/lib/async/broadcast_stream_controller.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698