| Index: lib/src/subscription_stream.dart
|
| diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..d22679c9e421e2d97b41df5928e9a1997f21c796
|
| --- /dev/null
|
| +++ b/lib/src/subscription_stream.dart
|
| @@ -0,0 +1,165 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +library async.streams.stream_subscription;
|
| +
|
| +import 'dart:async';
|
| +
|
| +import "delegates.dart";
|
| +
|
| +/// A [Stream] adapter for a [StreamSubscription].
|
| +///
|
| +/// This class allows as `StreamSubscription` to be treated as a `Stream`.
|
| +///
|
| +/// The subscription is paused until the stream is listened to,
|
| +/// then it is resumed and the events are passed on to the
|
| +/// stream's new subscription.
|
| +///
|
| +/// This class assumes that is has control over the original subscription.
|
| +/// If other code is accessing the subscription, results may be unpredictable.
|
| +class SubscriptionStream<T> extends Stream<T> {
|
| + StreamSubscription _source;
|
| + final bool _isCancelOnError;
|
| + bool get isBroadcastStream => false;
|
| +
|
| + /// Create a `Stream` from [subscription].
|
| + ///
|
| + /// The subscription should not be paused. This class will not resume prior
|
| + /// pauses, so being paused is indistinguishable from not providing any
|
| + /// events.
|
| + ///
|
| + /// If the original `isCancelOnError` value for the subscription is known,
|
| + /// it can be passed as the [isCancelOnError] parameter.
|
| + ///
|
| + /// If the original subscription cancels on an error, this stream's
|
| + /// subscription will also do so, whether it's requested in the [listen] call
|
| + /// or not.
|
| + ///
|
| + /// If the `SubscriptionStream` knows this the original subscription's
|
| + /// error behavior, it can adapt the new stream's subscription.
|
| + ///
|
| + /// If the original cancels on an error and the new subscription shouldn't,
|
| + /// an extra done event is added after the first error, when the
|
| + /// original subscription is being cancelled.
|
| + ///
|
| + /// If the original doesn't cancel on an error and the new subscripton does,
|
| + /// a cancel is added on the first error.
|
| + ///
|
| + /// The default is to assume the subscription will not stop after an error.
|
| + /// If that assumption is wrong, the result is a stream that stops providing
|
| + /// events after the first error, including done events.
|
| + SubscriptionStream(StreamSubscription subscription,
|
| + {bool isCancelOnError: false})
|
| + : _source = subscription,
|
| + _isCancelOnError = isCancelOnError {
|
| + _source.pause();
|
| + // Clear callbacks to avoid keeping them alive unnecessarily.
|
| + _source.onData(null);
|
| + _source.onError(null);
|
| + _source.onDone(null);
|
| + }
|
| +
|
| + StreamSubscription<T> listen(void onData(T event),
|
| + {Function onError,
|
| + void onDone(),
|
| + bool cancelOnError}) {
|
| + if (_source == null) {
|
| + throw new StateError("Stream has already been listened to.");
|
| + }
|
| + cancelOnError = (true == cancelOnError);
|
| + var subscription = _source;
|
| + _source = null;
|
| + var result;
|
| + if (cancelOnError == _isCancelOnError) {
|
| + if (subscription is StreamSubscription<T>) {
|
| + result = subscription;
|
| + } else {
|
| + // Type parameters don't match - cast by wrapping.
|
| + result = new DelegatingStreamSubscription<T>(subscription);
|
| + }
|
| + } else if (cancelOnError) {
|
| + assert(!_isCancelOnError);
|
| + result = new _CancelOnErrorSubscriptionWrapper<T>(subscription);
|
| + } else {
|
| + assert(!cancelOnError && _isCancelOnError);
|
| + result = new _DoneAfterErrorSubscriptionWrapper(subscription);
|
| + }
|
| + result.onData(onData);
|
| + result.onError(onError);
|
| + result.onDone(onDone);
|
| + subscription.resume();
|
| + return result;
|
| + }
|
| +}
|
| +
|
| +class _CancelOnErrorSubscriptionWrapper<T>
|
| + extends DelegatingStreamSubscription<T> {
|
| + _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription)
|
| + : super(subscription);
|
| +
|
| + void onError(Function handleError) {
|
| + handleError = _cancelBeforeError(handleError);
|
| + super.onError(handleError);
|
| + }
|
| +
|
| + /// Helper function used by [onError].
|
| + ///
|
| + /// Returns an error handler which cancels the stream when it receives an
|
| + /// error.
|
| + Function _cancelBeforeError(Function handleError) {
|
| + return (e, s) {
|
| + super.cancel();
|
| + if (handleError is _BinaryCallback) {
|
| + handleError(e, s);
|
| + } else {
|
| + handleError(e);
|
| + }
|
| + };
|
| + }
|
| +}
|
| +
|
| +/// Subscription wrapper that assumes wrapped subscription is cancel-on-error.
|
| +///
|
| +/// This adapts a cancel-on-error subscription as non-cancel-on-error,
|
| +/// by introducing a done event after the error event that terminated
|
| +/// the wrapped subscription.
|
| +class _DoneAfterErrorSubscriptionWrapper<T>
|
| + extends DelegatingStreamSubscription<T> {
|
| + // Store the done handler so it can be called after an error.
|
| +
|
| + var _onDone;
|
| + _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription)
|
| + : super(subscription);
|
| +
|
| + void onDone(void handleDone()) {
|
| + super.onDone(handleDone);
|
| + _onDone = handleDone;
|
| + }
|
| +
|
| + void onError(Function errorHandler) {
|
| + errorHandler = _doneAfterError(errorHandler);
|
| + super.onError(errorHandler);
|
| + }
|
| +
|
| + Function _doneAfterError(Function errorHandler) {
|
| + return (e, s) {
|
| + // Ensure that the subscription is really cancelled
|
| + // so we don't get two done events ever - even if the
|
| + // class is used incorrectly.
|
| + super.cancel();
|
| + scheduleMicrotask(() {
|
| + if (_onDone != null) {
|
| + _onDone();
|
| + }
|
| + });
|
| + if (errorHandler is _BinaryCallback) {
|
| + errorHandler(e, s);
|
| + } else {
|
| + errorHandler(e);
|
| + }
|
| + };
|
| + }
|
| +}
|
| +
|
| +typedef _BinaryCallback(e, s);
|
|
|