Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(364)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove remaining debugging prints. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 abstract class Stream<T> { 55 abstract class Stream<T> {
56 Stream(); 56 Stream();
57 57
58 /** 58 /**
59 * Creates a new single-subscription stream from the future. 59 * Creates a new single-subscription stream from the future.
60 * 60 *
61 * When the future completes, the stream will fire one event, either 61 * When the future completes, the stream will fire one event, either
62 * data or error, and then close with a done-event. 62 * data or error, and then close with a done-event.
63 */ 63 */
64 factory Stream.fromFuture(Future<T> future) { 64 factory Stream.fromFuture(Future<T> future) {
65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); 65 StreamController<T> controller = new StreamController<T>();
66 future.then((value) { 66 future.then((value) {
67 stream._add(value); 67 controller.add(value);
68 stream._close(); 68 controller.close();
69 }, 69 },
70 onError: (error) { 70 onError: (error) {
71 stream._addError(error); 71 controller.addError(error);
72 stream._close(); 72 controller.close();
73 }); 73 });
74 return stream; 74 return controller.stream;
75 } 75 }
76 76
77 /** 77 /**
78 * Creates a single-subscription stream that gets its data from [data]. 78 * Creates a single-subscription stream that gets its data from [data].
79 */ 79 */
80 factory Stream.fromIterable(Iterable<T> data) { 80 factory Stream.fromIterable(Iterable<T> data) {
81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); 81 return new _GeneratedStreamImpl<T>(
82 return new _GeneratedSingleStreamImpl<T>(iterableEvents); 82 () => new _IterablePendingEvents<T>(data));
83 } 83 }
84 84
85 /** 85 /**
86 * Creates a stream that repeatedly emits events at [period] intervals. 86 * Creates a stream that repeatedly emits events at [period] intervals.
87 * 87 *
88 * The event values are computed by invoking [computation]. The argument to 88 * The event values are computed by invoking [computation]. The argument to
89 * this callback is an integer that starts with 0 and is incremented for 89 * this callback is an integer that starts with 0 and is incremented for
90 * every event. 90 * every event.
91 * 91 *
92 * If [computation] is omitted the event values will all be `null`. 92 * If [computation] is omitted the event values will all be `null`.
(...skipping 935 matching lines...) Expand 10 before | Expand all | Expand 10 after
1028 * someTypeStream.transform(new DoubleTransformer<Type>()); 1028 * someTypeStream.transform(new DoubleTransformer<Type>());
1029 * 1029 *
1030 * The default implementations of the "handle" methods forward 1030 * The default implementations of the "handle" methods forward
1031 * the events unmodified. If using the default [handleData] the generic type [T] 1031 * the events unmodified. If using the default [handleData] the generic type [T]
1032 * needs to be assignable to [S]. 1032 * needs to be assignable to [S].
1033 */ 1033 */
1034 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { 1034 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
1035 const StreamEventTransformer(); 1035 const StreamEventTransformer();
1036 1036
1037 Stream<T> bind(Stream<S> source) { 1037 Stream<T> bind(Stream<S> source) {
1038 // Hackish way of buffering data that goes out of the event-transformer. 1038 return new EventTransformStream<S, T>(source, this);
1039 // TODO(floitsch): replace this with a correct solution.
1040 Stream transformingStream = new EventTransformStream<S, T>(source, this);
1041 StreamController controller;
1042 StreamSubscription subscription;
1043 controller = new StreamController<T>(
1044 onListen: () {
1045 subscription = transformingStream.listen(
1046 controller.add,
1047 onError: controller.addError,
1048 onDone: controller.close);
1049 },
1050 onPause: () => subscription.pause(),
1051 onResume: () => subscription.resume(),
1052 onCancel: () => subscription.cancel());
1053 return controller.stream;
1054 } 1039 }
1055 1040
1056 /** 1041 /**
1057 * Act on incoming data event. 1042 * Act on incoming data event.
1058 * 1043 *
1059 * The method may generate any number of events on the sink, but should 1044 * The method may generate any number of events on the sink, but should
1060 * not throw. 1045 * not throw.
1061 */ 1046 */
1062 void handleData(S event, EventSink<T> sink) { 1047 void handleData(S event, EventSink<T> sink) {
1063 var data = event; 1048 var data = event;
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
1098 final Stream<S> _source; 1083 final Stream<S> _source;
1099 final StreamEventTransformer _transformer; 1084 final StreamEventTransformer _transformer;
1100 EventTransformStream(Stream<S> source, 1085 EventTransformStream(Stream<S> source,
1101 StreamEventTransformer<S, T> transformer) 1086 StreamEventTransformer<S, T> transformer)
1102 : _source = source, _transformer = transformer; 1087 : _source = source, _transformer = transformer;
1103 1088
1104 StreamSubscription<T> listen(void onData(T data), 1089 StreamSubscription<T> listen(void onData(T data),
1105 { void onError(error), 1090 { void onError(error),
1106 void onDone(), 1091 void onDone(),
1107 bool cancelOnError }) { 1092 bool cancelOnError }) {
1093 if (onData == null) onData = _nullDataHandler;
1094 if (onError == null) onError = _nullErrorHandler;
1095 if (onDone == null) onDone = _nullDoneHandler;
1108 cancelOnError = identical(true, cancelOnError); 1096 cancelOnError = identical(true, cancelOnError);
1109 return new _EventTransformStreamSubscription(_source, _transformer, 1097 return new _EventTransformStreamSubscription(_source, _transformer,
1110 onData, onError, onDone, 1098 onData, onError, onDone,
1111 cancelOnError); 1099 cancelOnError);
1112 } 1100 }
1113 } 1101 }
1114 1102
1115 class _EventTransformStreamSubscription<S, T> 1103 class _EventTransformStreamSubscription<S, T>
1116 extends _BaseStreamSubscription<T> 1104 extends _BufferingStreamSubscription<T> {
1117 implements _EventOutputSink<T> {
1118 /** The transformer used to transform events. */ 1105 /** The transformer used to transform events. */
1119 final StreamEventTransformer<S, T> _transformer; 1106 final StreamEventTransformer<S, T> _transformer;
1120 /** Whether to unsubscribe when emitting an error. */ 1107
1121 final bool _cancelOnError;
1122 /** Whether this stream has sent a done event. */ 1108 /** Whether this stream has sent a done event. */
1123 bool _isClosed = false; 1109 bool _isClosed = false;
1110
1124 /** Source of incoming events. */ 1111 /** Source of incoming events. */
1125 StreamSubscription<S> _subscription; 1112 StreamSubscription<S> _subscription;
1113
1126 /** Cached EventSink wrapper for this class. */ 1114 /** Cached EventSink wrapper for this class. */
1127 EventSink<T> _sink; 1115 EventSink<T> _sink;
1128 1116
1129 _EventTransformStreamSubscription(Stream<S> source, 1117 _EventTransformStreamSubscription(Stream<S> source,
1130 this._transformer, 1118 this._transformer,
1131 void onData(T data), 1119 void onData(T data),
1132 void onError(error), 1120 void onError(error),
1133 void onDone(), 1121 void onDone(),
1134 this._cancelOnError) 1122 bool cancelOnError)
1135 : super(onData, onError, onDone) { 1123 : super(onData, onError, onDone, cancelOnError) {
1136 _sink = new _EventOutputSinkWrapper<T>(this); 1124 _sink = new _EventSinkAdapter<T>(this);
1137 _subscription = source.listen(_handleData, 1125 _subscription = source.listen(_handleData,
1138 onError: _handleError, 1126 onError: _handleError,
1139 onDone: _handleDone); 1127 onDone: _handleDone);
1140 } 1128 }
1141 1129
1142 /** Whether this subscription is still subscribed to its source. */ 1130 /** Whether this subscription is still subscribed to its source. */
1143 bool get _isSubscribed => _subscription != null; 1131 bool get _isSubscribed => _subscription != null;
1144 1132
1145 void pause([Future pauseSignal]) { 1133 void _onPause() {
1146 if (_isSubscribed) _subscription.pause(pauseSignal); 1134 if (_isSubscribed) _subscription.pause();
1147 } 1135 }
1148 1136
1149 void resume() { 1137 void _onResume() {
1150 if (_isSubscribed) _subscription.resume(); 1138 if (_isSubscribed) _subscription.resume();
1151 } 1139 }
1152 1140
1153 bool get isPaused => _isSubscribed ? _subscription.isPaused : false; 1141 void _onCancel() {
1154
1155 void cancel() {
1156 if (_isSubscribed) { 1142 if (_isSubscribed) {
1157 StreamSubscription subscription = _subscription; 1143 StreamSubscription subscription = _subscription;
1158 _subscription = null; 1144 _subscription = null;
1159 subscription.cancel(); 1145 subscription.cancel();
1160 } 1146 }
1161 _isClosed = true; 1147 _isClosed = true;
1162 } 1148 }
1163 1149
1164 void _handleData(S data) { 1150 void _handleData(S data) {
1165 try { 1151 try {
1166 _transformer.handleData(data, _sink); 1152 _transformer.handleData(data, _sink);
1167 } catch (e, s) { 1153 } catch (e, s) {
1168 _sendError(_asyncError(e, s)); 1154 _addError(_asyncError(e, s));
1169 } 1155 }
1170 } 1156 }
1171 1157
1172 void _handleError(error) { 1158 void _handleError(error) {
1173 try { 1159 try {
1174 _transformer.handleError(error, _sink); 1160 _transformer.handleError(error, _sink);
1175 } catch (e, s) { 1161 } catch (e, s) {
1176 _sendError(_asyncError(e, s)); 1162 _addError(_asyncError(e, s));
1177 } 1163 }
1178 } 1164 }
1179 1165
1180 void _handleDone() { 1166 void _handleDone() {
1181 try { 1167 try {
1182 _subscription = null; 1168 _subscription = null;
1183 _transformer.handleDone(_sink); 1169 _transformer.handleDone(_sink);
1184 } catch (e, s) { 1170 } catch (e, s) {
1185 _sendError(_asyncError(e, s)); 1171 _addError(_asyncError(e, s));
1186 } 1172 }
1187 } 1173 }
1188
1189 // EventOutputSink interface.
1190 void _sendData(T data) {
1191 if (_isClosed) return;
1192 _onData(data);
1193 }
1194
1195 void _sendError(error) {
1196 if (_isClosed) return;
1197 _onError(error);
1198 if (_cancelOnError) {
1199 cancel();
1200 }
1201 }
1202
1203 void _sendDone() {
1204 if (_isClosed) throw new StateError("Already closed.");
1205 _isClosed = true;
1206 if (_isSubscribed) {
1207 _subscription.cancel();
1208 _subscription = null;
1209 }
1210 _onDone();
1211 }
1212 } 1174 }
1213 1175
1214 class _EventOutputSinkWrapper<T> extends EventSink<T> { 1176 class _EventSinkAdapter<T> implements EventSink<T> {
1215 _EventOutputSink _sink; 1177 _EventSink _sink;
1216 _EventOutputSinkWrapper(this._sink); 1178 _EventSinkAdapter(this._sink);
1217 1179
1218 void add(T data) { _sink._sendData(data); } 1180 void add(T data) { _sink._add(data); }
1219 void addError(error) { _sink._sendError(error); } 1181 void addError(error) { _sink._addError(error); }
1220 void close() { _sink._sendDone(); } 1182 void close() { _sink._close(); }
1221 } 1183 }
1184
1185
1186 /**
1187 * An [Iterable] like interface for the values of a [Stream].
1188 *
1189 * This wraps a [Stream] and a subscription on the stream. It listens
1190 * on the stream, and completes the future returned by [moveNext] when the
1191 * next value becomes available.
1192 */
1193 abstract class StreamIterator<T> {
1194
1195 /** Create a [StreamIterator] on [stream]. */
1196 factory StreamIterator(Stream<T> stream)
1197 // TODO(lrn): use redirecting factory constructor when type
1198 // arguments are supported.
1199 => new _StreamIteratorImpl<T>(stream);
1200
1201 /**
1202 * Wait for the next stream value to be available.
1203 *
1204 * It is not allowed to call this function again until the future has
1205 * completed. If the returned future completes with anything except `true`,
1206 * the iterator is done, and no new value will ever be available.
1207 *
1208 * The future may complete with an error, if the stream produces an error.
1209 */
1210 Future<bool> moveNext();
1211
1212 /**
1213 * The current value of the stream.
1214 *
1215 * Only valid when the future returned by [moveNext] completes with `true`
floitsch 2013/05/24 15:53:17 "valid" or non-null?
1216 * as value, and only until the next call to [moveNext].
1217 */
1218 T get current;
1219
1220 /**
1221 * Cancels the stream iterator (and the underlying stream subscription) early.
1222 *
1223 * The stream iterator is automatically cancelled if the [moveNext] future
1224 * completes with either `false` or an error.
1225 *
1226 * If a [moveNext] call has been made, it will complete with `false` as value,
1227 * as will all further calls to [moveNext].
1228 *
1229 * If you need to stop listening for values before the stream iterator is
1230 * automatically closed, you must call [cancel] to ensure that the stream
1231 * is properly closed.
1232 */
1233 void cancel();
1234 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698