Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 55 abstract class Stream<T> { | 55 abstract class Stream<T> { |
| 56 Stream(); | 56 Stream(); |
| 57 | 57 |
| 58 /** | 58 /** |
| 59 * Creates a new single-subscription stream from the future. | 59 * Creates a new single-subscription stream from the future. |
| 60 * | 60 * |
| 61 * When the future completes, the stream will fire one event, either | 61 * When the future completes, the stream will fire one event, either |
| 62 * data or error, and then close with a done-event. | 62 * data or error, and then close with a done-event. |
| 63 */ | 63 */ |
| 64 factory Stream.fromFuture(Future<T> future) { | 64 factory Stream.fromFuture(Future<T> future) { |
| 65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); | 65 StreamController<T> controller = new StreamController<T>(); |
| 66 future.then((value) { | 66 future.then((value) { |
| 67 stream._add(value); | 67 controller.add(value); |
| 68 stream._close(); | 68 controller.close(); |
| 69 }, | 69 }, |
| 70 onError: (error) { | 70 onError: (error) { |
| 71 stream._addError(error); | 71 controller.addError(error); |
| 72 stream._close(); | 72 controller.close(); |
| 73 }); | 73 }); |
| 74 return stream; | 74 return controller.stream; |
| 75 } | 75 } |
| 76 | 76 |
| 77 /** | 77 /** |
| 78 * Creates a single-subscription stream that gets its data from [data]. | 78 * Creates a single-subscription stream that gets its data from [data]. |
| 79 */ | 79 */ |
| 80 factory Stream.fromIterable(Iterable<T> data) { | 80 factory Stream.fromIterable(Iterable<T> data) { |
| 81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); | 81 return new _GeneratedStreamImpl<T>( |
| 82 return new _GeneratedSingleStreamImpl<T>(iterableEvents); | 82 () => new _IterablePendingEvents<T>(data)); |
| 83 } | 83 } |
| 84 | 84 |
| 85 /** | 85 /** |
| 86 * Creates a stream that repeatedly emits events at [period] intervals. | 86 * Creates a stream that repeatedly emits events at [period] intervals. |
| 87 * | 87 * |
| 88 * The event values are computed by invoking [computation]. The argument to | 88 * The event values are computed by invoking [computation]. The argument to |
| 89 * this callback is an integer that starts with 0 and is incremented for | 89 * this callback is an integer that starts with 0 and is incremented for |
| 90 * every event. | 90 * every event. |
| 91 * | 91 * |
| 92 * If [computation] is omitted the event values will all be `null`. | 92 * If [computation] is omitted the event values will all be `null`. |
| (...skipping 935 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1028 * someTypeStream.transform(new DoubleTransformer<Type>()); | 1028 * someTypeStream.transform(new DoubleTransformer<Type>()); |
| 1029 * | 1029 * |
| 1030 * The default implementations of the "handle" methods forward | 1030 * The default implementations of the "handle" methods forward |
| 1031 * the events unmodified. If using the default [handleData] the generic type [T] | 1031 * the events unmodified. If using the default [handleData] the generic type [T] |
| 1032 * needs to be assignable to [S]. | 1032 * needs to be assignable to [S]. |
| 1033 */ | 1033 */ |
| 1034 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { | 1034 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
| 1035 const StreamEventTransformer(); | 1035 const StreamEventTransformer(); |
| 1036 | 1036 |
| 1037 Stream<T> bind(Stream<S> source) { | 1037 Stream<T> bind(Stream<S> source) { |
| 1038 // Hackish way of buffering data that goes out of the event-transformer. | 1038 return new EventTransformStream<S, T>(source, this); |
| 1039 // TODO(floitsch): replace this with a correct solution. | |
| 1040 Stream transformingStream = new EventTransformStream<S, T>(source, this); | |
| 1041 StreamController controller; | |
| 1042 StreamSubscription subscription; | |
| 1043 controller = new StreamController<T>( | |
| 1044 onListen: () { | |
| 1045 subscription = transformingStream.listen( | |
| 1046 controller.add, | |
| 1047 onError: controller.addError, | |
| 1048 onDone: controller.close); | |
| 1049 }, | |
| 1050 onPause: () => subscription.pause(), | |
| 1051 onResume: () => subscription.resume(), | |
| 1052 onCancel: () => subscription.cancel()); | |
| 1053 return controller.stream; | |
| 1054 } | 1039 } |
| 1055 | 1040 |
| 1056 /** | 1041 /** |
| 1057 * Act on incoming data event. | 1042 * Act on incoming data event. |
| 1058 * | 1043 * |
| 1059 * The method may generate any number of events on the sink, but should | 1044 * The method may generate any number of events on the sink, but should |
| 1060 * not throw. | 1045 * not throw. |
| 1061 */ | 1046 */ |
| 1062 void handleData(S event, EventSink<T> sink) { | 1047 void handleData(S event, EventSink<T> sink) { |
| 1063 var data = event; | 1048 var data = event; |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1098 final Stream<S> _source; | 1083 final Stream<S> _source; |
| 1099 final StreamEventTransformer _transformer; | 1084 final StreamEventTransformer _transformer; |
| 1100 EventTransformStream(Stream<S> source, | 1085 EventTransformStream(Stream<S> source, |
| 1101 StreamEventTransformer<S, T> transformer) | 1086 StreamEventTransformer<S, T> transformer) |
| 1102 : _source = source, _transformer = transformer; | 1087 : _source = source, _transformer = transformer; |
| 1103 | 1088 |
| 1104 StreamSubscription<T> listen(void onData(T data), | 1089 StreamSubscription<T> listen(void onData(T data), |
| 1105 { void onError(error), | 1090 { void onError(error), |
| 1106 void onDone(), | 1091 void onDone(), |
| 1107 bool cancelOnError }) { | 1092 bool cancelOnError }) { |
| 1093 if (onData == null) onData = _nullDataHandler; | |
| 1094 if (onError == null) onError = _nullErrorHandler; | |
| 1095 if (onDone == null) onDone = _nullDoneHandler; | |
| 1108 cancelOnError = identical(true, cancelOnError); | 1096 cancelOnError = identical(true, cancelOnError); |
| 1109 return new _EventTransformStreamSubscription(_source, _transformer, | 1097 return new _EventTransformStreamSubscription(_source, _transformer, |
| 1110 onData, onError, onDone, | 1098 onData, onError, onDone, |
| 1111 cancelOnError); | 1099 cancelOnError); |
| 1112 } | 1100 } |
| 1113 } | 1101 } |
| 1114 | 1102 |
| 1115 class _EventTransformStreamSubscription<S, T> | 1103 class _EventTransformStreamSubscription<S, T> |
| 1116 extends _BaseStreamSubscription<T> | 1104 extends _BufferingStreamSubscription<T> { |
| 1117 implements _EventOutputSink<T> { | |
| 1118 /** The transformer used to transform events. */ | 1105 /** The transformer used to transform events. */ |
| 1119 final StreamEventTransformer<S, T> _transformer; | 1106 final StreamEventTransformer<S, T> _transformer; |
| 1120 /** Whether to unsubscribe when emitting an error. */ | 1107 |
| 1121 final bool _cancelOnError; | |
| 1122 /** Whether this stream has sent a done event. */ | 1108 /** Whether this stream has sent a done event. */ |
| 1123 bool _isClosed = false; | 1109 bool _isClosed = false; |
| 1110 | |
| 1124 /** Source of incoming events. */ | 1111 /** Source of incoming events. */ |
| 1125 StreamSubscription<S> _subscription; | 1112 StreamSubscription<S> _subscription; |
| 1113 | |
| 1126 /** Cached EventSink wrapper for this class. */ | 1114 /** Cached EventSink wrapper for this class. */ |
| 1127 EventSink<T> _sink; | 1115 EventSink<T> _sink; |
| 1128 | 1116 |
| 1129 _EventTransformStreamSubscription(Stream<S> source, | 1117 _EventTransformStreamSubscription(Stream<S> source, |
| 1130 this._transformer, | 1118 this._transformer, |
| 1131 void onData(T data), | 1119 void onData(T data), |
| 1132 void onError(error), | 1120 void onError(error), |
| 1133 void onDone(), | 1121 void onDone(), |
| 1134 this._cancelOnError) | 1122 bool cancelOnError) |
| 1135 : super(onData, onError, onDone) { | 1123 : super(onData, onError, onDone, cancelOnError) { |
| 1136 _sink = new _EventOutputSinkWrapper<T>(this); | 1124 _sink = new _EventSinkAdapter<T>(this); |
| 1137 _subscription = source.listen(_handleData, | 1125 _subscription = source.listen(_handleData, |
| 1138 onError: _handleError, | 1126 onError: _handleError, |
| 1139 onDone: _handleDone); | 1127 onDone: _handleDone); |
| 1140 } | 1128 } |
| 1141 | 1129 |
| 1142 /** Whether this subscription is still subscribed to its source. */ | 1130 /** Whether this subscription is still subscribed to its source. */ |
| 1143 bool get _isSubscribed => _subscription != null; | 1131 bool get _isSubscribed => _subscription != null; |
| 1144 | 1132 |
| 1145 void pause([Future pauseSignal]) { | 1133 void _onPause() { |
| 1146 if (_isSubscribed) _subscription.pause(pauseSignal); | 1134 if (_isSubscribed) _subscription.pause(); |
| 1147 } | 1135 } |
| 1148 | 1136 |
| 1149 void resume() { | 1137 void _onResume() { |
| 1150 if (_isSubscribed) _subscription.resume(); | 1138 if (_isSubscribed) _subscription.resume(); |
| 1151 } | 1139 } |
| 1152 | 1140 |
| 1153 bool get isPaused => _isSubscribed ? _subscription.isPaused : false; | 1141 void _onCancel() { |
| 1154 | |
| 1155 void cancel() { | |
| 1156 if (_isSubscribed) { | 1142 if (_isSubscribed) { |
| 1157 StreamSubscription subscription = _subscription; | 1143 StreamSubscription subscription = _subscription; |
| 1158 _subscription = null; | 1144 _subscription = null; |
| 1159 subscription.cancel(); | 1145 subscription.cancel(); |
| 1160 } | 1146 } |
| 1161 _isClosed = true; | 1147 _isClosed = true; |
| 1162 } | 1148 } |
| 1163 | 1149 |
| 1164 void _handleData(S data) { | 1150 void _handleData(S data) { |
| 1165 try { | 1151 try { |
| 1166 _transformer.handleData(data, _sink); | 1152 _transformer.handleData(data, _sink); |
| 1167 } catch (e, s) { | 1153 } catch (e, s) { |
| 1168 _sendError(_asyncError(e, s)); | 1154 _addError(_asyncError(e, s)); |
| 1169 } | 1155 } |
| 1170 } | 1156 } |
| 1171 | 1157 |
| 1172 void _handleError(error) { | 1158 void _handleError(error) { |
| 1173 try { | 1159 try { |
| 1174 _transformer.handleError(error, _sink); | 1160 _transformer.handleError(error, _sink); |
| 1175 } catch (e, s) { | 1161 } catch (e, s) { |
| 1176 _sendError(_asyncError(e, s)); | 1162 _addError(_asyncError(e, s)); |
| 1177 } | 1163 } |
| 1178 } | 1164 } |
| 1179 | 1165 |
| 1180 void _handleDone() { | 1166 void _handleDone() { |
| 1181 try { | 1167 try { |
| 1182 _subscription = null; | 1168 _subscription = null; |
| 1183 _transformer.handleDone(_sink); | 1169 _transformer.handleDone(_sink); |
| 1184 } catch (e, s) { | 1170 } catch (e, s) { |
| 1185 _sendError(_asyncError(e, s)); | 1171 _addError(_asyncError(e, s)); |
| 1186 } | 1172 } |
| 1187 } | 1173 } |
| 1188 | |
| 1189 // EventOutputSink interface. | |
| 1190 void _sendData(T data) { | |
| 1191 if (_isClosed) return; | |
| 1192 _onData(data); | |
| 1193 } | |
| 1194 | |
| 1195 void _sendError(error) { | |
| 1196 if (_isClosed) return; | |
| 1197 _onError(error); | |
| 1198 if (_cancelOnError) { | |
| 1199 cancel(); | |
| 1200 } | |
| 1201 } | |
| 1202 | |
| 1203 void _sendDone() { | |
| 1204 if (_isClosed) throw new StateError("Already closed."); | |
| 1205 _isClosed = true; | |
| 1206 if (_isSubscribed) { | |
| 1207 _subscription.cancel(); | |
| 1208 _subscription = null; | |
| 1209 } | |
| 1210 _onDone(); | |
| 1211 } | |
| 1212 } | 1174 } |
| 1213 | 1175 |
| 1214 class _EventOutputSinkWrapper<T> extends EventSink<T> { | 1176 class _EventSinkAdapter<T> implements EventSink<T> { |
| 1215 _EventOutputSink _sink; | 1177 _EventSink _sink; |
| 1216 _EventOutputSinkWrapper(this._sink); | 1178 _EventSinkAdapter(this._sink); |
| 1217 | 1179 |
| 1218 void add(T data) { _sink._sendData(data); } | 1180 void add(T data) { _sink._add(data); } |
| 1219 void addError(error) { _sink._sendError(error); } | 1181 void addError(error) { _sink._addError(error); } |
| 1220 void close() { _sink._sendDone(); } | 1182 void close() { _sink._close(); } |
| 1221 } | 1183 } |
| 1184 | |
| 1185 | |
| 1186 /** | |
| 1187 * An [Iterable] like interface for the values of a [Stream]. | |
| 1188 * | |
| 1189 * This wraps a [Stream] and a subscription on the stream. It listens | |
| 1190 * on the stream, and completes the future returned by [moveNext] when the | |
| 1191 * next value becomes available. | |
| 1192 */ | |
| 1193 abstract class StreamIterator<T> { | |
| 1194 | |
| 1195 /** Create a [StreamIterator] on [stream]. */ | |
| 1196 factory StreamIterator(Stream<T> stream) | |
| 1197 // TODO(lrn): use redirecting factory constructor when type | |
| 1198 // arguments are supported. | |
| 1199 => new _StreamIteratorImpl<T>(stream); | |
| 1200 | |
| 1201 /** | |
| 1202 * Wait for the next stream value to be available. | |
| 1203 * | |
| 1204 * It is not allowed to call this function again until the future has | |
| 1205 * completed. If the returned future completes with anything except `true`, | |
| 1206 * the iterator is done, and no new value will ever be available. | |
| 1207 * | |
| 1208 * The future may complete with an error, if the stream produces an error. | |
| 1209 */ | |
| 1210 Future<bool> moveNext(); | |
| 1211 | |
| 1212 /** | |
| 1213 * The current value of the stream. | |
| 1214 * | |
| 1215 * Only valid when the future returned by [moveNext] completes with `true` | |
|
floitsch
2013/05/24 15:53:17
"valid" or non-null?
| |
| 1216 * as value, and only until the next call to [moveNext]. | |
| 1217 */ | |
| 1218 T get current; | |
| 1219 | |
| 1220 /** | |
| 1221 * Cancels the stream iterator (and the underlying stream subscription) early. | |
| 1222 * | |
| 1223 * The stream iterator is automatically cancelled if the [moveNext] future | |
| 1224 * completes with either `false` or an error. | |
| 1225 * | |
| 1226 * If a [moveNext] call has been made, it will complete with `false` as value, | |
| 1227 * as will all further calls to [moveNext]. | |
| 1228 * | |
| 1229 * If you need to stop listening for values before the stream iterator is | |
| 1230 * automatically closed, you must call [cancel] to ensure that the stream | |
| 1231 * is properly closed. | |
| 1232 */ | |
| 1233 void cancel(); | |
| 1234 } | |
| OLD | NEW |