Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(148)

Unified Diff: sdk/lib/async/stream.dart

Issue 15989006: Revert until Windows crash is debugged. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/_internal/pub/test/error_group_test.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index 528dee00a4ce0d23bacfddd017b720e7faee1d56..dbe3374e8f088733bd0c49563012fc577611a200 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -62,24 +62,24 @@ abstract class Stream<T> {
* data or error, and then close with a done-event.
*/
factory Stream.fromFuture(Future<T> future) {
- StreamController<T> controller = new StreamController<T>();
+ _StreamImpl<T> stream = new _SingleStreamImpl<T>();
future.then((value) {
- controller.add(value);
- controller.close();
+ stream._add(value);
+ stream._close();
},
onError: (error) {
- controller.addError(error);
- controller.close();
+ stream._addError(error);
+ stream._close();
});
- return controller.stream;
+ return stream;
}
/**
* Creates a single-subscription stream that gets its data from [data].
*/
factory Stream.fromIterable(Iterable<T> data) {
- return new _GeneratedStreamImpl<T>(
- () => new _IterablePendingEvents<T>(data));
+ _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data);
+ return new _GeneratedSingleStreamImpl<T>(iterableEvents);
}
/**
@@ -152,7 +152,7 @@ abstract class Stream<T> {
* If this stream is single-subscription, return a new stream that allows
* multiple subscribers. It will subscribe to this stream when its first
* subscriber is added, and unsubscribe again when the last subscription is
- * canceled.
+ * cancelled.
*
* If this stream is already a broadcast stream, it is returned unmodified.
*/
@@ -1049,7 +1049,22 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
const StreamEventTransformer();
Stream<T> bind(Stream<S> source) {
- return new EventTransformStream<S, T>(source, this);
+ // Hackish way of buffering data that goes out of the event-transformer.
+ // TODO(floitsch): replace this with a correct solution.
+ Stream transformingStream = new EventTransformStream<S, T>(source, this);
+ StreamController controller;
+ StreamSubscription subscription;
+ controller = new StreamController<T>(
+ onListen: () {
+ subscription = transformingStream.listen(
+ controller.add,
+ onError: controller.addError,
+ onDone: controller.close);
+ },
+ onPause: () => subscription.pause(),
+ onResume: () => subscription.resume(),
+ onCancel: () => subscription.cancel());
+ return controller.stream;
}
/**
@@ -1104,9 +1119,6 @@ class EventTransformStream<S, T> extends Stream<T> {
{ void onError(error),
void onDone(),
bool cancelOnError }) {
- if (onData == null) onData = _nullDataHandler;
- if (onError == null) onError = _nullErrorHandler;
- if (onDone == null) onDone = _nullDoneHandler;
cancelOnError = identical(true, cancelOnError);
return new _EventTransformStreamSubscription(_source, _transformer,
onData, onError, onDone,
@@ -1115,16 +1127,16 @@ class EventTransformStream<S, T> extends Stream<T> {
}
class _EventTransformStreamSubscription<S, T>
- extends _BufferingStreamSubscription<T> {
+ extends _BaseStreamSubscription<T>
+ implements _EventOutputSink<T> {
/** The transformer used to transform events. */
final StreamEventTransformer<S, T> _transformer;
-
+ /** Whether to unsubscribe when emitting an error. */
+ final bool _cancelOnError;
/** Whether this stream has sent a done event. */
bool _isClosed = false;
-
/** Source of incoming events. */
StreamSubscription<S> _subscription;
-
/** Cached EventSink wrapper for this class. */
EventSink<T> _sink;
@@ -1133,9 +1145,9 @@ class _EventTransformStreamSubscription<S, T>
void onData(T data),
void onError(error),
void onDone(),
- bool cancelOnError)
- : super(onData, onError, onDone, cancelOnError) {
- _sink = new _EventSinkAdapter<T>(this);
+ this._cancelOnError)
+ : super(onData, onError, onDone) {
+ _sink = new _EventOutputSinkWrapper<T>(this);
_subscription = source.listen(_handleData,
onError: _handleError,
onDone: _handleDone);
@@ -1144,15 +1156,17 @@ class _EventTransformStreamSubscription<S, T>
/** Whether this subscription is still subscribed to its source. */
bool get _isSubscribed => _subscription != null;
- void _onPause() {
- if (_isSubscribed) _subscription.pause();
+ void pause([Future pauseSignal]) {
+ if (_isSubscribed) _subscription.pause(pauseSignal);
}
- void _onResume() {
+ void resume() {
if (_isSubscribed) _subscription.resume();
}
- void _onCancel() {
+ bool get isPaused => _isSubscribed ? _subscription.isPaused : false;
+
+ void cancel() {
if (_isSubscribed) {
StreamSubscription subscription = _subscription;
_subscription = null;
@@ -1165,7 +1179,7 @@ class _EventTransformStreamSubscription<S, T>
try {
_transformer.handleData(data, _sink);
} catch (e, s) {
- _addError(_asyncError(e, s));
+ _sendError(_asyncError(e, s));
}
}
@@ -1173,7 +1187,7 @@ class _EventTransformStreamSubscription<S, T>
try {
_transformer.handleError(error, _sink);
} catch (e, s) {
- _addError(_asyncError(e, s));
+ _sendError(_asyncError(e, s));
}
}
@@ -1182,69 +1196,40 @@ class _EventTransformStreamSubscription<S, T>
_subscription = null;
_transformer.handleDone(_sink);
} catch (e, s) {
- _addError(_asyncError(e, s));
+ _sendError(_asyncError(e, s));
}
}
-}
-class _EventSinkAdapter<T> implements EventSink<T> {
- _EventSink _sink;
- _EventSinkAdapter(this._sink);
-
- void add(T data) { _sink._add(data); }
- void addError(error) { _sink._addError(error); }
- void close() { _sink._close(); }
-}
-
-
-/**
- * An [Iterable] like interface for the values of a [Stream].
- *
- * This wraps a [Stream] and a subscription on the stream. It listens
- * on the stream, and completes the future returned by [moveNext] when the
- * next value becomes available.
- *
- * NOTICE: This is a tentative design. This class may change.
- */
-abstract class StreamIterator<T> {
+ // EventOutputSink interface.
+ void _sendData(T data) {
+ if (_isClosed) return;
+ _onData(data);
+ }
- /** Create a [StreamIterator] on [stream]. */
- factory StreamIterator(Stream<T> stream)
- // TODO(lrn): use redirecting factory constructor when type
- // arguments are supported.
- => new _StreamIteratorImpl<T>(stream);
+ void _sendError(error) {
+ if (_isClosed) return;
+ _onError(error);
+ if (_cancelOnError) {
+ cancel();
+ }
+ }
- /**
- * Wait for the next stream value to be available.
- *
- * It is not allowed to call this function again until the future has
- * completed. If the returned future completes with anything except `true`,
- * the iterator is done, and no new value will ever be available.
- *
- * The future may complete with an error, if the stream produces an error.
- */
- Future<bool> moveNext();
+ void _sendDone() {
+ if (_isClosed) throw new StateError("Already closed.");
+ _isClosed = true;
+ if (_isSubscribed) {
+ _subscription.cancel();
+ _subscription = null;
+ }
+ _onDone();
+ }
+}
- /**
- * The current value of the stream.
- *
- * Only valid when the future returned by [moveNext] completes with `true`
- * as value, and only until the next call to [moveNext].
- */
- T get current;
+class _EventOutputSinkWrapper<T> extends EventSink<T> {
+ _EventOutputSink _sink;
+ _EventOutputSinkWrapper(this._sink);
- /**
- * Cancels the stream iterator (and the underlying stream subscription) early.
- *
- * The stream iterator is automatically canceled if the [moveNext] future
- * completes with either `false` or an error.
- *
- * If a [moveNext] call has been made, it will complete with `false` as value,
- * as will all further calls to [moveNext].
- *
- * If you need to stop listening for values before the stream iterator is
- * automatically closed, you must call [cancel] to ensure that the stream
- * is properly closed.
- */
- void cancel();
+ void add(T data) { _sink._sendData(data); }
+ void addError(error) { _sink._sendError(error); }
+ void close() { _sink._sendDone(); }
}
« no previous file with comments | « sdk/lib/_internal/pub/test/error_group_test.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698