Index: lib/src/subscription_stream.dart |
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..a0da1bb5c6f8326aa0dc0e9e8d6f3a4412a6644a |
--- /dev/null |
+++ b/lib/src/subscription_stream.dart |
@@ -0,0 +1,154 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library async.subscription_stream; |
+ |
+import 'dart:async'; |
+ |
+import "delegating_stream_subscription.dart"; |
+ |
+/// A [Stream] adapter for a [StreamSubscription]. |
+/// |
+/// This class allows as `StreamSubscription` to be treated as a `Stream`. |
+/// |
+/// The subscription is paused until the stream is listened to, |
+/// then it is resumed and the events are passed on to the |
+/// stream's new subscription. |
+/// |
+/// This class assumes that is has control over the original subscription. |
+/// If other code is accessing the subscription, results may be unpredictable. |
+class SubscriptionStream<T> extends Stream<T> { |
+ /// The subscription providing the events for this stream. |
+ StreamSubscription _source; |
+ |
+ /// Whether [_source] was created with `cancelOnError` set to `true.` |
+ final bool _sourceCancelsOnError; |
+ |
+ /// Create a single-subscription `Stream` from [subscription]. |
+ /// |
+ /// The `subscription` should not be paused. This class will not resume prior |
+ /// pauses, so being paused is indistinguishable from not providing any |
+ /// events. |
+ /// |
+ /// If the original `cancelOnError` value used when creating `subscription` |
+ /// is known, it can be passed as the [isCancelOnError] parameter. |
+ /// |
+ /// The [iscancelOnError] argument doesn't need to match the original |
+ /// subscription's `cancelOnError` value; |
+ /// however, if [isCancelOnError] is `false` |
+ /// and the original subscription's `cancelOnError` value is `true`, |
+ /// this stream will never emit a done event after an error event. |
+ /// On the other hand, if [isCancelOnError] is true, this stream |
+ /// will emit a done event and stop after the first error |
+ /// regardless of both the original subscription's value |
+ /// and the `cancelOnError` value used when listening to this stream. |
+ SubscriptionStream(StreamSubscription subscription, |
+ {bool isCancelOnError: false}) |
nweiz
2015/06/18 23:44:27
I thought you were getting rid of this?
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Ack, I was. Got lost in the other changes, I guess
|
+ : _source = subscription, |
+ _sourceCancelsOnError = isCancelOnError { |
+ _source.pause(); |
+ // Clear callbacks to avoid keeping them alive unnecessarily. |
+ _source.onData(null); |
+ _source.onError(null); |
+ _source.onDone(null); |
+ } |
+ |
+ StreamSubscription<T> listen(void onData(T event), |
+ {Function onError, |
+ void onDone(), |
+ bool cancelOnError}) { |
+ if (_source == null) { |
+ throw new StateError("Stream has already been listened to."); |
+ } |
+ cancelOnError = (true == cancelOnError); |
+ var subscription = _source; |
+ _source = null; |
+ var result; |
+ if (cancelOnError == _sourceCancelsOnError) { |
+ // Type parameters may not match - cast by wrapping. |
+ result = new DelegatingStreamSubscription<T>(subscription); |
+ } else if (cancelOnError) { |
+ assert(!_sourceCancelsOnError); |
+ result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); |
+ } else { |
+ assert(!cancelOnError && _sourceCancelsOnError); |
+ result = new _DoneAfterErrorSubscriptionWrapper(subscription); |
+ } |
+ result.onData(onData); |
+ result.onError(onError); |
+ result.onDone(onDone); |
+ subscription.resume(); |
+ return result; |
+ } |
+} |
+ |
+/// Subscription wrapper that cancels on error. |
+/// |
+/// Used by [SubscriptionStream] when forwarding a subscription |
+/// created with `cancelOnError` as `true` to one with (assumed) |
+/// `cancelOnError` as `false`. It automatically cancels the |
+/// source subscription on the first error. |
+class _CancelOnErrorSubscriptionWrapper<T> |
+ extends DelegatingStreamSubscription<T> { |
+ _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription) |
+ : super(subscription); |
+ |
+ void onError(Function handleError) { |
+ // Cancel when receiving an error. |
+ super.onError((error, StackTrace stackTrace) { |
+ super.cancel(); |
+ if (handleError is ZoneBinaryCallback) { |
+ handleError(error, stackTrace); |
+ } else { |
+ handleError(error); |
+ } |
+ }); |
+ } |
+} |
+ |
+/// Subscription wrapper that sends a done event after an error. |
+/// |
+/// If the source subscription was created with `cancelOnError` as true, |
+/// this subscription will look like a non-`cancelOnError` subscription |
+/// that happens to end normally after the first error. |
+/// |
+/// If the source subscription isn't `cancelOnError` then it's canceled |
+/// after the first error anyway, to ensure consistent behavior. |
+class _DoneAfterErrorSubscriptionWrapper<T> |
+ extends DelegatingStreamSubscription<T> { |
+ // Stores the done handler so it can be called after an error. |
+ var _onDone; |
+ |
+ _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription) |
+ : super(subscription); |
+ |
+ void onDone(void handleDone()) { |
+ super.onDone(handleDone); |
+ _onDone = handleDone; |
+ } |
+ |
+ void onError(Function errorHandler) { |
+ errorHandler = _doneAfterError(errorHandler); |
+ super.onError(errorHandler); |
+ } |
+ |
+ Function _doneAfterError(Function errorHandler) { |
+ return (error, StackTrace stackTrace) { |
+ // Ensure that the subscription is really canceled |
+ // so we don't get two done events ever - even if the |
+ // class is used incorrectly. |
+ super.cancel(); |
+ scheduleMicrotask(() { |
+ if (_onDone != null) { |
+ _onDone(); |
+ } |
+ }); |
+ if (errorHandler is ZoneBinaryCallback) { |
+ errorHandler(error, stackTrace); |
+ } else { |
+ errorHandler(error); |
+ } |
+ }; |
+ } |
+} |