| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| index dbe3374e8f088733bd0c49563012fc577611a200..528dee00a4ce0d23bacfddd017b720e7faee1d56 100644
|
| --- a/sdk/lib/async/stream.dart
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -62,24 +62,24 @@ abstract class Stream<T> {
|
| * data or error, and then close with a done-event.
|
| */
|
| factory Stream.fromFuture(Future<T> future) {
|
| - _StreamImpl<T> stream = new _SingleStreamImpl<T>();
|
| + StreamController<T> controller = new StreamController<T>();
|
| future.then((value) {
|
| - stream._add(value);
|
| - stream._close();
|
| + controller.add(value);
|
| + controller.close();
|
| },
|
| onError: (error) {
|
| - stream._addError(error);
|
| - stream._close();
|
| + controller.addError(error);
|
| + controller.close();
|
| });
|
| - return stream;
|
| + return controller.stream;
|
| }
|
|
|
| /**
|
| * Creates a single-subscription stream that gets its data from [data].
|
| */
|
| factory Stream.fromIterable(Iterable<T> data) {
|
| - _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data);
|
| - return new _GeneratedSingleStreamImpl<T>(iterableEvents);
|
| + return new _GeneratedStreamImpl<T>(
|
| + () => new _IterablePendingEvents<T>(data));
|
| }
|
|
|
| /**
|
| @@ -152,7 +152,7 @@ abstract class Stream<T> {
|
| * If this stream is single-subscription, return a new stream that allows
|
| * multiple subscribers. It will subscribe to this stream when its first
|
| * subscriber is added, and unsubscribe again when the last subscription is
|
| - * cancelled.
|
| + * canceled.
|
| *
|
| * If this stream is already a broadcast stream, it is returned unmodified.
|
| */
|
| @@ -1049,22 +1049,7 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
|
| const StreamEventTransformer();
|
|
|
| Stream<T> bind(Stream<S> source) {
|
| - // Hackish way of buffering data that goes out of the event-transformer.
|
| - // TODO(floitsch): replace this with a correct solution.
|
| - Stream transformingStream = new EventTransformStream<S, T>(source, this);
|
| - StreamController controller;
|
| - StreamSubscription subscription;
|
| - controller = new StreamController<T>(
|
| - onListen: () {
|
| - subscription = transformingStream.listen(
|
| - controller.add,
|
| - onError: controller.addError,
|
| - onDone: controller.close);
|
| - },
|
| - onPause: () => subscription.pause(),
|
| - onResume: () => subscription.resume(),
|
| - onCancel: () => subscription.cancel());
|
| - return controller.stream;
|
| + return new EventTransformStream<S, T>(source, this);
|
| }
|
|
|
| /**
|
| @@ -1119,6 +1104,9 @@ class EventTransformStream<S, T> extends Stream<T> {
|
| { void onError(error),
|
| void onDone(),
|
| bool cancelOnError }) {
|
| + if (onData == null) onData = _nullDataHandler;
|
| + if (onError == null) onError = _nullErrorHandler;
|
| + if (onDone == null) onDone = _nullDoneHandler;
|
| cancelOnError = identical(true, cancelOnError);
|
| return new _EventTransformStreamSubscription(_source, _transformer,
|
| onData, onError, onDone,
|
| @@ -1127,16 +1115,16 @@ class EventTransformStream<S, T> extends Stream<T> {
|
| }
|
|
|
| class _EventTransformStreamSubscription<S, T>
|
| - extends _BaseStreamSubscription<T>
|
| - implements _EventOutputSink<T> {
|
| + extends _BufferingStreamSubscription<T> {
|
| /** The transformer used to transform events. */
|
| final StreamEventTransformer<S, T> _transformer;
|
| - /** Whether to unsubscribe when emitting an error. */
|
| - final bool _cancelOnError;
|
| +
|
| /** Whether this stream has sent a done event. */
|
| bool _isClosed = false;
|
| +
|
| /** Source of incoming events. */
|
| StreamSubscription<S> _subscription;
|
| +
|
| /** Cached EventSink wrapper for this class. */
|
| EventSink<T> _sink;
|
|
|
| @@ -1145,9 +1133,9 @@ class _EventTransformStreamSubscription<S, T>
|
| void onData(T data),
|
| void onError(error),
|
| void onDone(),
|
| - this._cancelOnError)
|
| - : super(onData, onError, onDone) {
|
| - _sink = new _EventOutputSinkWrapper<T>(this);
|
| + bool cancelOnError)
|
| + : super(onData, onError, onDone, cancelOnError) {
|
| + _sink = new _EventSinkAdapter<T>(this);
|
| _subscription = source.listen(_handleData,
|
| onError: _handleError,
|
| onDone: _handleDone);
|
| @@ -1156,17 +1144,15 @@ class _EventTransformStreamSubscription<S, T>
|
| /** Whether this subscription is still subscribed to its source. */
|
| bool get _isSubscribed => _subscription != null;
|
|
|
| - void pause([Future pauseSignal]) {
|
| - if (_isSubscribed) _subscription.pause(pauseSignal);
|
| + void _onPause() {
|
| + if (_isSubscribed) _subscription.pause();
|
| }
|
|
|
| - void resume() {
|
| + void _onResume() {
|
| if (_isSubscribed) _subscription.resume();
|
| }
|
|
|
| - bool get isPaused => _isSubscribed ? _subscription.isPaused : false;
|
| -
|
| - void cancel() {
|
| + void _onCancel() {
|
| if (_isSubscribed) {
|
| StreamSubscription subscription = _subscription;
|
| _subscription = null;
|
| @@ -1179,7 +1165,7 @@ class _EventTransformStreamSubscription<S, T>
|
| try {
|
| _transformer.handleData(data, _sink);
|
| } catch (e, s) {
|
| - _sendError(_asyncError(e, s));
|
| + _addError(_asyncError(e, s));
|
| }
|
| }
|
|
|
| @@ -1187,7 +1173,7 @@ class _EventTransformStreamSubscription<S, T>
|
| try {
|
| _transformer.handleError(error, _sink);
|
| } catch (e, s) {
|
| - _sendError(_asyncError(e, s));
|
| + _addError(_asyncError(e, s));
|
| }
|
| }
|
|
|
| @@ -1196,40 +1182,69 @@ class _EventTransformStreamSubscription<S, T>
|
| _subscription = null;
|
| _transformer.handleDone(_sink);
|
| } catch (e, s) {
|
| - _sendError(_asyncError(e, s));
|
| + _addError(_asyncError(e, s));
|
| }
|
| }
|
| +}
|
|
|
| - // EventOutputSink interface.
|
| - void _sendData(T data) {
|
| - if (_isClosed) return;
|
| - _onData(data);
|
| - }
|
| -
|
| - void _sendError(error) {
|
| - if (_isClosed) return;
|
| - _onError(error);
|
| - if (_cancelOnError) {
|
| - cancel();
|
| - }
|
| - }
|
| +class _EventSinkAdapter<T> implements EventSink<T> {
|
| + _EventSink _sink;
|
| + _EventSinkAdapter(this._sink);
|
|
|
| - void _sendDone() {
|
| - if (_isClosed) throw new StateError("Already closed.");
|
| - _isClosed = true;
|
| - if (_isSubscribed) {
|
| - _subscription.cancel();
|
| - _subscription = null;
|
| - }
|
| - _onDone();
|
| - }
|
| + void add(T data) { _sink._add(data); }
|
| + void addError(error) { _sink._addError(error); }
|
| + void close() { _sink._close(); }
|
| }
|
|
|
| -class _EventOutputSinkWrapper<T> extends EventSink<T> {
|
| - _EventOutputSink _sink;
|
| - _EventOutputSinkWrapper(this._sink);
|
|
|
| - void add(T data) { _sink._sendData(data); }
|
| - void addError(error) { _sink._sendError(error); }
|
| - void close() { _sink._sendDone(); }
|
| +/**
|
| + * An [Iterable] like interface for the values of a [Stream].
|
| + *
|
| + * This wraps a [Stream] and a subscription on the stream. It listens
|
| + * on the stream, and completes the future returned by [moveNext] when the
|
| + * next value becomes available.
|
| + *
|
| + * NOTICE: This is a tentative design. This class may change.
|
| + */
|
| +abstract class StreamIterator<T> {
|
| +
|
| + /** Create a [StreamIterator] on [stream]. */
|
| + factory StreamIterator(Stream<T> stream)
|
| + // TODO(lrn): use redirecting factory constructor when type
|
| + // arguments are supported.
|
| + => new _StreamIteratorImpl<T>(stream);
|
| +
|
| + /**
|
| + * Wait for the next stream value to be available.
|
| + *
|
| + * It is not allowed to call this function again until the future has
|
| + * completed. If the returned future completes with anything except `true`,
|
| + * the iterator is done, and no new value will ever be available.
|
| + *
|
| + * The future may complete with an error, if the stream produces an error.
|
| + */
|
| + Future<bool> moveNext();
|
| +
|
| + /**
|
| + * The current value of the stream.
|
| + *
|
| + * Only valid when the future returned by [moveNext] completes with `true`
|
| + * as value, and only until the next call to [moveNext].
|
| + */
|
| + T get current;
|
| +
|
| + /**
|
| + * Cancels the stream iterator (and the underlying stream subscription) early.
|
| + *
|
| + * The stream iterator is automatically canceled if the [moveNext] future
|
| + * completes with either `false` or an error.
|
| + *
|
| + * If a [moveNext] call has been made, it will complete with `false` as value,
|
| + * as will all further calls to [moveNext].
|
| + *
|
| + * If you need to stop listening for values before the stream iterator is
|
| + * automatically closed, you must call [cancel] to ensure that the stream
|
| + * is properly closed.
|
| + */
|
| + void cancel();
|
| }
|
|
|