OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
140 }); | 140 }); |
141 }, | 141 }, |
142 onCancel: () { | 142 onCancel: () { |
143 if (timer != null) timer.cancel(); | 143 if (timer != null) timer.cancel(); |
144 timer = null; | 144 timer = null; |
145 }); | 145 }); |
146 return controller.stream; | 146 return controller.stream; |
147 } | 147 } |
148 | 148 |
149 /** | 149 /** |
| 150 * Creates a stream where all events of an existing stream are piped through |
| 151 * a sink-transformation. |
| 152 * |
| 153 * The given [mapSink] closure is invoked when the returned stream is |
| 154 * listened to. All events from the [source] are added into the event sink |
| 155 * that is returned from the invocation. The transformation puts all |
| 156 * transformed events into the sink the [mapSink] closure received during |
| 157 * its invocation. Conceptually the [mapSink] creates a transformation pipe |
| 158 * with the input sink being the returned [EventSink] and the output sink |
| 159 * being the sink it received. |
| 160 * |
| 161 * This constructor is frequently used to build transformers. |
| 162 * |
| 163 * Example use for a duplicating transformer: |
| 164 * |
| 165 * class DuplicationSink implements EventSink<String> { |
| 166 * final EventSink<String> _outputSink; |
| 167 * DuplicationSink(this._outputSink); |
| 168 * |
| 169 * void add(String data) { |
| 170 * _outputSink.add(data); |
| 171 * _outputSink.add(data); |
| 172 * } |
| 173 * |
| 174 * void addError(e, [st]) => _outputSink(e, st); |
| 175 * void close() => _outputSink.close(); |
| 176 * } |
| 177 * |
| 178 * class DuplicationTransformer implements StreamTransformer<String, Strin
g> { |
| 179 * // Some generic types ommitted for brevety. |
| 180 * Stream bind(Stream stream) => new Stream<String>.eventTransform( |
| 181 * stream, |
| 182 * (EventSink sink) => new DuplicationSink(sink)); |
| 183 * } |
| 184 * |
| 185 * stringStream.transform(new DuplicationTransformer()); |
| 186 */ |
| 187 factory Stream.eventTransformed(Stream source, |
| 188 EventSink mapSink(EventSink<T> sink)) { |
| 189 return new _BoundSinkStream(source, mapSink); |
| 190 } |
| 191 |
| 192 /** |
150 * Reports whether this stream is a broadcast stream. | 193 * Reports whether this stream is a broadcast stream. |
151 */ | 194 */ |
152 bool get isBroadcast => false; | 195 bool get isBroadcast => false; |
153 | 196 |
154 /** | 197 /** |
155 * Returns a multi-subscription stream that produces the same events as this. | 198 * Returns a multi-subscription stream that produces the same events as this. |
156 * | 199 * |
157 * If this stream is already a broadcast stream, it is returned unmodified. | 200 * If this stream is already a broadcast stream, it is returned unmodified. |
158 * | 201 * |
159 * If this stream is single-subscription, return a new stream that allows | 202 * If this stream is single-subscription, return a new stream that allows |
(...skipping 789 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
949 } | 992 } |
950 | 993 |
951 | 994 |
952 /** | 995 /** |
953 * An interface that abstracts creation or handling of [Stream] events. | 996 * An interface that abstracts creation or handling of [Stream] events. |
954 */ | 997 */ |
955 abstract class EventSink<T> { | 998 abstract class EventSink<T> { |
956 /** Create a data event */ | 999 /** Create a data event */ |
957 void add(T event); | 1000 void add(T event); |
958 /** Create an async error. */ | 1001 /** Create an async error. */ |
959 void addError(errorEvent); | 1002 void addError(errorEvent, [StackTrace stackTrace]); |
960 /** Request a stream to close. */ | 1003 /** Request a stream to close. */ |
961 void close(); | 1004 void close(); |
962 } | 1005 } |
963 | 1006 |
964 | 1007 |
965 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 1008 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
966 class StreamView<T> extends Stream<T> { | 1009 class StreamView<T> extends Stream<T> { |
967 Stream<T> _stream; | 1010 Stream<T> _stream; |
968 | 1011 |
969 StreamView(this._stream); | 1012 StreamView(this._stream); |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1030 */ | 1073 */ |
1031 Future get done; | 1074 Future get done; |
1032 } | 1075 } |
1033 | 1076 |
1034 | 1077 |
1035 /** | 1078 /** |
1036 * The target of a [Stream.transform] call. | 1079 * The target of a [Stream.transform] call. |
1037 * | 1080 * |
1038 * The [Stream.transform] call will pass itself to this object and then return | 1081 * The [Stream.transform] call will pass itself to this object and then return |
1039 * the resulting stream. | 1082 * the resulting stream. |
| 1083 * |
| 1084 * It is good practice to write transformers that can be used multiple times. |
1040 */ | 1085 */ |
1041 abstract class StreamTransformer<S, T> { | 1086 abstract class StreamTransformer<S, T> { |
| 1087 |
1042 /** | 1088 /** |
1043 * Create a [StreamTransformer] that delegates events to the given functions. | 1089 * Creates a [StreamTransformer]. |
1044 * | 1090 * |
1045 * This is actually a [StreamEventTransformer] where the event handling is | 1091 * The returned instance takes responsibility of implementing ([bind]). |
1046 * performed by the function arguments. | 1092 * When the user invokes `bind` it returns a new "bound" stream. Only when |
1047 * If an argument is omitted, it acts as the corresponding default method from | 1093 * the user starts listening to the bound stream, the `listen` method |
1048 * [StreamEventTransformer]. | 1094 * invokes the given closure [transformer]. |
1049 * | 1095 * |
1050 * Example use: | 1096 * The [transformer] closure receives the stream, that was bound, as argument |
| 1097 * and returns a [StreamSubscription]. In almost all cases the closure |
| 1098 * listens itself to the stream that is given as argument. |
| 1099 * |
| 1100 * The result of invoking the [transformer] closure is a [StreamSubscription]. |
| 1101 * The bound stream-transformer (created by the `bind` method above) then sets |
| 1102 * the handlers it received as part of the `listen` call. |
| 1103 * |
| 1104 * Conceptually this can be summarized as follows: |
| 1105 * |
| 1106 * 1. `var transformer = new StreamTransformer(transformerClosure);` |
| 1107 * creates a `StreamTransformer` that supports the `bind` method. |
| 1108 * 2. `var boundStream = stream.transform(transformer);` binds the `stream` |
| 1109 * and returns a bound stream that has a pointer to `stream`. |
| 1110 * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)` |
| 1111 * starts the listening and transformation. This is accomplished |
| 1112 * in 2 steps: first the `boundStream` invokes the `transformerClosure` with |
| 1113 * the `stream` it captured: `transformerClosure(stream, b)`. |
| 1114 * The result `subscription`, a [StreamSubscription], is then |
| 1115 * updated to receive its handlers: `subscription.onData(f1)`, |
| 1116 * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription |
| 1117 * is returned as result of the `listen` call. |
| 1118 * |
| 1119 * There are two common ways to create a StreamSubscription: |
| 1120 * |
| 1121 * 1. by creating a new class that implements [StreamSubscription]. |
| 1122 * Note that the subscription should run callbacks in the [Zone] the |
| 1123 * stream was listened to. |
| 1124 * 2. by allocating a [StreamController] and to return the result of |
| 1125 * listening to its stream. |
| 1126 * |
| 1127 * Example use of a duplicating transformer: |
1051 * | 1128 * |
1052 * stringStream.transform(new StreamTransformer<String, String>( | 1129 * stringStream.transform(new StreamTransformer<String, String>( |
| 1130 * (Stream<String> input, bool cancelOnError) { |
| 1131 * StreamController<String> controller; |
| 1132 * StreamSubscription<String> subscription; |
| 1133 * controller = new StreamController<String>( |
| 1134 * onListen: () { |
| 1135 * subscription = input.listen((data) { |
| 1136 * // Duplicate the data. |
| 1137 * controller.add(data); |
| 1138 * controller.add(data); |
| 1139 * }, |
| 1140 * onError: controller.addError, |
| 1141 * onDone: controller.close, |
| 1142 * cancelOnError: cancelOnError); |
| 1143 * }, |
| 1144 * onPause: subscription.pause, |
| 1145 * onResume: subscription.resume, |
| 1146 * onCancel: subscription.cancel, |
| 1147 * sync: true); |
| 1148 * return controller.stream.listen(null); |
| 1149 * }); |
| 1150 */ |
| 1151 const factory StreamTransformer( |
| 1152 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) |
| 1153 = _StreamSubscriptionTransformer; |
| 1154 |
| 1155 /** |
| 1156 * Creates a [StreamTransformer] that delegates events to the given functions. |
| 1157 * |
| 1158 * Example use of a duplicating transformer: |
| 1159 * |
| 1160 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( |
1053 * handleData: (String value, EventSink<String> sink) { | 1161 * handleData: (String value, EventSink<String> sink) { |
1054 * sink.add(value); | 1162 * sink.add(value); |
1055 * sink.add(value); // Duplicate the incoming events. | 1163 * sink.add(value); // Duplicate the incoming events. |
1056 * })); | 1164 * })); |
1057 * | |
1058 */ | 1165 */ |
1059 factory StreamTransformer({ | 1166 factory StreamTransformer.fromHandlers({ |
1060 void handleData(S data, EventSink<T> sink), | 1167 void handleData(S data, EventSink<T> sink), |
1061 Function handleError, | 1168 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
1062 void handleDone(EventSink<T> sink)}) { | 1169 void handleDone(EventSink<T> sink)}) |
1063 return new _StreamTransformerImpl<S, T>(handleData, | 1170 = _StreamHandlerTransformer; |
1064 handleError, | |
1065 handleDone); | |
1066 } | |
1067 | 1171 |
1068 Stream<T> bind(Stream<S> stream); | 1172 Stream<T> bind(Stream<S> stream); |
1069 } | 1173 } |
1070 | 1174 |
1071 | |
1072 /** | |
1073 * Base class for transformers that modifies stream events. | |
1074 * | |
1075 * A [StreamEventTransformer] transforms incoming Stream | |
1076 * events of one kind into outgoing events of (possibly) another kind. | |
1077 * | |
1078 * Subscribing on the stream returned by [bind] is the same as subscribing on | |
1079 * the source stream, except that events are passed through the [transformer] | |
1080 * before being emitted. The transformer may generate any number and | |
1081 * types of events for each incoming event. Pauses on the returned | |
1082 * subscription are forwarded to this stream. | |
1083 * | |
1084 * An example that duplicates all data events: | |
1085 * | |
1086 * class DoubleTransformer<T> extends StreamEventTransformer<T, T> { | |
1087 * void handleData(T data, EventSink<T> sink) { | |
1088 * sink.add(value); | |
1089 * sink.add(value); | |
1090 * } | |
1091 * } | |
1092 * someTypeStream.transform(new DoubleTransformer<Type>()); | |
1093 * | |
1094 * The default implementations of the "handle" methods forward | |
1095 * the events unmodified. If using the default [handleData] the generic type [T] | |
1096 * needs to be assignable to [S]. | |
1097 */ | |
1098 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { | |
1099 const StreamEventTransformer(); | |
1100 | |
1101 Stream<T> bind(Stream<S> source) { | |
1102 return new EventTransformStream<S, T>(source, this); | |
1103 } | |
1104 | |
1105 /** | |
1106 * Act on incoming data event. | |
1107 * | |
1108 * The method may generate any number of events on the sink, but should | |
1109 * not throw. | |
1110 */ | |
1111 void handleData(S event, EventSink<T> sink) { | |
1112 var data = event; | |
1113 sink.add(data); | |
1114 } | |
1115 | |
1116 /** | |
1117 * Act on incoming error event. | |
1118 * | |
1119 * The method may generate any number of events on the sink, but should | |
1120 * not throw. | |
1121 */ | |
1122 void handleError(error, EventSink<T> sink) { | |
1123 sink.addError(error); | |
1124 } | |
1125 | |
1126 /** | |
1127 * Act on incoming done event. | |
1128 * | |
1129 * The method may generate any number of events on the sink, but should | |
1130 * not throw. | |
1131 */ | |
1132 void handleDone(EventSink<T> sink){ | |
1133 sink.close(); | |
1134 } | |
1135 } | |
1136 | |
1137 | |
1138 /** | |
1139 * Stream that transforms another stream by intercepting and replacing events. | |
1140 * | |
1141 * This [Stream] is a transformation of a source stream. Listening on this | |
1142 * stream is the same as listening on the source stream, except that events | |
1143 * are intercepted and modified by a [StreamEventTransformer] before becoming | |
1144 * events on this stream. | |
1145 */ | |
1146 class EventTransformStream<S, T> extends Stream<T> { | |
1147 final Stream<S> _source; | |
1148 final StreamEventTransformer _transformer; | |
1149 EventTransformStream(Stream<S> source, | |
1150 StreamEventTransformer<S, T> transformer) | |
1151 : _source = source, _transformer = transformer; | |
1152 | |
1153 StreamSubscription<T> listen(void onData(T data), | |
1154 { Function onError, | |
1155 void onDone(), | |
1156 bool cancelOnError }) { | |
1157 if (onData == null) onData = _nullDataHandler; | |
1158 if (onError == null) onError = _nullErrorHandler; | |
1159 if (onDone == null) onDone = _nullDoneHandler; | |
1160 cancelOnError = identical(true, cancelOnError); | |
1161 return new _EventTransformStreamSubscription(_source, _transformer, | |
1162 onData, onError, onDone, | |
1163 cancelOnError); | |
1164 } | |
1165 } | |
1166 | |
1167 class _EventTransformStreamSubscription<S, T> | |
1168 extends _BufferingStreamSubscription<T> { | |
1169 /** The transformer used to transform events. */ | |
1170 final StreamEventTransformer<S, T> _transformer; | |
1171 | |
1172 /** Whether this stream has sent a done event. */ | |
1173 bool _isClosed = false; | |
1174 | |
1175 /** Source of incoming events. */ | |
1176 StreamSubscription<S> _subscription; | |
1177 | |
1178 /** Cached EventSink wrapper for this class. */ | |
1179 EventSink<T> _sink; | |
1180 | |
1181 _EventTransformStreamSubscription(Stream<S> source, | |
1182 this._transformer, | |
1183 void onData(T data), | |
1184 Function onError, | |
1185 void onDone(), | |
1186 bool cancelOnError) | |
1187 : super(onData, onError, onDone, cancelOnError) { | |
1188 _sink = new _EventSinkAdapter<T>(this); | |
1189 _subscription = source.listen(_handleData, | |
1190 onError: _handleError, | |
1191 onDone: _handleDone); | |
1192 } | |
1193 | |
1194 /** Whether this subscription is still subscribed to its source. */ | |
1195 bool get _isSubscribed => _subscription != null; | |
1196 | |
1197 void _onPause() { | |
1198 if (_isSubscribed) _subscription.pause(); | |
1199 } | |
1200 | |
1201 void _onResume() { | |
1202 if (_isSubscribed) _subscription.resume(); | |
1203 } | |
1204 | |
1205 void _onCancel() { | |
1206 if (_isSubscribed) { | |
1207 StreamSubscription subscription = _subscription; | |
1208 _subscription = null; | |
1209 subscription.cancel(); | |
1210 } | |
1211 _isClosed = true; | |
1212 } | |
1213 | |
1214 void _handleData(S data) { | |
1215 try { | |
1216 _transformer.handleData(data, _sink); | |
1217 } catch (e, s) { | |
1218 _addError(_asyncError(e, s), s); | |
1219 } | |
1220 } | |
1221 | |
1222 void _handleError(error, [stackTrace]) { | |
1223 try { | |
1224 _transformer.handleError(error, _sink); | |
1225 } catch (e, s) { | |
1226 if (identical(e, error)) { | |
1227 _addError(error, stackTrace); | |
1228 } else { | |
1229 _addError(_asyncError(e, s), s); | |
1230 } | |
1231 } | |
1232 } | |
1233 | |
1234 void _handleDone() { | |
1235 try { | |
1236 _subscription = null; | |
1237 _transformer.handleDone(_sink); | |
1238 } catch (e, s) { | |
1239 _addError(_asyncError(e, s), s); | |
1240 } | |
1241 } | |
1242 } | |
1243 | |
1244 class _EventSinkAdapter<T> implements EventSink<T> { | |
1245 _EventSink _sink; | |
1246 _EventSinkAdapter(this._sink); | |
1247 | |
1248 void add(T data) { _sink._add(data); } | |
1249 void addError(error, [StackTrace stackTrace]) { | |
1250 _sink._addError(error, stackTrace); | |
1251 } | |
1252 void close() { _sink._close(); } | |
1253 } | |
1254 | |
1255 | |
1256 /** | 1175 /** |
1257 * An [Iterable] like interface for the values of a [Stream]. | 1176 * An [Iterable] like interface for the values of a [Stream]. |
1258 * | 1177 * |
1259 * This wraps a [Stream] and a subscription on the stream. It listens | 1178 * This wraps a [Stream] and a subscription on the stream. It listens |
1260 * on the stream, and completes the future returned by [moveNext] when the | 1179 * on the stream, and completes the future returned by [moveNext] when the |
1261 * next value becomes available. | 1180 * next value becomes available. |
1262 * | 1181 * |
1263 * NOTICE: This is a tentative design. This class may change. | 1182 * NOTICE: This is a tentative design. This class may change. |
1264 */ | 1183 */ |
1265 abstract class StreamIterator<T> { | 1184 abstract class StreamIterator<T> { |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1297 * | 1216 * |
1298 * If a [moveNext] call has been made, it will complete with `false` as value, | 1217 * If a [moveNext] call has been made, it will complete with `false` as value, |
1299 * as will all further calls to [moveNext]. | 1218 * as will all further calls to [moveNext]. |
1300 * | 1219 * |
1301 * If you need to stop listening for values before the stream iterator is | 1220 * If you need to stop listening for values before the stream iterator is |
1302 * automatically closed, you must call [cancel] to ensure that the stream | 1221 * automatically closed, you must call [cancel] to ensure that the stream |
1303 * is properly closed. | 1222 * is properly closed. |
1304 */ | 1223 */ |
1305 void cancel(); | 1224 void cancel(); |
1306 } | 1225 } |
OLD | NEW |