OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
55 abstract class Stream<T> { | 55 abstract class Stream<T> { |
56 Stream(); | 56 Stream(); |
57 | 57 |
58 /** | 58 /** |
59 * Creates a new single-subscription stream from the future. | 59 * Creates a new single-subscription stream from the future. |
60 * | 60 * |
61 * When the future completes, the stream will fire one event, either | 61 * When the future completes, the stream will fire one event, either |
62 * data or error, and then close with a done-event. | 62 * data or error, and then close with a done-event. |
63 */ | 63 */ |
64 factory Stream.fromFuture(Future<T> future) { | 64 factory Stream.fromFuture(Future<T> future) { |
65 StreamController<T> controller = new StreamController<T>(); | 65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); |
66 future.then((value) { | 66 future.then((value) { |
67 controller.add(value); | 67 stream._add(value); |
68 controller.close(); | 68 stream._close(); |
69 }, | 69 }, |
70 onError: (error) { | 70 onError: (error) { |
71 controller.addError(error); | 71 stream._addError(error); |
72 controller.close(); | 72 stream._close(); |
73 }); | 73 }); |
74 return controller.stream; | 74 return stream; |
75 } | 75 } |
76 | 76 |
77 /** | 77 /** |
78 * Creates a single-subscription stream that gets its data from [data]. | 78 * Creates a single-subscription stream that gets its data from [data]. |
79 */ | 79 */ |
80 factory Stream.fromIterable(Iterable<T> data) { | 80 factory Stream.fromIterable(Iterable<T> data) { |
81 return new _GeneratedStreamImpl<T>( | 81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); |
82 () => new _IterablePendingEvents<T>(data)); | 82 return new _GeneratedSingleStreamImpl<T>(iterableEvents); |
83 } | 83 } |
84 | 84 |
85 /** | 85 /** |
86 * Creates a stream that repeatedly emits events at [period] intervals. | 86 * Creates a stream that repeatedly emits events at [period] intervals. |
87 * | 87 * |
88 * The event values are computed by invoking [computation]. The argument to | 88 * The event values are computed by invoking [computation]. The argument to |
89 * this callback is an integer that starts with 0 and is incremented for | 89 * this callback is an integer that starts with 0 and is incremented for |
90 * every event. | 90 * every event. |
91 * | 91 * |
92 * If [computation] is omitted the event values will all be `null`. | 92 * If [computation] is omitted the event values will all be `null`. |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
145 * Reports whether this stream is a broadcast stream. | 145 * Reports whether this stream is a broadcast stream. |
146 */ | 146 */ |
147 bool get isBroadcast => false; | 147 bool get isBroadcast => false; |
148 | 148 |
149 /** | 149 /** |
150 * Returns a multi-subscription stream that produces the same events as this. | 150 * Returns a multi-subscription stream that produces the same events as this. |
151 * | 151 * |
152 * If this stream is single-subscription, return a new stream that allows | 152 * If this stream is single-subscription, return a new stream that allows |
153 * multiple subscribers. It will subscribe to this stream when its first | 153 * multiple subscribers. It will subscribe to this stream when its first |
154 * subscriber is added, and unsubscribe again when the last subscription is | 154 * subscriber is added, and unsubscribe again when the last subscription is |
155 * canceled. | 155 * cancelled. |
156 * | 156 * |
157 * If this stream is already a broadcast stream, it is returned unmodified. | 157 * If this stream is already a broadcast stream, it is returned unmodified. |
158 */ | 158 */ |
159 Stream<T> asBroadcastStream() { | 159 Stream<T> asBroadcastStream() { |
160 if (isBroadcast) return this; | 160 if (isBroadcast) return this; |
161 return new _SingleStreamMultiplexer<T>(this); | 161 return new _SingleStreamMultiplexer<T>(this); |
162 } | 162 } |
163 | 163 |
164 /** | 164 /** |
165 * Adds a subscription to this stream. | 165 * Adds a subscription to this stream. |
(...skipping 876 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1042 * someTypeStream.transform(new DoubleTransformer<Type>()); | 1042 * someTypeStream.transform(new DoubleTransformer<Type>()); |
1043 * | 1043 * |
1044 * The default implementations of the "handle" methods forward | 1044 * The default implementations of the "handle" methods forward |
1045 * the events unmodified. If using the default [handleData] the generic type [T] | 1045 * the events unmodified. If using the default [handleData] the generic type [T] |
1046 * needs to be assignable to [S]. | 1046 * needs to be assignable to [S]. |
1047 */ | 1047 */ |
1048 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { | 1048 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
1049 const StreamEventTransformer(); | 1049 const StreamEventTransformer(); |
1050 | 1050 |
1051 Stream<T> bind(Stream<S> source) { | 1051 Stream<T> bind(Stream<S> source) { |
1052 return new EventTransformStream<S, T>(source, this); | 1052 // Hackish way of buffering data that goes out of the event-transformer. |
| 1053 // TODO(floitsch): replace this with a correct solution. |
| 1054 Stream transformingStream = new EventTransformStream<S, T>(source, this); |
| 1055 StreamController controller; |
| 1056 StreamSubscription subscription; |
| 1057 controller = new StreamController<T>( |
| 1058 onListen: () { |
| 1059 subscription = transformingStream.listen( |
| 1060 controller.add, |
| 1061 onError: controller.addError, |
| 1062 onDone: controller.close); |
| 1063 }, |
| 1064 onPause: () => subscription.pause(), |
| 1065 onResume: () => subscription.resume(), |
| 1066 onCancel: () => subscription.cancel()); |
| 1067 return controller.stream; |
1053 } | 1068 } |
1054 | 1069 |
1055 /** | 1070 /** |
1056 * Act on incoming data event. | 1071 * Act on incoming data event. |
1057 * | 1072 * |
1058 * The method may generate any number of events on the sink, but should | 1073 * The method may generate any number of events on the sink, but should |
1059 * not throw. | 1074 * not throw. |
1060 */ | 1075 */ |
1061 void handleData(S event, EventSink<T> sink) { | 1076 void handleData(S event, EventSink<T> sink) { |
1062 var data = event; | 1077 var data = event; |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1097 final Stream<S> _source; | 1112 final Stream<S> _source; |
1098 final StreamEventTransformer _transformer; | 1113 final StreamEventTransformer _transformer; |
1099 EventTransformStream(Stream<S> source, | 1114 EventTransformStream(Stream<S> source, |
1100 StreamEventTransformer<S, T> transformer) | 1115 StreamEventTransformer<S, T> transformer) |
1101 : _source = source, _transformer = transformer; | 1116 : _source = source, _transformer = transformer; |
1102 | 1117 |
1103 StreamSubscription<T> listen(void onData(T data), | 1118 StreamSubscription<T> listen(void onData(T data), |
1104 { void onError(error), | 1119 { void onError(error), |
1105 void onDone(), | 1120 void onDone(), |
1106 bool cancelOnError }) { | 1121 bool cancelOnError }) { |
1107 if (onData == null) onData = _nullDataHandler; | |
1108 if (onError == null) onError = _nullErrorHandler; | |
1109 if (onDone == null) onDone = _nullDoneHandler; | |
1110 cancelOnError = identical(true, cancelOnError); | 1122 cancelOnError = identical(true, cancelOnError); |
1111 return new _EventTransformStreamSubscription(_source, _transformer, | 1123 return new _EventTransformStreamSubscription(_source, _transformer, |
1112 onData, onError, onDone, | 1124 onData, onError, onDone, |
1113 cancelOnError); | 1125 cancelOnError); |
1114 } | 1126 } |
1115 } | 1127 } |
1116 | 1128 |
1117 class _EventTransformStreamSubscription<S, T> | 1129 class _EventTransformStreamSubscription<S, T> |
1118 extends _BufferingStreamSubscription<T> { | 1130 extends _BaseStreamSubscription<T> |
| 1131 implements _EventOutputSink<T> { |
1119 /** The transformer used to transform events. */ | 1132 /** The transformer used to transform events. */ |
1120 final StreamEventTransformer<S, T> _transformer; | 1133 final StreamEventTransformer<S, T> _transformer; |
1121 | 1134 /** Whether to unsubscribe when emitting an error. */ |
| 1135 final bool _cancelOnError; |
1122 /** Whether this stream has sent a done event. */ | 1136 /** Whether this stream has sent a done event. */ |
1123 bool _isClosed = false; | 1137 bool _isClosed = false; |
1124 | |
1125 /** Source of incoming events. */ | 1138 /** Source of incoming events. */ |
1126 StreamSubscription<S> _subscription; | 1139 StreamSubscription<S> _subscription; |
1127 | |
1128 /** Cached EventSink wrapper for this class. */ | 1140 /** Cached EventSink wrapper for this class. */ |
1129 EventSink<T> _sink; | 1141 EventSink<T> _sink; |
1130 | 1142 |
1131 _EventTransformStreamSubscription(Stream<S> source, | 1143 _EventTransformStreamSubscription(Stream<S> source, |
1132 this._transformer, | 1144 this._transformer, |
1133 void onData(T data), | 1145 void onData(T data), |
1134 void onError(error), | 1146 void onError(error), |
1135 void onDone(), | 1147 void onDone(), |
1136 bool cancelOnError) | 1148 this._cancelOnError) |
1137 : super(onData, onError, onDone, cancelOnError) { | 1149 : super(onData, onError, onDone) { |
1138 _sink = new _EventSinkAdapter<T>(this); | 1150 _sink = new _EventOutputSinkWrapper<T>(this); |
1139 _subscription = source.listen(_handleData, | 1151 _subscription = source.listen(_handleData, |
1140 onError: _handleError, | 1152 onError: _handleError, |
1141 onDone: _handleDone); | 1153 onDone: _handleDone); |
1142 } | 1154 } |
1143 | 1155 |
1144 /** Whether this subscription is still subscribed to its source. */ | 1156 /** Whether this subscription is still subscribed to its source. */ |
1145 bool get _isSubscribed => _subscription != null; | 1157 bool get _isSubscribed => _subscription != null; |
1146 | 1158 |
1147 void _onPause() { | 1159 void pause([Future pauseSignal]) { |
1148 if (_isSubscribed) _subscription.pause(); | 1160 if (_isSubscribed) _subscription.pause(pauseSignal); |
1149 } | 1161 } |
1150 | 1162 |
1151 void _onResume() { | 1163 void resume() { |
1152 if (_isSubscribed) _subscription.resume(); | 1164 if (_isSubscribed) _subscription.resume(); |
1153 } | 1165 } |
1154 | 1166 |
1155 void _onCancel() { | 1167 bool get isPaused => _isSubscribed ? _subscription.isPaused : false; |
| 1168 |
| 1169 void cancel() { |
1156 if (_isSubscribed) { | 1170 if (_isSubscribed) { |
1157 StreamSubscription subscription = _subscription; | 1171 StreamSubscription subscription = _subscription; |
1158 _subscription = null; | 1172 _subscription = null; |
1159 subscription.cancel(); | 1173 subscription.cancel(); |
1160 } | 1174 } |
1161 _isClosed = true; | 1175 _isClosed = true; |
1162 } | 1176 } |
1163 | 1177 |
1164 void _handleData(S data) { | 1178 void _handleData(S data) { |
1165 try { | 1179 try { |
1166 _transformer.handleData(data, _sink); | 1180 _transformer.handleData(data, _sink); |
1167 } catch (e, s) { | 1181 } catch (e, s) { |
1168 _addError(_asyncError(e, s)); | 1182 _sendError(_asyncError(e, s)); |
1169 } | 1183 } |
1170 } | 1184 } |
1171 | 1185 |
1172 void _handleError(error) { | 1186 void _handleError(error) { |
1173 try { | 1187 try { |
1174 _transformer.handleError(error, _sink); | 1188 _transformer.handleError(error, _sink); |
1175 } catch (e, s) { | 1189 } catch (e, s) { |
1176 _addError(_asyncError(e, s)); | 1190 _sendError(_asyncError(e, s)); |
1177 } | 1191 } |
1178 } | 1192 } |
1179 | 1193 |
1180 void _handleDone() { | 1194 void _handleDone() { |
1181 try { | 1195 try { |
1182 _subscription = null; | 1196 _subscription = null; |
1183 _transformer.handleDone(_sink); | 1197 _transformer.handleDone(_sink); |
1184 } catch (e, s) { | 1198 } catch (e, s) { |
1185 _addError(_asyncError(e, s)); | 1199 _sendError(_asyncError(e, s)); |
1186 } | 1200 } |
1187 } | 1201 } |
| 1202 |
| 1203 // EventOutputSink interface. |
| 1204 void _sendData(T data) { |
| 1205 if (_isClosed) return; |
| 1206 _onData(data); |
| 1207 } |
| 1208 |
| 1209 void _sendError(error) { |
| 1210 if (_isClosed) return; |
| 1211 _onError(error); |
| 1212 if (_cancelOnError) { |
| 1213 cancel(); |
| 1214 } |
| 1215 } |
| 1216 |
| 1217 void _sendDone() { |
| 1218 if (_isClosed) throw new StateError("Already closed."); |
| 1219 _isClosed = true; |
| 1220 if (_isSubscribed) { |
| 1221 _subscription.cancel(); |
| 1222 _subscription = null; |
| 1223 } |
| 1224 _onDone(); |
| 1225 } |
1188 } | 1226 } |
1189 | 1227 |
1190 class _EventSinkAdapter<T> implements EventSink<T> { | 1228 class _EventOutputSinkWrapper<T> extends EventSink<T> { |
1191 _EventSink _sink; | 1229 _EventOutputSink _sink; |
1192 _EventSinkAdapter(this._sink); | 1230 _EventOutputSinkWrapper(this._sink); |
1193 | 1231 |
1194 void add(T data) { _sink._add(data); } | 1232 void add(T data) { _sink._sendData(data); } |
1195 void addError(error) { _sink._addError(error); } | 1233 void addError(error) { _sink._sendError(error); } |
1196 void close() { _sink._close(); } | 1234 void close() { _sink._sendDone(); } |
1197 } | 1235 } |
1198 | |
1199 | |
1200 /** | |
1201 * An [Iterable] like interface for the values of a [Stream]. | |
1202 * | |
1203 * This wraps a [Stream] and a subscription on the stream. It listens | |
1204 * on the stream, and completes the future returned by [moveNext] when the | |
1205 * next value becomes available. | |
1206 * | |
1207 * NOTICE: This is a tentative design. This class may change. | |
1208 */ | |
1209 abstract class StreamIterator<T> { | |
1210 | |
1211 /** Create a [StreamIterator] on [stream]. */ | |
1212 factory StreamIterator(Stream<T> stream) | |
1213 // TODO(lrn): use redirecting factory constructor when type | |
1214 // arguments are supported. | |
1215 => new _StreamIteratorImpl<T>(stream); | |
1216 | |
1217 /** | |
1218 * Wait for the next stream value to be available. | |
1219 * | |
1220 * It is not allowed to call this function again until the future has | |
1221 * completed. If the returned future completes with anything except `true`, | |
1222 * the iterator is done, and no new value will ever be available. | |
1223 * | |
1224 * The future may complete with an error, if the stream produces an error. | |
1225 */ | |
1226 Future<bool> moveNext(); | |
1227 | |
1228 /** | |
1229 * The current value of the stream. | |
1230 * | |
1231 * Only valid when the future returned by [moveNext] completes with `true` | |
1232 * as value, and only until the next call to [moveNext]. | |
1233 */ | |
1234 T get current; | |
1235 | |
1236 /** | |
1237 * Cancels the stream iterator (and the underlying stream subscription) early. | |
1238 * | |
1239 * The stream iterator is automatically canceled if the [moveNext] future | |
1240 * completes with either `false` or an error. | |
1241 * | |
1242 * If a [moveNext] call has been made, it will complete with `false` as value, | |
1243 * as will all further calls to [moveNext]. | |
1244 * | |
1245 * If you need to stop listening for values before the stream iterator is | |
1246 * automatically closed, you must call [cancel] to ensure that the stream | |
1247 * is properly closed. | |
1248 */ | |
1249 void cancel(); | |
1250 } | |
OLD | NEW |