Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 140 }); | 140 }); |
| 141 }, | 141 }, |
| 142 onCancel: () { | 142 onCancel: () { |
| 143 if (timer != null) timer.cancel(); | 143 if (timer != null) timer.cancel(); |
| 144 timer = null; | 144 timer = null; |
| 145 }); | 145 }); |
| 146 return controller.stream; | 146 return controller.stream; |
| 147 } | 147 } |
| 148 | 148 |
| 149 /** | 149 /** |
| 150 * Creates a stream where all events of an existing stream are piped through | |
| 151 * a sink-transformation. | |
| 152 * | |
| 153 * The given [mapSink] closure is invoked when the returned stream is | |
| 154 * listened to. All events from the [source] are added into the event sink | |
| 155 * that is returned from the invocation. The transformation puts all | |
| 156 * transformed events into the sink the [mapSink] closure received during | |
| 157 * its invocation. Conceptually the [mapSink] creates a transformation pipe | |
| 158 * with the input sink being the returned [EventSink] and the output sink | |
| 159 * being the sink it received. | |
| 160 * | |
| 161 * This constructor is frequently used to build transformers. | |
| 162 * | |
| 163 * Example use for a duplicating transformer: | |
| 164 * | |
| 165 * class DuplicationSink implements EventSink<String> { | |
| 166 * final EventSink<String> _outputSink; | |
| 167 * DuplicationSink(this._outputSink); | |
| 168 * | |
| 169 * void add(String data) { | |
| 170 * _outputSink.add(data); | |
| 171 * _outputSink.add(data); | |
| 172 * } | |
| 173 * | |
| 174 * void addError(e, [st]) => _outputSink(e, st); | |
| 175 * void close() => _outputSink.close(); | |
| 176 * } | |
| 177 * | |
| 178 * class DuplicationTransformer implements StreamTransformer<String, Strin g> { | |
| 179 * // Some generic types ommitted for brevety. | |
| 180 * Stream bind(Stream stream) => new Stream<String>.eventTransform( | |
| 181 * stream, | |
| 182 * (EventSink sink) => new DuplicationSink(sink)); | |
| 183 * } | |
| 184 * | |
| 185 * stringStream.transform(new DuplicationTransformer()); | |
| 186 */ | |
| 187 factory Stream.eventTransformed(Stream source, | |
| 188 EventSink mapSink(EventSink<T> sink)) { | |
| 189 return new _BoundSinkStream(source, mapSink); | |
| 190 } | |
| 191 | |
| 192 /** | |
| 150 * Reports whether this stream is a broadcast stream. | 193 * Reports whether this stream is a broadcast stream. |
| 151 */ | 194 */ |
| 152 bool get isBroadcast => false; | 195 bool get isBroadcast => false; |
| 153 | 196 |
| 154 /** | 197 /** |
| 155 * Returns a multi-subscription stream that produces the same events as this. | 198 * Returns a multi-subscription stream that produces the same events as this. |
| 156 * | 199 * |
| 157 * If this stream is already a broadcast stream, it is returned unmodified. | 200 * If this stream is already a broadcast stream, it is returned unmodified. |
| 158 * | 201 * |
| 159 * If this stream is single-subscription, return a new stream that allows | 202 * If this stream is single-subscription, return a new stream that allows |
| (...skipping 783 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 943 } | 986 } |
| 944 | 987 |
| 945 | 988 |
| 946 /** | 989 /** |
| 947 * An interface that abstracts creation or handling of [Stream] events. | 990 * An interface that abstracts creation or handling of [Stream] events. |
| 948 */ | 991 */ |
| 949 abstract class EventSink<T> { | 992 abstract class EventSink<T> { |
| 950 /** Create a data event */ | 993 /** Create a data event */ |
| 951 void add(T event); | 994 void add(T event); |
| 952 /** Create an async error. */ | 995 /** Create an async error. */ |
| 953 void addError(errorEvent); | 996 void addError(errorEvent, [StackTrace stackTrace]); |
| 954 /** Request a stream to close. */ | 997 /** Request a stream to close. */ |
| 955 void close(); | 998 void close(); |
| 956 } | 999 } |
| 957 | 1000 |
| 958 | 1001 |
| 959 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 1002 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
| 960 class StreamView<T> extends Stream<T> { | 1003 class StreamView<T> extends Stream<T> { |
| 961 Stream<T> _stream; | 1004 Stream<T> _stream; |
| 962 | 1005 |
| 963 StreamView(this._stream); | 1006 StreamView(this._stream); |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1024 */ | 1067 */ |
| 1025 Future get done; | 1068 Future get done; |
| 1026 } | 1069 } |
| 1027 | 1070 |
| 1028 | 1071 |
| 1029 /** | 1072 /** |
| 1030 * The target of a [Stream.transform] call. | 1073 * The target of a [Stream.transform] call. |
| 1031 * | 1074 * |
| 1032 * The [Stream.transform] call will pass itself to this object and then return | 1075 * The [Stream.transform] call will pass itself to this object and then return |
| 1033 * the resulting stream. | 1076 * the resulting stream. |
| 1077 * | |
| 1078 * It is good practice to create transformers that can be used multiple times. | |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
By "create", do you mean that it is good practice
floitsch
2013/10/10 15:39:57
Done.
| |
| 1034 */ | 1079 */ |
| 1035 abstract class StreamTransformer<S, T> { | 1080 abstract class StreamTransformer<S, T> { |
| 1081 | |
| 1036 /** | 1082 /** |
| 1037 * Create a [StreamTransformer] that delegates events to the given functions. | 1083 * Creates a [StreamTransformer]. |
| 1038 * | 1084 * |
| 1039 * This is actually a [StreamEventTransformer] where the event handling is | 1085 * The returned instance takes responsibility of binding ([bind]) and only |
| 1040 * performed by the function arguments. | 1086 * invokes the given closure [transformer] when a user starts listening on |
| 1041 * If an argument is omitted, it acts as the corresponding default method from | 1087 * the bound stream. At that point the transformer should start listening |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
User listens on the returned stream, not the bound
floitsch
2013/10/10 15:39:57
rewritten.
| |
| 1042 * [StreamEventTransformer]. | 1088 * to the bound stream (which is given as argument) and return a |
| 1089 * [StreamSubscription]. The bound stream-transformer then sets the handlers | |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
How is the stream-transformer bound, and to what?
floitsch
2013/10/10 15:39:57
rewritten.
| |
| 1090 * it received as part of the `listen` call. Missing handlers (or the ones | |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Sounds like the handlers, which were received as p
floitsch
2013/10/10 15:39:57
rewritten.
| |
| 1091 * that are `null`) are also set (with `null`). | |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Last sentence seems superflous.
floitsch
2013/10/10 15:39:57
It was meant to say that the Subscription can rely
| |
| 1043 * | 1092 * |
| 1044 * Example use: | 1093 * There are two common ways to create a StreamSubscription: |
| 1094 * | |
| 1095 * 1. by creating a new class that implements [StreamSubscription]. | |
| 1096 * Note that the subscription should run callbacks in the [Zone] the | |
| 1097 * stream was listened to. | |
| 1098 * 2. by allocating a [StreamController] and to return the result of | |
| 1099 * listening to its stream. | |
| 1100 * | |
| 1101 * Example use of a duplicating transformer: | |
| 1045 * | 1102 * |
| 1046 * stringStream.transform(new StreamTransformer<String, String>( | 1103 * stringStream.transform(new StreamTransformer<String, String>( |
| 1104 * (Stream<String> input, bool cancelOnError) { | |
| 1105 * StreamController<String> controller; | |
| 1106 * StreamSubscription<String> subscription; | |
| 1107 * controller = new StreamController<String>( | |
| 1108 * onListen: () { | |
| 1109 * subscription = input.listen((data) { | |
| 1110 * // Duplicate the data. | |
| 1111 * controller.add(data); | |
| 1112 * controller.add(data); | |
| 1113 * }, | |
| 1114 * onError: controller.addError, | |
| 1115 * onDone: controller.close, | |
| 1116 * cancelOnError: cancelOnError); | |
| 1117 * }, | |
| 1118 * onPause: subscription.pause, | |
| 1119 * onResume: subscription.resume, | |
| 1120 * onCancel: subscription.cancel, | |
| 1121 * sync: true); // One of the few places, where sync is correct. | |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Either remove the comment or elaborate on why it i
floitsch
2013/10/10 15:39:57
Removed.
| |
| 1122 * return controller.stream.listen(null); | |
| 1123 * }); | |
| 1124 */ | |
| 1125 const factory StreamTransformer( | |
| 1126 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) | |
| 1127 = _StreamSubscriptionTransformer; | |
| 1128 | |
| 1129 /** | |
| 1130 * Creates a [StreamTransformer] that delegates events to the given functions. | |
| 1131 * | |
| 1132 * This constructor returns a transformer that can only be used once. | |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
Why can it only be used once? If the handlers them
floitsch
2013/10/10 15:39:57
Done.
| |
| 1133 * | |
| 1134 * Example use of a duplicating transformer: | |
| 1135 * | |
| 1136 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs( | |
| 1047 * handleData: (String value, EventSink<String> sink) { | 1137 * handleData: (String value, EventSink<String> sink) { |
| 1048 * sink.add(value); | 1138 * sink.add(value); |
| 1049 * sink.add(value); // Duplicate the incoming events. | 1139 * sink.add(value); // Duplicate the incoming events. |
| 1050 * })); | 1140 * })); |
| 1051 * | |
| 1052 */ | 1141 */ |
| 1053 factory StreamTransformer({ | 1142 factory StreamTransformer.fromHandlers({ |
| 1054 void handleData(S data, EventSink<T> sink), | 1143 void handleData(S data, EventSink<T> sink), |
| 1055 Function handleError, | 1144 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
| 1056 void handleDone(EventSink<T> sink)}) { | 1145 void handleDone(EventSink<T> sink)}) |
| 1057 return new _StreamTransformerImpl<S, T>(handleData, | 1146 = _StreamHandlerTransformer; |
| 1058 handleError, | |
| 1059 handleDone); | |
| 1060 } | |
| 1061 | 1147 |
| 1062 Stream<T> bind(Stream<S> stream); | 1148 Stream<T> bind(Stream<S> stream); |
| 1063 } | 1149 } |
| 1064 | 1150 |
| 1065 | |
| 1066 /** | |
| 1067 * Base class for transformers that modifies stream events. | |
| 1068 * | |
| 1069 * A [StreamEventTransformer] transforms incoming Stream | |
| 1070 * events of one kind into outgoing events of (possibly) another kind. | |
| 1071 * | |
| 1072 * Subscribing on the stream returned by [bind] is the same as subscribing on | |
| 1073 * the source stream, except that events are passed through the [transformer] | |
| 1074 * before being emitted. The transformer may generate any number and | |
| 1075 * types of events for each incoming event. Pauses on the returned | |
| 1076 * subscription are forwarded to this stream. | |
| 1077 * | |
| 1078 * An example that duplicates all data events: | |
| 1079 * | |
| 1080 * class DoubleTransformer<T> extends StreamEventTransformer<T, T> { | |
| 1081 * void handleData(T data, EventSink<T> sink) { | |
| 1082 * sink.add(value); | |
| 1083 * sink.add(value); | |
| 1084 * } | |
| 1085 * } | |
| 1086 * someTypeStream.transform(new DoubleTransformer<Type>()); | |
| 1087 * | |
| 1088 * The default implementations of the "handle" methods forward | |
| 1089 * the events unmodified. If using the default [handleData] the generic type [T] | |
| 1090 * needs to be assignable to [S]. | |
| 1091 */ | |
| 1092 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { | |
| 1093 const StreamEventTransformer(); | |
| 1094 | |
| 1095 Stream<T> bind(Stream<S> source) { | |
| 1096 return new EventTransformStream<S, T>(source, this); | |
| 1097 } | |
| 1098 | |
| 1099 /** | |
| 1100 * Act on incoming data event. | |
| 1101 * | |
| 1102 * The method may generate any number of events on the sink, but should | |
| 1103 * not throw. | |
| 1104 */ | |
| 1105 void handleData(S event, EventSink<T> sink) { | |
| 1106 var data = event; | |
| 1107 sink.add(data); | |
| 1108 } | |
| 1109 | |
| 1110 /** | |
| 1111 * Act on incoming error event. | |
| 1112 * | |
| 1113 * The method may generate any number of events on the sink, but should | |
| 1114 * not throw. | |
| 1115 */ | |
| 1116 void handleError(error, EventSink<T> sink) { | |
| 1117 sink.addError(error); | |
| 1118 } | |
| 1119 | |
| 1120 /** | |
| 1121 * Act on incoming done event. | |
| 1122 * | |
| 1123 * The method may generate any number of events on the sink, but should | |
| 1124 * not throw. | |
| 1125 */ | |
| 1126 void handleDone(EventSink<T> sink){ | |
| 1127 sink.close(); | |
| 1128 } | |
| 1129 } | |
| 1130 | |
| 1131 | |
| 1132 /** | |
| 1133 * Stream that transforms another stream by intercepting and replacing events. | |
| 1134 * | |
| 1135 * This [Stream] is a transformation of a source stream. Listening on this | |
| 1136 * stream is the same as listening on the source stream, except that events | |
| 1137 * are intercepted and modified by a [StreamEventTransformer] before becoming | |
| 1138 * events on this stream. | |
| 1139 */ | |
| 1140 class EventTransformStream<S, T> extends Stream<T> { | |
| 1141 final Stream<S> _source; | |
| 1142 final StreamEventTransformer _transformer; | |
| 1143 EventTransformStream(Stream<S> source, | |
| 1144 StreamEventTransformer<S, T> transformer) | |
| 1145 : _source = source, _transformer = transformer; | |
| 1146 | |
| 1147 StreamSubscription<T> listen(void onData(T data), | |
| 1148 { Function onError, | |
| 1149 void onDone(), | |
| 1150 bool cancelOnError }) { | |
| 1151 if (onData == null) onData = _nullDataHandler; | |
| 1152 if (onError == null) onError = _nullErrorHandler; | |
| 1153 if (onDone == null) onDone = _nullDoneHandler; | |
| 1154 cancelOnError = identical(true, cancelOnError); | |
| 1155 return new _EventTransformStreamSubscription(_source, _transformer, | |
| 1156 onData, onError, onDone, | |
| 1157 cancelOnError); | |
| 1158 } | |
| 1159 } | |
| 1160 | |
| 1161 class _EventTransformStreamSubscription<S, T> | |
| 1162 extends _BufferingStreamSubscription<T> { | |
| 1163 /** The transformer used to transform events. */ | |
| 1164 final StreamEventTransformer<S, T> _transformer; | |
| 1165 | |
| 1166 /** Whether this stream has sent a done event. */ | |
| 1167 bool _isClosed = false; | |
| 1168 | |
| 1169 /** Source of incoming events. */ | |
| 1170 StreamSubscription<S> _subscription; | |
| 1171 | |
| 1172 /** Cached EventSink wrapper for this class. */ | |
| 1173 EventSink<T> _sink; | |
| 1174 | |
| 1175 _EventTransformStreamSubscription(Stream<S> source, | |
| 1176 this._transformer, | |
| 1177 void onData(T data), | |
| 1178 Function onError, | |
| 1179 void onDone(), | |
| 1180 bool cancelOnError) | |
| 1181 : super(onData, onError, onDone, cancelOnError) { | |
| 1182 _sink = new _EventSinkAdapter<T>(this); | |
| 1183 _subscription = source.listen(_handleData, | |
| 1184 onError: _handleError, | |
| 1185 onDone: _handleDone); | |
| 1186 } | |
| 1187 | |
| 1188 /** Whether this subscription is still subscribed to its source. */ | |
| 1189 bool get _isSubscribed => _subscription != null; | |
| 1190 | |
| 1191 void _onPause() { | |
| 1192 if (_isSubscribed) _subscription.pause(); | |
| 1193 } | |
| 1194 | |
| 1195 void _onResume() { | |
| 1196 if (_isSubscribed) _subscription.resume(); | |
| 1197 } | |
| 1198 | |
| 1199 void _onCancel() { | |
| 1200 if (_isSubscribed) { | |
| 1201 StreamSubscription subscription = _subscription; | |
| 1202 _subscription = null; | |
| 1203 subscription.cancel(); | |
| 1204 } | |
| 1205 _isClosed = true; | |
| 1206 } | |
| 1207 | |
| 1208 void _handleData(S data) { | |
| 1209 try { | |
| 1210 _transformer.handleData(data, _sink); | |
| 1211 } catch (e, s) { | |
| 1212 _addError(_asyncError(e, s), s); | |
| 1213 } | |
| 1214 } | |
| 1215 | |
| 1216 void _handleError(error, [stackTrace]) { | |
| 1217 try { | |
| 1218 _transformer.handleError(error, _sink); | |
| 1219 } catch (e, s) { | |
| 1220 if (identical(e, error)) { | |
| 1221 _addError(error, stackTrace); | |
| 1222 } else { | |
| 1223 _addError(_asyncError(e, s), s); | |
| 1224 } | |
| 1225 } | |
| 1226 } | |
| 1227 | |
| 1228 void _handleDone() { | |
| 1229 try { | |
| 1230 _subscription = null; | |
| 1231 _transformer.handleDone(_sink); | |
| 1232 } catch (e, s) { | |
| 1233 _addError(_asyncError(e, s), s); | |
| 1234 } | |
| 1235 } | |
| 1236 } | |
| 1237 | |
| 1238 class _EventSinkAdapter<T> implements EventSink<T> { | |
| 1239 _EventSink _sink; | |
| 1240 _EventSinkAdapter(this._sink); | |
| 1241 | |
| 1242 void add(T data) { _sink._add(data); } | |
| 1243 void addError(error, [StackTrace stackTrace]) { | |
| 1244 _sink._addError(error, stackTrace); | |
| 1245 } | |
| 1246 void close() { _sink._close(); } | |
| 1247 } | |
| 1248 | |
| 1249 | |
| 1250 /** | 1151 /** |
| 1251 * An [Iterable] like interface for the values of a [Stream]. | 1152 * An [Iterable] like interface for the values of a [Stream]. |
| 1252 * | 1153 * |
| 1253 * This wraps a [Stream] and a subscription on the stream. It listens | 1154 * This wraps a [Stream] and a subscription on the stream. It listens |
| 1254 * on the stream, and completes the future returned by [moveNext] when the | 1155 * on the stream, and completes the future returned by [moveNext] when the |
| 1255 * next value becomes available. | 1156 * next value becomes available. |
| 1256 * | 1157 * |
| 1257 * NOTICE: This is a tentative design. This class may change. | 1158 * NOTICE: This is a tentative design. This class may change. |
| 1258 */ | 1159 */ |
| 1259 abstract class StreamIterator<T> { | 1160 abstract class StreamIterator<T> { |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1291 * | 1192 * |
| 1292 * If a [moveNext] call has been made, it will complete with `false` as value, | 1193 * If a [moveNext] call has been made, it will complete with `false` as value, |
| 1293 * as will all further calls to [moveNext]. | 1194 * as will all further calls to [moveNext]. |
| 1294 * | 1195 * |
| 1295 * If you need to stop listening for values before the stream iterator is | 1196 * If you need to stop listening for values before the stream iterator is |
| 1296 * automatically closed, you must call [cancel] to ensure that the stream | 1197 * automatically closed, you must call [cancel] to ensure that the stream |
| 1297 * is properly closed. | 1198 * is properly closed. |
| 1298 */ | 1199 */ |
| 1299 void cancel(); | 1200 void cancel(); |
| 1300 } | 1201 } |
| OLD | NEW |