Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(91)

Unified Diff: sdk/lib/async/stream.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/_internal/pub/test/error_group_test.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index dbe3374e8f088733bd0c49563012fc577611a200..528dee00a4ce0d23bacfddd017b720e7faee1d56 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -62,24 +62,24 @@ abstract class Stream<T> {
* data or error, and then close with a done-event.
*/
factory Stream.fromFuture(Future<T> future) {
- _StreamImpl<T> stream = new _SingleStreamImpl<T>();
+ StreamController<T> controller = new StreamController<T>();
future.then((value) {
- stream._add(value);
- stream._close();
+ controller.add(value);
+ controller.close();
},
onError: (error) {
- stream._addError(error);
- stream._close();
+ controller.addError(error);
+ controller.close();
});
- return stream;
+ return controller.stream;
}
/**
* Creates a single-subscription stream that gets its data from [data].
*/
factory Stream.fromIterable(Iterable<T> data) {
- _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data);
- return new _GeneratedSingleStreamImpl<T>(iterableEvents);
+ return new _GeneratedStreamImpl<T>(
+ () => new _IterablePendingEvents<T>(data));
}
/**
@@ -152,7 +152,7 @@ abstract class Stream<T> {
* If this stream is single-subscription, return a new stream that allows
* multiple subscribers. It will subscribe to this stream when its first
* subscriber is added, and unsubscribe again when the last subscription is
- * cancelled.
+ * canceled.
*
* If this stream is already a broadcast stream, it is returned unmodified.
*/
@@ -1049,22 +1049,7 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
const StreamEventTransformer();
Stream<T> bind(Stream<S> source) {
- // Hackish way of buffering data that goes out of the event-transformer.
- // TODO(floitsch): replace this with a correct solution.
- Stream transformingStream = new EventTransformStream<S, T>(source, this);
- StreamController controller;
- StreamSubscription subscription;
- controller = new StreamController<T>(
- onListen: () {
- subscription = transformingStream.listen(
- controller.add,
- onError: controller.addError,
- onDone: controller.close);
- },
- onPause: () => subscription.pause(),
- onResume: () => subscription.resume(),
- onCancel: () => subscription.cancel());
- return controller.stream;
+ return new EventTransformStream<S, T>(source, this);
}
/**
@@ -1119,6 +1104,9 @@ class EventTransformStream<S, T> extends Stream<T> {
{ void onError(error),
void onDone(),
bool cancelOnError }) {
+ if (onData == null) onData = _nullDataHandler;
+ if (onError == null) onError = _nullErrorHandler;
+ if (onDone == null) onDone = _nullDoneHandler;
cancelOnError = identical(true, cancelOnError);
return new _EventTransformStreamSubscription(_source, _transformer,
onData, onError, onDone,
@@ -1127,16 +1115,16 @@ class EventTransformStream<S, T> extends Stream<T> {
}
class _EventTransformStreamSubscription<S, T>
- extends _BaseStreamSubscription<T>
- implements _EventOutputSink<T> {
+ extends _BufferingStreamSubscription<T> {
/** The transformer used to transform events. */
final StreamEventTransformer<S, T> _transformer;
- /** Whether to unsubscribe when emitting an error. */
- final bool _cancelOnError;
+
/** Whether this stream has sent a done event. */
bool _isClosed = false;
+
/** Source of incoming events. */
StreamSubscription<S> _subscription;
+
/** Cached EventSink wrapper for this class. */
EventSink<T> _sink;
@@ -1145,9 +1133,9 @@ class _EventTransformStreamSubscription<S, T>
void onData(T data),
void onError(error),
void onDone(),
- this._cancelOnError)
- : super(onData, onError, onDone) {
- _sink = new _EventOutputSinkWrapper<T>(this);
+ bool cancelOnError)
+ : super(onData, onError, onDone, cancelOnError) {
+ _sink = new _EventSinkAdapter<T>(this);
_subscription = source.listen(_handleData,
onError: _handleError,
onDone: _handleDone);
@@ -1156,17 +1144,15 @@ class _EventTransformStreamSubscription<S, T>
/** Whether this subscription is still subscribed to its source. */
bool get _isSubscribed => _subscription != null;
- void pause([Future pauseSignal]) {
- if (_isSubscribed) _subscription.pause(pauseSignal);
+ void _onPause() {
+ if (_isSubscribed) _subscription.pause();
}
- void resume() {
+ void _onResume() {
if (_isSubscribed) _subscription.resume();
}
- bool get isPaused => _isSubscribed ? _subscription.isPaused : false;
-
- void cancel() {
+ void _onCancel() {
if (_isSubscribed) {
StreamSubscription subscription = _subscription;
_subscription = null;
@@ -1179,7 +1165,7 @@ class _EventTransformStreamSubscription<S, T>
try {
_transformer.handleData(data, _sink);
} catch (e, s) {
- _sendError(_asyncError(e, s));
+ _addError(_asyncError(e, s));
}
}
@@ -1187,7 +1173,7 @@ class _EventTransformStreamSubscription<S, T>
try {
_transformer.handleError(error, _sink);
} catch (e, s) {
- _sendError(_asyncError(e, s));
+ _addError(_asyncError(e, s));
}
}
@@ -1196,40 +1182,69 @@ class _EventTransformStreamSubscription<S, T>
_subscription = null;
_transformer.handleDone(_sink);
} catch (e, s) {
- _sendError(_asyncError(e, s));
+ _addError(_asyncError(e, s));
}
}
+}
- // EventOutputSink interface.
- void _sendData(T data) {
- if (_isClosed) return;
- _onData(data);
- }
-
- void _sendError(error) {
- if (_isClosed) return;
- _onError(error);
- if (_cancelOnError) {
- cancel();
- }
- }
+class _EventSinkAdapter<T> implements EventSink<T> {
+ _EventSink _sink;
+ _EventSinkAdapter(this._sink);
- void _sendDone() {
- if (_isClosed) throw new StateError("Already closed.");
- _isClosed = true;
- if (_isSubscribed) {
- _subscription.cancel();
- _subscription = null;
- }
- _onDone();
- }
+ void add(T data) { _sink._add(data); }
+ void addError(error) { _sink._addError(error); }
+ void close() { _sink._close(); }
}
-class _EventOutputSinkWrapper<T> extends EventSink<T> {
- _EventOutputSink _sink;
- _EventOutputSinkWrapper(this._sink);
- void add(T data) { _sink._sendData(data); }
- void addError(error) { _sink._sendError(error); }
- void close() { _sink._sendDone(); }
+/**
+ * An [Iterable] like interface for the values of a [Stream].
+ *
+ * This wraps a [Stream] and a subscription on the stream. It listens
+ * on the stream, and completes the future returned by [moveNext] when the
+ * next value becomes available.
+ *
+ * NOTICE: This is a tentative design. This class may change.
+ */
+abstract class StreamIterator<T> {
+
+ /** Create a [StreamIterator] on [stream]. */
+ factory StreamIterator(Stream<T> stream)
+ // TODO(lrn): use redirecting factory constructor when type
+ // arguments are supported.
+ => new _StreamIteratorImpl<T>(stream);
+
+ /**
+ * Wait for the next stream value to be available.
+ *
+ * It is not allowed to call this function again until the future has
+ * completed. If the returned future completes with anything except `true`,
+ * the iterator is done, and no new value will ever be available.
+ *
+ * The future may complete with an error, if the stream produces an error.
+ */
+ Future<bool> moveNext();
+
+ /**
+ * The current value of the stream.
+ *
+ * Only valid when the future returned by [moveNext] completes with `true`
+ * as value, and only until the next call to [moveNext].
+ */
+ T get current;
+
+ /**
+ * Cancels the stream iterator (and the underlying stream subscription) early.
+ *
+ * The stream iterator is automatically canceled if the [moveNext] future
+ * completes with either `false` or an error.
+ *
+ * If a [moveNext] call has been made, it will complete with `false` as value,
+ * as will all further calls to [moveNext].
+ *
+ * If you need to stop listening for values before the stream iterator is
+ * automatically closed, you must call [cancel] to ensure that the stream
+ * is properly closed.
+ */
+ void cancel();
}
« no previous file with comments | « sdk/lib/_internal/pub/test/error_group_test.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698