Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 4be59b7326b09d1f0706db8ddf935b85f13c1097..f90c050e7b313cc4bb88edf5d1f6394c44d316e3 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -62,24 +62,24 @@ abstract class Stream<T> { |
* data or error, and then close with a done-event. |
*/ |
factory Stream.fromFuture(Future<T> future) { |
- _StreamImpl<T> stream = new _SingleStreamImpl<T>(); |
+ StreamController<T> controller = new StreamController<T>(); |
future.then((value) { |
- stream._add(value); |
- stream._close(); |
+ controller.add(value); |
+ controller.close(); |
}, |
onError: (error) { |
- stream._addError(error); |
- stream._close(); |
+ controller.addError(error); |
+ controller.close(); |
}); |
- return stream; |
+ return controller.stream; |
} |
/** |
* Creates a single-subscription stream that gets its data from [data]. |
*/ |
factory Stream.fromIterable(Iterable<T> data) { |
- _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); |
- return new _GeneratedSingleStreamImpl<T>(iterableEvents); |
+ return new _GeneratedStreamImpl<T>( |
+ () => new _IterablePendingEvents<T>(data)); |
} |
/** |
@@ -1035,22 +1035,7 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
const StreamEventTransformer(); |
Stream<T> bind(Stream<S> source) { |
- // Hackish way of buffering data that goes out of the event-transformer. |
- // TODO(floitsch): replace this with a correct solution. |
- Stream transformingStream = new EventTransformStream<S, T>(source, this); |
- StreamController controller; |
- StreamSubscription subscription; |
- controller = new StreamController<T>( |
- onListen: () { |
- subscription = transformingStream.listen( |
- controller.add, |
- onError: controller.addError, |
- onDone: controller.close); |
- }, |
- onPause: () => subscription.pause(), |
- onResume: () => subscription.resume(), |
- onCancel: () => subscription.cancel()); |
- return controller.stream; |
+ return new EventTransformStream<S, T>(source, this); |
} |
/** |
@@ -1105,6 +1090,9 @@ class EventTransformStream<S, T> extends Stream<T> { |
{ void onError(error), |
void onDone(), |
bool cancelOnError }) { |
+ if (onData == null) onData = _nullDataHandler; |
+ if (onError == null) onError = _nullErrorHandler; |
+ if (onDone == null) onDone = _nullDoneHandler; |
cancelOnError = identical(true, cancelOnError); |
return new _EventTransformStreamSubscription(_source, _transformer, |
onData, onError, onDone, |
@@ -1113,16 +1101,16 @@ class EventTransformStream<S, T> extends Stream<T> { |
} |
class _EventTransformStreamSubscription<S, T> |
- extends _BaseStreamSubscription<T> |
- implements _EventOutputSink<T> { |
+ extends _BufferingStreamSubscription<T> { |
/** The transformer used to transform events. */ |
final StreamEventTransformer<S, T> _transformer; |
- /** Whether to unsubscribe when emitting an error. */ |
- final bool _cancelOnError; |
+ |
/** Whether this stream has sent a done event. */ |
bool _isClosed = false; |
+ |
/** Source of incoming events. */ |
StreamSubscription<S> _subscription; |
+ |
/** Cached EventSink wrapper for this class. */ |
EventSink<T> _sink; |
@@ -1131,9 +1119,9 @@ class _EventTransformStreamSubscription<S, T> |
void onData(T data), |
void onError(error), |
void onDone(), |
- this._cancelOnError) |
- : super(onData, onError, onDone) { |
- _sink = new _EventOutputSinkWrapper<T>(this); |
+ bool cancelOnError) |
+ : super(onData, onError, onDone, cancelOnError) { |
+ _sink = new _EventSinkAdapter<T>(this); |
_subscription = source.listen(_handleData, |
onError: _handleError, |
onDone: _handleDone); |
@@ -1142,17 +1130,15 @@ class _EventTransformStreamSubscription<S, T> |
/** Whether this subscription is still subscribed to its source. */ |
bool get _isSubscribed => _subscription != null; |
- void pause([Future pauseSignal]) { |
- if (_isSubscribed) _subscription.pause(pauseSignal); |
+ void _onPause() { |
+ if (_isSubscribed) _subscription.pause(); |
} |
- void resume() { |
+ void _onResume() { |
if (_isSubscribed) _subscription.resume(); |
} |
- bool get isPaused => _isSubscribed ? _subscription.isPaused : false; |
- |
- void cancel() { |
+ void _onCancel() { |
if (_isSubscribed) { |
StreamSubscription subscription = _subscription; |
_subscription = null; |
@@ -1165,7 +1151,7 @@ class _EventTransformStreamSubscription<S, T> |
try { |
_transformer.handleData(data, _sink); |
} catch (e, s) { |
- _sendError(_asyncError(e, s)); |
+ _addError(_asyncError(e, s)); |
} |
} |
@@ -1173,7 +1159,7 @@ class _EventTransformStreamSubscription<S, T> |
try { |
_transformer.handleError(error, _sink); |
} catch (e, s) { |
- _sendError(_asyncError(e, s)); |
+ _addError(_asyncError(e, s)); |
} |
} |
@@ -1182,40 +1168,67 @@ class _EventTransformStreamSubscription<S, T> |
_subscription = null; |
_transformer.handleDone(_sink); |
} catch (e, s) { |
- _sendError(_asyncError(e, s)); |
+ _addError(_asyncError(e, s)); |
} |
} |
+} |
- // EventOutputSink interface. |
- void _sendData(T data) { |
- if (_isClosed) return; |
- _onData(data); |
- } |
- |
- void _sendError(error) { |
- if (_isClosed) return; |
- _onError(error); |
- if (_cancelOnError) { |
- cancel(); |
- } |
- } |
+class _EventSinkAdapter<T> implements EventSink<T> { |
+ _EventSink _sink; |
+ _EventSinkAdapter(this._sink); |
- void _sendDone() { |
- if (_isClosed) throw new StateError("Already closed."); |
- _isClosed = true; |
- if (_isSubscribed) { |
- _subscription.cancel(); |
- _subscription = null; |
- } |
- _onDone(); |
- } |
+ void add(T data) { _sink._add(data); } |
+ void addError(error) { _sink._addError(error); } |
+ void close() { _sink._close(); } |
} |
-class _EventOutputSinkWrapper<T> extends EventSink<T> { |
- _EventOutputSink _sink; |
- _EventOutputSinkWrapper(this._sink); |
- void add(T data) { _sink._sendData(data); } |
- void addError(error) { _sink._sendError(error); } |
- void close() { _sink._sendDone(); } |
+/** |
+ * An [Iterable] like interface for the values of a [Stream]. |
+ * |
+ * This wraps a [Stream] and a subscription on the stream. It listens |
+ * on the stream, and completes the future returned by [moveNext] when the |
+ * next value becomes available. |
+ */ |
+abstract class StreamIterator<T> { |
+ |
+ /** Create a [StreamIterator] on [stream]. */ |
+ factory StreamIterator(Stream<T> stream) |
+ // TODO(lrn): use redirecting factory constructor when type |
+ // arguments are supported. |
+ => new _StreamIteratorImpl<T>(stream); |
+ |
+ /** |
+ * Wait for the next stream value to be available. |
+ * |
+ * It is not allowed to call this function again until the future has |
+ * completed. If the returned future completes with anything except `true`, |
+ * the iterator is done, and no new value will ever be available. |
+ * |
+ * The future may complete with an error, if the stream produces an error. |
+ */ |
+ Future<bool> moveNext(); |
+ |
+ /** |
+ * The current value of the stream. |
+ * |
+ * Only valid when the future returned by [moveNext] completes with `true` |
floitsch
2013/05/24 15:53:17
"valid" or non-null?
|
+ * as value, and only until the next call to [moveNext]. |
+ */ |
+ T get current; |
+ |
+ /** |
+ * Cancels the stream iterator (and the underlying stream subscription) early. |
+ * |
+ * The stream iterator is automatically cancelled if the [moveNext] future |
+ * completes with either `false` or an error. |
+ * |
+ * If a [moveNext] call has been made, it will complete with `false` as value, |
+ * as will all further calls to [moveNext]. |
+ * |
+ * If you need to stop listening for values before the stream iterator is |
+ * automatically closed, you must call [cancel] to ensure that the stream |
+ * is properly closed. |
+ */ |
+ void cancel(); |
} |