Chromium Code Reviews| Index: lib/src/subscription_stream.dart |
| diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..a0da1bb5c6f8326aa0dc0e9e8d6f3a4412a6644a |
| --- /dev/null |
| +++ b/lib/src/subscription_stream.dart |
| @@ -0,0 +1,154 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.subscription_stream; |
| + |
| +import 'dart:async'; |
| + |
| +import "delegating_stream_subscription.dart"; |
| + |
| +/// A [Stream] adapter for a [StreamSubscription]. |
| +/// |
| +/// This class allows as `StreamSubscription` to be treated as a `Stream`. |
| +/// |
| +/// The subscription is paused until the stream is listened to, |
| +/// then it is resumed and the events are passed on to the |
| +/// stream's new subscription. |
| +/// |
| +/// This class assumes that is has control over the original subscription. |
| +/// If other code is accessing the subscription, results may be unpredictable. |
| +class SubscriptionStream<T> extends Stream<T> { |
| + /// The subscription providing the events for this stream. |
| + StreamSubscription _source; |
| + |
| + /// Whether [_source] was created with `cancelOnError` set to `true.` |
| + final bool _sourceCancelsOnError; |
| + |
| + /// Create a single-subscription `Stream` from [subscription]. |
| + /// |
| + /// The `subscription` should not be paused. This class will not resume prior |
| + /// pauses, so being paused is indistinguishable from not providing any |
| + /// events. |
| + /// |
| + /// If the original `cancelOnError` value used when creating `subscription` |
| + /// is known, it can be passed as the [isCancelOnError] parameter. |
| + /// |
| + /// The [iscancelOnError] argument doesn't need to match the original |
| + /// subscription's `cancelOnError` value; |
| + /// however, if [isCancelOnError] is `false` |
| + /// and the original subscription's `cancelOnError` value is `true`, |
| + /// this stream will never emit a done event after an error event. |
| + /// On the other hand, if [isCancelOnError] is true, this stream |
| + /// will emit a done event and stop after the first error |
| + /// regardless of both the original subscription's value |
| + /// and the `cancelOnError` value used when listening to this stream. |
| + SubscriptionStream(StreamSubscription subscription, |
| + {bool isCancelOnError: false}) |
|
nweiz
2015/06/18 23:44:27
I thought you were getting rid of this?
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Ack, I was. Got lost in the other changes, I guess
|
| + : _source = subscription, |
| + _sourceCancelsOnError = isCancelOnError { |
| + _source.pause(); |
| + // Clear callbacks to avoid keeping them alive unnecessarily. |
| + _source.onData(null); |
| + _source.onError(null); |
| + _source.onDone(null); |
| + } |
| + |
| + StreamSubscription<T> listen(void onData(T event), |
| + {Function onError, |
| + void onDone(), |
| + bool cancelOnError}) { |
| + if (_source == null) { |
| + throw new StateError("Stream has already been listened to."); |
| + } |
| + cancelOnError = (true == cancelOnError); |
| + var subscription = _source; |
| + _source = null; |
| + var result; |
| + if (cancelOnError == _sourceCancelsOnError) { |
| + // Type parameters may not match - cast by wrapping. |
| + result = new DelegatingStreamSubscription<T>(subscription); |
| + } else if (cancelOnError) { |
| + assert(!_sourceCancelsOnError); |
| + result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); |
| + } else { |
| + assert(!cancelOnError && _sourceCancelsOnError); |
| + result = new _DoneAfterErrorSubscriptionWrapper(subscription); |
| + } |
| + result.onData(onData); |
| + result.onError(onError); |
| + result.onDone(onDone); |
| + subscription.resume(); |
| + return result; |
| + } |
| +} |
| + |
| +/// Subscription wrapper that cancels on error. |
| +/// |
| +/// Used by [SubscriptionStream] when forwarding a subscription |
| +/// created with `cancelOnError` as `true` to one with (assumed) |
| +/// `cancelOnError` as `false`. It automatically cancels the |
| +/// source subscription on the first error. |
| +class _CancelOnErrorSubscriptionWrapper<T> |
| + extends DelegatingStreamSubscription<T> { |
| + _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription) |
| + : super(subscription); |
| + |
| + void onError(Function handleError) { |
| + // Cancel when receiving an error. |
| + super.onError((error, StackTrace stackTrace) { |
| + super.cancel(); |
| + if (handleError is ZoneBinaryCallback) { |
| + handleError(error, stackTrace); |
| + } else { |
| + handleError(error); |
| + } |
| + }); |
| + } |
| +} |
| + |
| +/// Subscription wrapper that sends a done event after an error. |
| +/// |
| +/// If the source subscription was created with `cancelOnError` as true, |
| +/// this subscription will look like a non-`cancelOnError` subscription |
| +/// that happens to end normally after the first error. |
| +/// |
| +/// If the source subscription isn't `cancelOnError` then it's canceled |
| +/// after the first error anyway, to ensure consistent behavior. |
| +class _DoneAfterErrorSubscriptionWrapper<T> |
| + extends DelegatingStreamSubscription<T> { |
| + // Stores the done handler so it can be called after an error. |
| + var _onDone; |
| + |
| + _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription) |
| + : super(subscription); |
| + |
| + void onDone(void handleDone()) { |
| + super.onDone(handleDone); |
| + _onDone = handleDone; |
| + } |
| + |
| + void onError(Function errorHandler) { |
| + errorHandler = _doneAfterError(errorHandler); |
| + super.onError(errorHandler); |
| + } |
| + |
| + Function _doneAfterError(Function errorHandler) { |
| + return (error, StackTrace stackTrace) { |
| + // Ensure that the subscription is really canceled |
| + // so we don't get two done events ever - even if the |
| + // class is used incorrectly. |
| + super.cancel(); |
| + scheduleMicrotask(() { |
| + if (_onDone != null) { |
| + _onDone(); |
| + } |
| + }); |
| + if (errorHandler is ZoneBinaryCallback) { |
| + errorHandler(error, stackTrace); |
| + } else { |
| + errorHandler(error); |
| + } |
| + }; |
| + } |
| +} |