Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 528dee00a4ce0d23bacfddd017b720e7faee1d56..dbe3374e8f088733bd0c49563012fc577611a200 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -62,24 +62,24 @@ abstract class Stream<T> { |
* data or error, and then close with a done-event. |
*/ |
factory Stream.fromFuture(Future<T> future) { |
- StreamController<T> controller = new StreamController<T>(); |
+ _StreamImpl<T> stream = new _SingleStreamImpl<T>(); |
future.then((value) { |
- controller.add(value); |
- controller.close(); |
+ stream._add(value); |
+ stream._close(); |
}, |
onError: (error) { |
- controller.addError(error); |
- controller.close(); |
+ stream._addError(error); |
+ stream._close(); |
}); |
- return controller.stream; |
+ return stream; |
} |
/** |
* Creates a single-subscription stream that gets its data from [data]. |
*/ |
factory Stream.fromIterable(Iterable<T> data) { |
- return new _GeneratedStreamImpl<T>( |
- () => new _IterablePendingEvents<T>(data)); |
+ _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); |
+ return new _GeneratedSingleStreamImpl<T>(iterableEvents); |
} |
/** |
@@ -152,7 +152,7 @@ abstract class Stream<T> { |
* If this stream is single-subscription, return a new stream that allows |
* multiple subscribers. It will subscribe to this stream when its first |
* subscriber is added, and unsubscribe again when the last subscription is |
- * canceled. |
+ * cancelled. |
* |
* If this stream is already a broadcast stream, it is returned unmodified. |
*/ |
@@ -1049,7 +1049,22 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
const StreamEventTransformer(); |
Stream<T> bind(Stream<S> source) { |
- return new EventTransformStream<S, T>(source, this); |
+ // Hackish way of buffering data that goes out of the event-transformer. |
+ // TODO(floitsch): replace this with a correct solution. |
+ Stream transformingStream = new EventTransformStream<S, T>(source, this); |
+ StreamController controller; |
+ StreamSubscription subscription; |
+ controller = new StreamController<T>( |
+ onListen: () { |
+ subscription = transformingStream.listen( |
+ controller.add, |
+ onError: controller.addError, |
+ onDone: controller.close); |
+ }, |
+ onPause: () => subscription.pause(), |
+ onResume: () => subscription.resume(), |
+ onCancel: () => subscription.cancel()); |
+ return controller.stream; |
} |
/** |
@@ -1104,9 +1119,6 @@ class EventTransformStream<S, T> extends Stream<T> { |
{ void onError(error), |
void onDone(), |
bool cancelOnError }) { |
- if (onData == null) onData = _nullDataHandler; |
- if (onError == null) onError = _nullErrorHandler; |
- if (onDone == null) onDone = _nullDoneHandler; |
cancelOnError = identical(true, cancelOnError); |
return new _EventTransformStreamSubscription(_source, _transformer, |
onData, onError, onDone, |
@@ -1115,16 +1127,16 @@ class EventTransformStream<S, T> extends Stream<T> { |
} |
class _EventTransformStreamSubscription<S, T> |
- extends _BufferingStreamSubscription<T> { |
+ extends _BaseStreamSubscription<T> |
+ implements _EventOutputSink<T> { |
/** The transformer used to transform events. */ |
final StreamEventTransformer<S, T> _transformer; |
- |
+ /** Whether to unsubscribe when emitting an error. */ |
+ final bool _cancelOnError; |
/** Whether this stream has sent a done event. */ |
bool _isClosed = false; |
- |
/** Source of incoming events. */ |
StreamSubscription<S> _subscription; |
- |
/** Cached EventSink wrapper for this class. */ |
EventSink<T> _sink; |
@@ -1133,9 +1145,9 @@ class _EventTransformStreamSubscription<S, T> |
void onData(T data), |
void onError(error), |
void onDone(), |
- bool cancelOnError) |
- : super(onData, onError, onDone, cancelOnError) { |
- _sink = new _EventSinkAdapter<T>(this); |
+ this._cancelOnError) |
+ : super(onData, onError, onDone) { |
+ _sink = new _EventOutputSinkWrapper<T>(this); |
_subscription = source.listen(_handleData, |
onError: _handleError, |
onDone: _handleDone); |
@@ -1144,15 +1156,17 @@ class _EventTransformStreamSubscription<S, T> |
/** Whether this subscription is still subscribed to its source. */ |
bool get _isSubscribed => _subscription != null; |
- void _onPause() { |
- if (_isSubscribed) _subscription.pause(); |
+ void pause([Future pauseSignal]) { |
+ if (_isSubscribed) _subscription.pause(pauseSignal); |
} |
- void _onResume() { |
+ void resume() { |
if (_isSubscribed) _subscription.resume(); |
} |
- void _onCancel() { |
+ bool get isPaused => _isSubscribed ? _subscription.isPaused : false; |
+ |
+ void cancel() { |
if (_isSubscribed) { |
StreamSubscription subscription = _subscription; |
_subscription = null; |
@@ -1165,7 +1179,7 @@ class _EventTransformStreamSubscription<S, T> |
try { |
_transformer.handleData(data, _sink); |
} catch (e, s) { |
- _addError(_asyncError(e, s)); |
+ _sendError(_asyncError(e, s)); |
} |
} |
@@ -1173,7 +1187,7 @@ class _EventTransformStreamSubscription<S, T> |
try { |
_transformer.handleError(error, _sink); |
} catch (e, s) { |
- _addError(_asyncError(e, s)); |
+ _sendError(_asyncError(e, s)); |
} |
} |
@@ -1182,69 +1196,40 @@ class _EventTransformStreamSubscription<S, T> |
_subscription = null; |
_transformer.handleDone(_sink); |
} catch (e, s) { |
- _addError(_asyncError(e, s)); |
+ _sendError(_asyncError(e, s)); |
} |
} |
-} |
-class _EventSinkAdapter<T> implements EventSink<T> { |
- _EventSink _sink; |
- _EventSinkAdapter(this._sink); |
- |
- void add(T data) { _sink._add(data); } |
- void addError(error) { _sink._addError(error); } |
- void close() { _sink._close(); } |
-} |
- |
- |
-/** |
- * An [Iterable] like interface for the values of a [Stream]. |
- * |
- * This wraps a [Stream] and a subscription on the stream. It listens |
- * on the stream, and completes the future returned by [moveNext] when the |
- * next value becomes available. |
- * |
- * NOTICE: This is a tentative design. This class may change. |
- */ |
-abstract class StreamIterator<T> { |
+ // EventOutputSink interface. |
+ void _sendData(T data) { |
+ if (_isClosed) return; |
+ _onData(data); |
+ } |
- /** Create a [StreamIterator] on [stream]. */ |
- factory StreamIterator(Stream<T> stream) |
- // TODO(lrn): use redirecting factory constructor when type |
- // arguments are supported. |
- => new _StreamIteratorImpl<T>(stream); |
+ void _sendError(error) { |
+ if (_isClosed) return; |
+ _onError(error); |
+ if (_cancelOnError) { |
+ cancel(); |
+ } |
+ } |
- /** |
- * Wait for the next stream value to be available. |
- * |
- * It is not allowed to call this function again until the future has |
- * completed. If the returned future completes with anything except `true`, |
- * the iterator is done, and no new value will ever be available. |
- * |
- * The future may complete with an error, if the stream produces an error. |
- */ |
- Future<bool> moveNext(); |
+ void _sendDone() { |
+ if (_isClosed) throw new StateError("Already closed."); |
+ _isClosed = true; |
+ if (_isSubscribed) { |
+ _subscription.cancel(); |
+ _subscription = null; |
+ } |
+ _onDone(); |
+ } |
+} |
- /** |
- * The current value of the stream. |
- * |
- * Only valid when the future returned by [moveNext] completes with `true` |
- * as value, and only until the next call to [moveNext]. |
- */ |
- T get current; |
+class _EventOutputSinkWrapper<T> extends EventSink<T> { |
+ _EventOutputSink _sink; |
+ _EventOutputSinkWrapper(this._sink); |
- /** |
- * Cancels the stream iterator (and the underlying stream subscription) early. |
- * |
- * The stream iterator is automatically canceled if the [moveNext] future |
- * completes with either `false` or an error. |
- * |
- * If a [moveNext] call has been made, it will complete with `false` as value, |
- * as will all further calls to [moveNext]. |
- * |
- * If you need to stop listening for values before the stream iterator is |
- * automatically closed, you must call [cancel] to ensure that the stream |
- * is properly closed. |
- */ |
- void cancel(); |
+ void add(T data) { _sink._sendData(data); } |
+ void addError(error) { _sink._sendError(error); } |
+ void close() { _sink._sendDone(); } |
} |