| Index: sdk/lib/async/stream_controller.dart
|
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..a60ca8167ab43c0e32d3d580e3f5ed36c9d5dffb
|
| --- /dev/null
|
| +++ b/sdk/lib/async/stream_controller.dart
|
| @@ -0,0 +1,156 @@
|
| +// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// part of dart.async;
|
| +
|
| +// -------------------------------------------------------------------
|
| +// Default implementation of a stream with a controller for adding
|
| +// events to the stream.
|
| +// -------------------------------------------------------------------
|
| +
|
| +/**
|
| + * A controller and the stream it controls.
|
| + *
|
| + * This controller allows sending data, error and done events on
|
| + * its [stream].
|
| + * This class can be used to create a simple stream that others
|
| + * can listen on, and to push events to that stream.
|
| + * For more specialized streams, the [createStream] method can be
|
| + * overridden to return a specialization of [ControllerStream], and
|
| + * other public methods can be overridden too (but it's recommended
|
| + * that the overriding method calls its super method).
|
| + *
|
| + * A [StreamController] may have zero or more subscribers.
|
| + *
|
| + * If it has subscribers, it may also be paused by any number of its
|
| + * subscribers. When paused, all incoming events are queued. It is the
|
| + * responsibility of the user of this stream to prevent incoming events when
|
| + * the controller is paused. When there are no pausing subscriptions left,
|
| + * either due to them resuming, or due to the pausing subscriptions
|
| + * unsubscribing, events are resumed.
|
| + *
|
| + * When "close" is invoked (but not necessarily when the done event is fired,
|
| + * depending on pause state) the stream controller is closed.
|
| + * When the done event is fired to a subscriber, the subscriber is automatically
|
| + * unsubscribed.
|
| + */
|
| +class StreamController<T> extends Stream<T> implements StreamSink<T> {
|
| + _StreamImpl<T> _stream;
|
| + Stream<T> get stream => _stream;
|
| +
|
| + /**
|
| + * A controller with a [stream] that supports multiple subscribers.
|
| + */
|
| + StreamController() {
|
| + _stream = new _MultiControllerStream<T>(onSubscriptionStateChange,
|
| + onPauseStateChange);
|
| + }
|
| + /**
|
| + * A controller with a [stream] that supports only one single subscriber.
|
| + * The controller will buffer all incoming events until the subscriber is
|
| + * registered.
|
| + */
|
| + StreamController.singleSubscription() {
|
| + _stream = new _SingleControllerStream<T>(onSubscriptionStateChange,
|
| + onPauseStateChange);
|
| + }
|
| +
|
| + StreamSubscription listen(void onData(T data),
|
| + { void onError(AsyncError error),
|
| + void onDone(),
|
| + bool unsubscribeOnError}) {
|
| + return _stream.listen(onData,
|
| + onError: onError,
|
| + onDone: onDone,
|
| + unsubscribeOnError: unsubscribeOnError);
|
| + }
|
| +
|
| + /**
|
| + * Returns a view of this object that only exposes the [StreamSink] interface.
|
| + */
|
| + StreamSink<T> get sink => new StreamSinkView<T>(this);
|
| +
|
| + /** Whether one or more active subscribers have requested a pause. */
|
| + bool get isPaused => _stream._isPaused;
|
| +
|
| + /** Whether there are currently any subscribers on this [Stream]. */
|
| + bool get hasSubscribers => _stream._hasSubscribers;
|
| +
|
| + /**
|
| + * Send or queue a data event.
|
| + */
|
| + Signal add(T value) => _stream._add(value);
|
| +
|
| + /**
|
| + * Send or enqueue an error event.
|
| + *
|
| + * If a subscription has requested to be unsubscribed on errors,
|
| + * it will be unsubscribed after receiving this event.
|
| + */
|
| + void signalError(AsyncError error) { _stream._signalError(error); }
|
| +
|
| + /**
|
| + * Send or enqueue a "done" message.
|
| + *
|
| + * The "done" message should be sent at most once by a stream, and it
|
| + * should be the last message sent.
|
| + */
|
| + void close() { _stream._close(); }
|
| +
|
| + /**
|
| + * Called when the first subscriber requests a pause or the last a resume.
|
| + *
|
| + * Read [isPaused] to see the new state.
|
| + */
|
| + void onPauseStateChange() {}
|
| +
|
| + /**
|
| + * Called when the first listener subscribes or the last unsubscribes.
|
| + *
|
| + * Read [hasSubscribers] to see what the new state is.
|
| + */
|
| + void onSubscriptionStateChange() {}
|
| +
|
| + void forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)) {
|
| + _stream._forEachSubscriber(() {
|
| + try {
|
| + action();
|
| + } catch (e, s) {
|
| + new AsyncError(e, s).throwDelayed();
|
| + }
|
| + });
|
| + }
|
| +}
|
| +
|
| +typedef void _NotificationHandler();
|
| +
|
| +class _MultiControllerStream<T> extends _MultiStreamImpl<T> {
|
| + _NotificationHandler _subscriptionHandler;
|
| + _NotificationHandler _pauseHandler;
|
| +
|
| + _MultiControllerStream(this._subscriptionHandler, this._pauseHandler);
|
| +
|
| + void _onSubscriptionStateChange() {
|
| + _subscriptionHandler();
|
| + }
|
| +
|
| + void _onPauseStateChange() {
|
| + _pauseHandler();
|
| + }
|
| +}
|
| +
|
| +class _SingleControllerStream<T> extends _SingleStreamImpl<T> {
|
| + _NotificationHandler _subscriptionHandler;
|
| + _NotificationHandler _pauseHandler;
|
| +
|
| + _SingleControllerStream(this._subscriptionHandler, this._pauseHandler);
|
| +
|
| + void _onSubscriptionStateChange() {
|
| + _subscriptionHandler();
|
| + }
|
| +
|
| + void _onPauseStateChange() {
|
| + _pauseHandler();
|
| + }
|
| +}
|
|
|