| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| index 528dee00a4ce0d23bacfddd017b720e7faee1d56..dbe3374e8f088733bd0c49563012fc577611a200 100644
|
| --- a/sdk/lib/async/stream.dart
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -62,24 +62,24 @@ abstract class Stream<T> {
|
| * data or error, and then close with a done-event.
|
| */
|
| factory Stream.fromFuture(Future<T> future) {
|
| - StreamController<T> controller = new StreamController<T>();
|
| + _StreamImpl<T> stream = new _SingleStreamImpl<T>();
|
| future.then((value) {
|
| - controller.add(value);
|
| - controller.close();
|
| + stream._add(value);
|
| + stream._close();
|
| },
|
| onError: (error) {
|
| - controller.addError(error);
|
| - controller.close();
|
| + stream._addError(error);
|
| + stream._close();
|
| });
|
| - return controller.stream;
|
| + return stream;
|
| }
|
|
|
| /**
|
| * Creates a single-subscription stream that gets its data from [data].
|
| */
|
| factory Stream.fromIterable(Iterable<T> data) {
|
| - return new _GeneratedStreamImpl<T>(
|
| - () => new _IterablePendingEvents<T>(data));
|
| + _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data);
|
| + return new _GeneratedSingleStreamImpl<T>(iterableEvents);
|
| }
|
|
|
| /**
|
| @@ -152,7 +152,7 @@ abstract class Stream<T> {
|
| * If this stream is single-subscription, return a new stream that allows
|
| * multiple subscribers. It will subscribe to this stream when its first
|
| * subscriber is added, and unsubscribe again when the last subscription is
|
| - * canceled.
|
| + * cancelled.
|
| *
|
| * If this stream is already a broadcast stream, it is returned unmodified.
|
| */
|
| @@ -1049,7 +1049,22 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
|
| const StreamEventTransformer();
|
|
|
| Stream<T> bind(Stream<S> source) {
|
| - return new EventTransformStream<S, T>(source, this);
|
| + // Hackish way of buffering data that goes out of the event-transformer.
|
| + // TODO(floitsch): replace this with a correct solution.
|
| + Stream transformingStream = new EventTransformStream<S, T>(source, this);
|
| + StreamController controller;
|
| + StreamSubscription subscription;
|
| + controller = new StreamController<T>(
|
| + onListen: () {
|
| + subscription = transformingStream.listen(
|
| + controller.add,
|
| + onError: controller.addError,
|
| + onDone: controller.close);
|
| + },
|
| + onPause: () => subscription.pause(),
|
| + onResume: () => subscription.resume(),
|
| + onCancel: () => subscription.cancel());
|
| + return controller.stream;
|
| }
|
|
|
| /**
|
| @@ -1104,9 +1119,6 @@ class EventTransformStream<S, T> extends Stream<T> {
|
| { void onError(error),
|
| void onDone(),
|
| bool cancelOnError }) {
|
| - if (onData == null) onData = _nullDataHandler;
|
| - if (onError == null) onError = _nullErrorHandler;
|
| - if (onDone == null) onDone = _nullDoneHandler;
|
| cancelOnError = identical(true, cancelOnError);
|
| return new _EventTransformStreamSubscription(_source, _transformer,
|
| onData, onError, onDone,
|
| @@ -1115,16 +1127,16 @@ class EventTransformStream<S, T> extends Stream<T> {
|
| }
|
|
|
| class _EventTransformStreamSubscription<S, T>
|
| - extends _BufferingStreamSubscription<T> {
|
| + extends _BaseStreamSubscription<T>
|
| + implements _EventOutputSink<T> {
|
| /** The transformer used to transform events. */
|
| final StreamEventTransformer<S, T> _transformer;
|
| -
|
| + /** Whether to unsubscribe when emitting an error. */
|
| + final bool _cancelOnError;
|
| /** Whether this stream has sent a done event. */
|
| bool _isClosed = false;
|
| -
|
| /** Source of incoming events. */
|
| StreamSubscription<S> _subscription;
|
| -
|
| /** Cached EventSink wrapper for this class. */
|
| EventSink<T> _sink;
|
|
|
| @@ -1133,9 +1145,9 @@ class _EventTransformStreamSubscription<S, T>
|
| void onData(T data),
|
| void onError(error),
|
| void onDone(),
|
| - bool cancelOnError)
|
| - : super(onData, onError, onDone, cancelOnError) {
|
| - _sink = new _EventSinkAdapter<T>(this);
|
| + this._cancelOnError)
|
| + : super(onData, onError, onDone) {
|
| + _sink = new _EventOutputSinkWrapper<T>(this);
|
| _subscription = source.listen(_handleData,
|
| onError: _handleError,
|
| onDone: _handleDone);
|
| @@ -1144,15 +1156,17 @@ class _EventTransformStreamSubscription<S, T>
|
| /** Whether this subscription is still subscribed to its source. */
|
| bool get _isSubscribed => _subscription != null;
|
|
|
| - void _onPause() {
|
| - if (_isSubscribed) _subscription.pause();
|
| + void pause([Future pauseSignal]) {
|
| + if (_isSubscribed) _subscription.pause(pauseSignal);
|
| }
|
|
|
| - void _onResume() {
|
| + void resume() {
|
| if (_isSubscribed) _subscription.resume();
|
| }
|
|
|
| - void _onCancel() {
|
| + bool get isPaused => _isSubscribed ? _subscription.isPaused : false;
|
| +
|
| + void cancel() {
|
| if (_isSubscribed) {
|
| StreamSubscription subscription = _subscription;
|
| _subscription = null;
|
| @@ -1165,7 +1179,7 @@ class _EventTransformStreamSubscription<S, T>
|
| try {
|
| _transformer.handleData(data, _sink);
|
| } catch (e, s) {
|
| - _addError(_asyncError(e, s));
|
| + _sendError(_asyncError(e, s));
|
| }
|
| }
|
|
|
| @@ -1173,7 +1187,7 @@ class _EventTransformStreamSubscription<S, T>
|
| try {
|
| _transformer.handleError(error, _sink);
|
| } catch (e, s) {
|
| - _addError(_asyncError(e, s));
|
| + _sendError(_asyncError(e, s));
|
| }
|
| }
|
|
|
| @@ -1182,69 +1196,40 @@ class _EventTransformStreamSubscription<S, T>
|
| _subscription = null;
|
| _transformer.handleDone(_sink);
|
| } catch (e, s) {
|
| - _addError(_asyncError(e, s));
|
| + _sendError(_asyncError(e, s));
|
| }
|
| }
|
| -}
|
|
|
| -class _EventSinkAdapter<T> implements EventSink<T> {
|
| - _EventSink _sink;
|
| - _EventSinkAdapter(this._sink);
|
| -
|
| - void add(T data) { _sink._add(data); }
|
| - void addError(error) { _sink._addError(error); }
|
| - void close() { _sink._close(); }
|
| -}
|
| -
|
| -
|
| -/**
|
| - * An [Iterable] like interface for the values of a [Stream].
|
| - *
|
| - * This wraps a [Stream] and a subscription on the stream. It listens
|
| - * on the stream, and completes the future returned by [moveNext] when the
|
| - * next value becomes available.
|
| - *
|
| - * NOTICE: This is a tentative design. This class may change.
|
| - */
|
| -abstract class StreamIterator<T> {
|
| + // EventOutputSink interface.
|
| + void _sendData(T data) {
|
| + if (_isClosed) return;
|
| + _onData(data);
|
| + }
|
|
|
| - /** Create a [StreamIterator] on [stream]. */
|
| - factory StreamIterator(Stream<T> stream)
|
| - // TODO(lrn): use redirecting factory constructor when type
|
| - // arguments are supported.
|
| - => new _StreamIteratorImpl<T>(stream);
|
| + void _sendError(error) {
|
| + if (_isClosed) return;
|
| + _onError(error);
|
| + if (_cancelOnError) {
|
| + cancel();
|
| + }
|
| + }
|
|
|
| - /**
|
| - * Wait for the next stream value to be available.
|
| - *
|
| - * It is not allowed to call this function again until the future has
|
| - * completed. If the returned future completes with anything except `true`,
|
| - * the iterator is done, and no new value will ever be available.
|
| - *
|
| - * The future may complete with an error, if the stream produces an error.
|
| - */
|
| - Future<bool> moveNext();
|
| + void _sendDone() {
|
| + if (_isClosed) throw new StateError("Already closed.");
|
| + _isClosed = true;
|
| + if (_isSubscribed) {
|
| + _subscription.cancel();
|
| + _subscription = null;
|
| + }
|
| + _onDone();
|
| + }
|
| +}
|
|
|
| - /**
|
| - * The current value of the stream.
|
| - *
|
| - * Only valid when the future returned by [moveNext] completes with `true`
|
| - * as value, and only until the next call to [moveNext].
|
| - */
|
| - T get current;
|
| +class _EventOutputSinkWrapper<T> extends EventSink<T> {
|
| + _EventOutputSink _sink;
|
| + _EventOutputSinkWrapper(this._sink);
|
|
|
| - /**
|
| - * Cancels the stream iterator (and the underlying stream subscription) early.
|
| - *
|
| - * The stream iterator is automatically canceled if the [moveNext] future
|
| - * completes with either `false` or an error.
|
| - *
|
| - * If a [moveNext] call has been made, it will complete with `false` as value,
|
| - * as will all further calls to [moveNext].
|
| - *
|
| - * If you need to stop listening for values before the stream iterator is
|
| - * automatically closed, you must call [cancel] to ensure that the stream
|
| - * is properly closed.
|
| - */
|
| - void cancel();
|
| + void add(T data) { _sink._sendData(data); }
|
| + void addError(error) { _sink._sendError(error); }
|
| + void close() { _sink._sendDone(); }
|
| }
|
|
|