Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/_internal/pub/test/error_group_test.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 abstract class Stream<T> { 55 abstract class Stream<T> {
56 Stream(); 56 Stream();
57 57
58 /** 58 /**
59 * Creates a new single-subscription stream from the future. 59 * Creates a new single-subscription stream from the future.
60 * 60 *
61 * When the future completes, the stream will fire one event, either 61 * When the future completes, the stream will fire one event, either
62 * data or error, and then close with a done-event. 62 * data or error, and then close with a done-event.
63 */ 63 */
64 factory Stream.fromFuture(Future<T> future) { 64 factory Stream.fromFuture(Future<T> future) {
65 _StreamImpl<T> stream = new _SingleStreamImpl<T>(); 65 StreamController<T> controller = new StreamController<T>();
66 future.then((value) { 66 future.then((value) {
67 stream._add(value); 67 controller.add(value);
68 stream._close(); 68 controller.close();
69 }, 69 },
70 onError: (error) { 70 onError: (error) {
71 stream._addError(error); 71 controller.addError(error);
72 stream._close(); 72 controller.close();
73 }); 73 });
74 return stream; 74 return controller.stream;
75 } 75 }
76 76
77 /** 77 /**
78 * Creates a single-subscription stream that gets its data from [data]. 78 * Creates a single-subscription stream that gets its data from [data].
79 */ 79 */
80 factory Stream.fromIterable(Iterable<T> data) { 80 factory Stream.fromIterable(Iterable<T> data) {
81 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); 81 return new _GeneratedStreamImpl<T>(
82 return new _GeneratedSingleStreamImpl<T>(iterableEvents); 82 () => new _IterablePendingEvents<T>(data));
83 } 83 }
84 84
85 /** 85 /**
86 * Creates a stream that repeatedly emits events at [period] intervals. 86 * Creates a stream that repeatedly emits events at [period] intervals.
87 * 87 *
88 * The event values are computed by invoking [computation]. The argument to 88 * The event values are computed by invoking [computation]. The argument to
89 * this callback is an integer that starts with 0 and is incremented for 89 * this callback is an integer that starts with 0 and is incremented for
90 * every event. 90 * every event.
91 * 91 *
92 * If [computation] is omitted the event values will all be `null`. 92 * If [computation] is omitted the event values will all be `null`.
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
145 * Reports whether this stream is a broadcast stream. 145 * Reports whether this stream is a broadcast stream.
146 */ 146 */
147 bool get isBroadcast => false; 147 bool get isBroadcast => false;
148 148
149 /** 149 /**
150 * Returns a multi-subscription stream that produces the same events as this. 150 * Returns a multi-subscription stream that produces the same events as this.
151 * 151 *
152 * If this stream is single-subscription, return a new stream that allows 152 * If this stream is single-subscription, return a new stream that allows
153 * multiple subscribers. It will subscribe to this stream when its first 153 * multiple subscribers. It will subscribe to this stream when its first
154 * subscriber is added, and unsubscribe again when the last subscription is 154 * subscriber is added, and unsubscribe again when the last subscription is
155 * cancelled. 155 * canceled.
156 * 156 *
157 * If this stream is already a broadcast stream, it is returned unmodified. 157 * If this stream is already a broadcast stream, it is returned unmodified.
158 */ 158 */
159 Stream<T> asBroadcastStream() { 159 Stream<T> asBroadcastStream() {
160 if (isBroadcast) return this; 160 if (isBroadcast) return this;
161 return new _SingleStreamMultiplexer<T>(this); 161 return new _SingleStreamMultiplexer<T>(this);
162 } 162 }
163 163
164 /** 164 /**
165 * Adds a subscription to this stream. 165 * Adds a subscription to this stream.
(...skipping 876 matching lines...) Expand 10 before | Expand all | Expand 10 after
1042 * someTypeStream.transform(new DoubleTransformer<Type>()); 1042 * someTypeStream.transform(new DoubleTransformer<Type>());
1043 * 1043 *
1044 * The default implementations of the "handle" methods forward 1044 * The default implementations of the "handle" methods forward
1045 * the events unmodified. If using the default [handleData] the generic type [T] 1045 * the events unmodified. If using the default [handleData] the generic type [T]
1046 * needs to be assignable to [S]. 1046 * needs to be assignable to [S].
1047 */ 1047 */
1048 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { 1048 abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
1049 const StreamEventTransformer(); 1049 const StreamEventTransformer();
1050 1050
1051 Stream<T> bind(Stream<S> source) { 1051 Stream<T> bind(Stream<S> source) {
1052 // Hackish way of buffering data that goes out of the event-transformer. 1052 return new EventTransformStream<S, T>(source, this);
1053 // TODO(floitsch): replace this with a correct solution.
1054 Stream transformingStream = new EventTransformStream<S, T>(source, this);
1055 StreamController controller;
1056 StreamSubscription subscription;
1057 controller = new StreamController<T>(
1058 onListen: () {
1059 subscription = transformingStream.listen(
1060 controller.add,
1061 onError: controller.addError,
1062 onDone: controller.close);
1063 },
1064 onPause: () => subscription.pause(),
1065 onResume: () => subscription.resume(),
1066 onCancel: () => subscription.cancel());
1067 return controller.stream;
1068 } 1053 }
1069 1054
1070 /** 1055 /**
1071 * Act on incoming data event. 1056 * Act on incoming data event.
1072 * 1057 *
1073 * The method may generate any number of events on the sink, but should 1058 * The method may generate any number of events on the sink, but should
1074 * not throw. 1059 * not throw.
1075 */ 1060 */
1076 void handleData(S event, EventSink<T> sink) { 1061 void handleData(S event, EventSink<T> sink) {
1077 var data = event; 1062 var data = event;
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
1112 final Stream<S> _source; 1097 final Stream<S> _source;
1113 final StreamEventTransformer _transformer; 1098 final StreamEventTransformer _transformer;
1114 EventTransformStream(Stream<S> source, 1099 EventTransformStream(Stream<S> source,
1115 StreamEventTransformer<S, T> transformer) 1100 StreamEventTransformer<S, T> transformer)
1116 : _source = source, _transformer = transformer; 1101 : _source = source, _transformer = transformer;
1117 1102
1118 StreamSubscription<T> listen(void onData(T data), 1103 StreamSubscription<T> listen(void onData(T data),
1119 { void onError(error), 1104 { void onError(error),
1120 void onDone(), 1105 void onDone(),
1121 bool cancelOnError }) { 1106 bool cancelOnError }) {
1107 if (onData == null) onData = _nullDataHandler;
1108 if (onError == null) onError = _nullErrorHandler;
1109 if (onDone == null) onDone = _nullDoneHandler;
1122 cancelOnError = identical(true, cancelOnError); 1110 cancelOnError = identical(true, cancelOnError);
1123 return new _EventTransformStreamSubscription(_source, _transformer, 1111 return new _EventTransformStreamSubscription(_source, _transformer,
1124 onData, onError, onDone, 1112 onData, onError, onDone,
1125 cancelOnError); 1113 cancelOnError);
1126 } 1114 }
1127 } 1115 }
1128 1116
1129 class _EventTransformStreamSubscription<S, T> 1117 class _EventTransformStreamSubscription<S, T>
1130 extends _BaseStreamSubscription<T> 1118 extends _BufferingStreamSubscription<T> {
1131 implements _EventOutputSink<T> {
1132 /** The transformer used to transform events. */ 1119 /** The transformer used to transform events. */
1133 final StreamEventTransformer<S, T> _transformer; 1120 final StreamEventTransformer<S, T> _transformer;
1134 /** Whether to unsubscribe when emitting an error. */ 1121
1135 final bool _cancelOnError;
1136 /** Whether this stream has sent a done event. */ 1122 /** Whether this stream has sent a done event. */
1137 bool _isClosed = false; 1123 bool _isClosed = false;
1124
1138 /** Source of incoming events. */ 1125 /** Source of incoming events. */
1139 StreamSubscription<S> _subscription; 1126 StreamSubscription<S> _subscription;
1127
1140 /** Cached EventSink wrapper for this class. */ 1128 /** Cached EventSink wrapper for this class. */
1141 EventSink<T> _sink; 1129 EventSink<T> _sink;
1142 1130
1143 _EventTransformStreamSubscription(Stream<S> source, 1131 _EventTransformStreamSubscription(Stream<S> source,
1144 this._transformer, 1132 this._transformer,
1145 void onData(T data), 1133 void onData(T data),
1146 void onError(error), 1134 void onError(error),
1147 void onDone(), 1135 void onDone(),
1148 this._cancelOnError) 1136 bool cancelOnError)
1149 : super(onData, onError, onDone) { 1137 : super(onData, onError, onDone, cancelOnError) {
1150 _sink = new _EventOutputSinkWrapper<T>(this); 1138 _sink = new _EventSinkAdapter<T>(this);
1151 _subscription = source.listen(_handleData, 1139 _subscription = source.listen(_handleData,
1152 onError: _handleError, 1140 onError: _handleError,
1153 onDone: _handleDone); 1141 onDone: _handleDone);
1154 } 1142 }
1155 1143
1156 /** Whether this subscription is still subscribed to its source. */ 1144 /** Whether this subscription is still subscribed to its source. */
1157 bool get _isSubscribed => _subscription != null; 1145 bool get _isSubscribed => _subscription != null;
1158 1146
1159 void pause([Future pauseSignal]) { 1147 void _onPause() {
1160 if (_isSubscribed) _subscription.pause(pauseSignal); 1148 if (_isSubscribed) _subscription.pause();
1161 } 1149 }
1162 1150
1163 void resume() { 1151 void _onResume() {
1164 if (_isSubscribed) _subscription.resume(); 1152 if (_isSubscribed) _subscription.resume();
1165 } 1153 }
1166 1154
1167 bool get isPaused => _isSubscribed ? _subscription.isPaused : false; 1155 void _onCancel() {
1168
1169 void cancel() {
1170 if (_isSubscribed) { 1156 if (_isSubscribed) {
1171 StreamSubscription subscription = _subscription; 1157 StreamSubscription subscription = _subscription;
1172 _subscription = null; 1158 _subscription = null;
1173 subscription.cancel(); 1159 subscription.cancel();
1174 } 1160 }
1175 _isClosed = true; 1161 _isClosed = true;
1176 } 1162 }
1177 1163
1178 void _handleData(S data) { 1164 void _handleData(S data) {
1179 try { 1165 try {
1180 _transformer.handleData(data, _sink); 1166 _transformer.handleData(data, _sink);
1181 } catch (e, s) { 1167 } catch (e, s) {
1182 _sendError(_asyncError(e, s)); 1168 _addError(_asyncError(e, s));
1183 } 1169 }
1184 } 1170 }
1185 1171
1186 void _handleError(error) { 1172 void _handleError(error) {
1187 try { 1173 try {
1188 _transformer.handleError(error, _sink); 1174 _transformer.handleError(error, _sink);
1189 } catch (e, s) { 1175 } catch (e, s) {
1190 _sendError(_asyncError(e, s)); 1176 _addError(_asyncError(e, s));
1191 } 1177 }
1192 } 1178 }
1193 1179
1194 void _handleDone() { 1180 void _handleDone() {
1195 try { 1181 try {
1196 _subscription = null; 1182 _subscription = null;
1197 _transformer.handleDone(_sink); 1183 _transformer.handleDone(_sink);
1198 } catch (e, s) { 1184 } catch (e, s) {
1199 _sendError(_asyncError(e, s)); 1185 _addError(_asyncError(e, s));
1200 } 1186 }
1201 } 1187 }
1202
1203 // EventOutputSink interface.
1204 void _sendData(T data) {
1205 if (_isClosed) return;
1206 _onData(data);
1207 }
1208
1209 void _sendError(error) {
1210 if (_isClosed) return;
1211 _onError(error);
1212 if (_cancelOnError) {
1213 cancel();
1214 }
1215 }
1216
1217 void _sendDone() {
1218 if (_isClosed) throw new StateError("Already closed.");
1219 _isClosed = true;
1220 if (_isSubscribed) {
1221 _subscription.cancel();
1222 _subscription = null;
1223 }
1224 _onDone();
1225 }
1226 } 1188 }
1227 1189
1228 class _EventOutputSinkWrapper<T> extends EventSink<T> { 1190 class _EventSinkAdapter<T> implements EventSink<T> {
1229 _EventOutputSink _sink; 1191 _EventSink _sink;
1230 _EventOutputSinkWrapper(this._sink); 1192 _EventSinkAdapter(this._sink);
1231 1193
1232 void add(T data) { _sink._sendData(data); } 1194 void add(T data) { _sink._add(data); }
1233 void addError(error) { _sink._sendError(error); } 1195 void addError(error) { _sink._addError(error); }
1234 void close() { _sink._sendDone(); } 1196 void close() { _sink._close(); }
1235 } 1197 }
1198
1199
1200 /**
1201 * An [Iterable] like interface for the values of a [Stream].
1202 *
1203 * This wraps a [Stream] and a subscription on the stream. It listens
1204 * on the stream, and completes the future returned by [moveNext] when the
1205 * next value becomes available.
1206 *
1207 * NOTICE: This is a tentative design. This class may change.
1208 */
1209 abstract class StreamIterator<T> {
1210
1211 /** Create a [StreamIterator] on [stream]. */
1212 factory StreamIterator(Stream<T> stream)
1213 // TODO(lrn): use redirecting factory constructor when type
1214 // arguments are supported.
1215 => new _StreamIteratorImpl<T>(stream);
1216
1217 /**
1218 * Wait for the next stream value to be available.
1219 *
1220 * It is not allowed to call this function again until the future has
1221 * completed. If the returned future completes with anything except `true`,
1222 * the iterator is done, and no new value will ever be available.
1223 *
1224 * The future may complete with an error, if the stream produces an error.
1225 */
1226 Future<bool> moveNext();
1227
1228 /**
1229 * The current value of the stream.
1230 *
1231 * Only valid when the future returned by [moveNext] completes with `true`
1232 * as value, and only until the next call to [moveNext].
1233 */
1234 T get current;
1235
1236 /**
1237 * Cancels the stream iterator (and the underlying stream subscription) early.
1238 *
1239 * The stream iterator is automatically canceled if the [moveNext] future
1240 * completes with either `false` or an error.
1241 *
1242 * If a [moveNext] call has been made, it will complete with `false` as value,
1243 * as will all further calls to [moveNext].
1244 *
1245 * If you need to stop listening for values before the stream iterator is
1246 * automatically closed, you must call [cancel] to ensure that the stream
1247 * is properly closed.
1248 */
1249 void cancel();
1250 }
OLDNEW
« no previous file with comments | « sdk/lib/_internal/pub/test/error_group_test.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698