Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index 4be59b7326b09d1f0706db8ddf935b85f13c1097..f90c050e7b313cc4bb88edf5d1f6394c44d316e3 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -62,24 +62,24 @@ abstract class Stream<T> { |
| * data or error, and then close with a done-event. |
| */ |
| factory Stream.fromFuture(Future<T> future) { |
| - _StreamImpl<T> stream = new _SingleStreamImpl<T>(); |
| + StreamController<T> controller = new StreamController<T>(); |
| future.then((value) { |
| - stream._add(value); |
| - stream._close(); |
| + controller.add(value); |
| + controller.close(); |
| }, |
| onError: (error) { |
| - stream._addError(error); |
| - stream._close(); |
| + controller.addError(error); |
| + controller.close(); |
| }); |
| - return stream; |
| + return controller.stream; |
| } |
| /** |
| * Creates a single-subscription stream that gets its data from [data]. |
| */ |
| factory Stream.fromIterable(Iterable<T> data) { |
| - _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); |
| - return new _GeneratedSingleStreamImpl<T>(iterableEvents); |
| + return new _GeneratedStreamImpl<T>( |
| + () => new _IterablePendingEvents<T>(data)); |
| } |
| /** |
| @@ -1035,22 +1035,7 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
| const StreamEventTransformer(); |
| Stream<T> bind(Stream<S> source) { |
| - // Hackish way of buffering data that goes out of the event-transformer. |
| - // TODO(floitsch): replace this with a correct solution. |
| - Stream transformingStream = new EventTransformStream<S, T>(source, this); |
| - StreamController controller; |
| - StreamSubscription subscription; |
| - controller = new StreamController<T>( |
| - onListen: () { |
| - subscription = transformingStream.listen( |
| - controller.add, |
| - onError: controller.addError, |
| - onDone: controller.close); |
| - }, |
| - onPause: () => subscription.pause(), |
| - onResume: () => subscription.resume(), |
| - onCancel: () => subscription.cancel()); |
| - return controller.stream; |
| + return new EventTransformStream<S, T>(source, this); |
| } |
| /** |
| @@ -1105,6 +1090,9 @@ class EventTransformStream<S, T> extends Stream<T> { |
| { void onError(error), |
| void onDone(), |
| bool cancelOnError }) { |
| + if (onData == null) onData = _nullDataHandler; |
| + if (onError == null) onError = _nullErrorHandler; |
| + if (onDone == null) onDone = _nullDoneHandler; |
| cancelOnError = identical(true, cancelOnError); |
| return new _EventTransformStreamSubscription(_source, _transformer, |
| onData, onError, onDone, |
| @@ -1113,16 +1101,16 @@ class EventTransformStream<S, T> extends Stream<T> { |
| } |
| class _EventTransformStreamSubscription<S, T> |
| - extends _BaseStreamSubscription<T> |
| - implements _EventOutputSink<T> { |
| + extends _BufferingStreamSubscription<T> { |
| /** The transformer used to transform events. */ |
| final StreamEventTransformer<S, T> _transformer; |
| - /** Whether to unsubscribe when emitting an error. */ |
| - final bool _cancelOnError; |
| + |
| /** Whether this stream has sent a done event. */ |
| bool _isClosed = false; |
| + |
| /** Source of incoming events. */ |
| StreamSubscription<S> _subscription; |
| + |
| /** Cached EventSink wrapper for this class. */ |
| EventSink<T> _sink; |
| @@ -1131,9 +1119,9 @@ class _EventTransformStreamSubscription<S, T> |
| void onData(T data), |
| void onError(error), |
| void onDone(), |
| - this._cancelOnError) |
| - : super(onData, onError, onDone) { |
| - _sink = new _EventOutputSinkWrapper<T>(this); |
| + bool cancelOnError) |
| + : super(onData, onError, onDone, cancelOnError) { |
| + _sink = new _EventSinkAdapter<T>(this); |
| _subscription = source.listen(_handleData, |
| onError: _handleError, |
| onDone: _handleDone); |
| @@ -1142,17 +1130,15 @@ class _EventTransformStreamSubscription<S, T> |
| /** Whether this subscription is still subscribed to its source. */ |
| bool get _isSubscribed => _subscription != null; |
| - void pause([Future pauseSignal]) { |
| - if (_isSubscribed) _subscription.pause(pauseSignal); |
| + void _onPause() { |
| + if (_isSubscribed) _subscription.pause(); |
| } |
| - void resume() { |
| + void _onResume() { |
| if (_isSubscribed) _subscription.resume(); |
| } |
| - bool get isPaused => _isSubscribed ? _subscription.isPaused : false; |
| - |
| - void cancel() { |
| + void _onCancel() { |
| if (_isSubscribed) { |
| StreamSubscription subscription = _subscription; |
| _subscription = null; |
| @@ -1165,7 +1151,7 @@ class _EventTransformStreamSubscription<S, T> |
| try { |
| _transformer.handleData(data, _sink); |
| } catch (e, s) { |
| - _sendError(_asyncError(e, s)); |
| + _addError(_asyncError(e, s)); |
| } |
| } |
| @@ -1173,7 +1159,7 @@ class _EventTransformStreamSubscription<S, T> |
| try { |
| _transformer.handleError(error, _sink); |
| } catch (e, s) { |
| - _sendError(_asyncError(e, s)); |
| + _addError(_asyncError(e, s)); |
| } |
| } |
| @@ -1182,40 +1168,67 @@ class _EventTransformStreamSubscription<S, T> |
| _subscription = null; |
| _transformer.handleDone(_sink); |
| } catch (e, s) { |
| - _sendError(_asyncError(e, s)); |
| + _addError(_asyncError(e, s)); |
| } |
| } |
| +} |
| - // EventOutputSink interface. |
| - void _sendData(T data) { |
| - if (_isClosed) return; |
| - _onData(data); |
| - } |
| - |
| - void _sendError(error) { |
| - if (_isClosed) return; |
| - _onError(error); |
| - if (_cancelOnError) { |
| - cancel(); |
| - } |
| - } |
| +class _EventSinkAdapter<T> implements EventSink<T> { |
| + _EventSink _sink; |
| + _EventSinkAdapter(this._sink); |
| - void _sendDone() { |
| - if (_isClosed) throw new StateError("Already closed."); |
| - _isClosed = true; |
| - if (_isSubscribed) { |
| - _subscription.cancel(); |
| - _subscription = null; |
| - } |
| - _onDone(); |
| - } |
| + void add(T data) { _sink._add(data); } |
| + void addError(error) { _sink._addError(error); } |
| + void close() { _sink._close(); } |
| } |
| -class _EventOutputSinkWrapper<T> extends EventSink<T> { |
| - _EventOutputSink _sink; |
| - _EventOutputSinkWrapper(this._sink); |
| - void add(T data) { _sink._sendData(data); } |
| - void addError(error) { _sink._sendError(error); } |
| - void close() { _sink._sendDone(); } |
| +/** |
| + * An [Iterable] like interface for the values of a [Stream]. |
| + * |
| + * This wraps a [Stream] and a subscription on the stream. It listens |
| + * on the stream, and completes the future returned by [moveNext] when the |
| + * next value becomes available. |
| + */ |
| +abstract class StreamIterator<T> { |
| + |
| + /** Create a [StreamIterator] on [stream]. */ |
| + factory StreamIterator(Stream<T> stream) |
| + // TODO(lrn): use redirecting factory constructor when type |
| + // arguments are supported. |
| + => new _StreamIteratorImpl<T>(stream); |
| + |
| + /** |
| + * Wait for the next stream value to be available. |
| + * |
| + * It is not allowed to call this function again until the future has |
| + * completed. If the returned future completes with anything except `true`, |
| + * the iterator is done, and no new value will ever be available. |
| + * |
| + * The future may complete with an error, if the stream produces an error. |
| + */ |
| + Future<bool> moveNext(); |
| + |
| + /** |
| + * The current value of the stream. |
| + * |
| + * Only valid when the future returned by [moveNext] completes with `true` |
|
floitsch
2013/05/24 15:53:17
"valid" or non-null?
|
| + * as value, and only until the next call to [moveNext]. |
| + */ |
| + T get current; |
| + |
| + /** |
| + * Cancels the stream iterator (and the underlying stream subscription) early. |
| + * |
| + * The stream iterator is automatically cancelled if the [moveNext] future |
| + * completes with either `false` or an error. |
| + * |
| + * If a [moveNext] call has been made, it will complete with `false` as value, |
| + * as will all further calls to [moveNext]. |
| + * |
| + * If you need to stop listening for values before the stream iterator is |
| + * automatically closed, you must call [cancel] to ensure that the stream |
| + * is properly closed. |
| + */ |
| + void cancel(); |
| } |