Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(567)

Unified Diff: lib/src/subscription_stream.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Add all.dart to test. Apparently people like that. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/subscription_stream.dart
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
new file mode 100644
index 0000000000000000000000000000000000000000..a0da1bb5c6f8326aa0dc0e9e8d6f3a4412a6644a
--- /dev/null
+++ b/lib/src/subscription_stream.dart
@@ -0,0 +1,154 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.subscription_stream;
+
+import 'dart:async';
+
+import "delegating_stream_subscription.dart";
+
+/// A [Stream] adapter for a [StreamSubscription].
+///
+/// This class allows as `StreamSubscription` to be treated as a `Stream`.
+///
+/// The subscription is paused until the stream is listened to,
+/// then it is resumed and the events are passed on to the
+/// stream's new subscription.
+///
+/// This class assumes that is has control over the original subscription.
+/// If other code is accessing the subscription, results may be unpredictable.
+class SubscriptionStream<T> extends Stream<T> {
+ /// The subscription providing the events for this stream.
+ StreamSubscription _source;
+
+ /// Whether [_source] was created with `cancelOnError` set to `true.`
+ final bool _sourceCancelsOnError;
+
+ /// Create a single-subscription `Stream` from [subscription].
+ ///
+ /// The `subscription` should not be paused. This class will not resume prior
+ /// pauses, so being paused is indistinguishable from not providing any
+ /// events.
+ ///
+ /// If the original `cancelOnError` value used when creating `subscription`
+ /// is known, it can be passed as the [isCancelOnError] parameter.
+ ///
+ /// The [iscancelOnError] argument doesn't need to match the original
+ /// subscription's `cancelOnError` value;
+ /// however, if [isCancelOnError] is `false`
+ /// and the original subscription's `cancelOnError` value is `true`,
+ /// this stream will never emit a done event after an error event.
+ /// On the other hand, if [isCancelOnError] is true, this stream
+ /// will emit a done event and stop after the first error
+ /// regardless of both the original subscription's value
+ /// and the `cancelOnError` value used when listening to this stream.
+ SubscriptionStream(StreamSubscription subscription,
+ {bool isCancelOnError: false})
nweiz 2015/06/18 23:44:27 I thought you were getting rid of this?
Lasse Reichstein Nielsen 2015/06/30 10:34:14 Ack, I was. Got lost in the other changes, I guess
+ : _source = subscription,
+ _sourceCancelsOnError = isCancelOnError {
+ _source.pause();
+ // Clear callbacks to avoid keeping them alive unnecessarily.
+ _source.onData(null);
+ _source.onError(null);
+ _source.onDone(null);
+ }
+
+ StreamSubscription<T> listen(void onData(T event),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ if (_source == null) {
+ throw new StateError("Stream has already been listened to.");
+ }
+ cancelOnError = (true == cancelOnError);
+ var subscription = _source;
+ _source = null;
+ var result;
+ if (cancelOnError == _sourceCancelsOnError) {
+ // Type parameters may not match - cast by wrapping.
+ result = new DelegatingStreamSubscription<T>(subscription);
+ } else if (cancelOnError) {
+ assert(!_sourceCancelsOnError);
+ result = new _CancelOnErrorSubscriptionWrapper<T>(subscription);
+ } else {
+ assert(!cancelOnError && _sourceCancelsOnError);
+ result = new _DoneAfterErrorSubscriptionWrapper(subscription);
+ }
+ result.onData(onData);
+ result.onError(onError);
+ result.onDone(onDone);
+ subscription.resume();
+ return result;
+ }
+}
+
+/// Subscription wrapper that cancels on error.
+///
+/// Used by [SubscriptionStream] when forwarding a subscription
+/// created with `cancelOnError` as `true` to one with (assumed)
+/// `cancelOnError` as `false`. It automatically cancels the
+/// source subscription on the first error.
+class _CancelOnErrorSubscriptionWrapper<T>
+ extends DelegatingStreamSubscription<T> {
+ _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription)
+ : super(subscription);
+
+ void onError(Function handleError) {
+ // Cancel when receiving an error.
+ super.onError((error, StackTrace stackTrace) {
+ super.cancel();
+ if (handleError is ZoneBinaryCallback) {
+ handleError(error, stackTrace);
+ } else {
+ handleError(error);
+ }
+ });
+ }
+}
+
+/// Subscription wrapper that sends a done event after an error.
+///
+/// If the source subscription was created with `cancelOnError` as true,
+/// this subscription will look like a non-`cancelOnError` subscription
+/// that happens to end normally after the first error.
+///
+/// If the source subscription isn't `cancelOnError` then it's canceled
+/// after the first error anyway, to ensure consistent behavior.
+class _DoneAfterErrorSubscriptionWrapper<T>
+ extends DelegatingStreamSubscription<T> {
+ // Stores the done handler so it can be called after an error.
+ var _onDone;
+
+ _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription)
+ : super(subscription);
+
+ void onDone(void handleDone()) {
+ super.onDone(handleDone);
+ _onDone = handleDone;
+ }
+
+ void onError(Function errorHandler) {
+ errorHandler = _doneAfterError(errorHandler);
+ super.onError(errorHandler);
+ }
+
+ Function _doneAfterError(Function errorHandler) {
+ return (error, StackTrace stackTrace) {
+ // Ensure that the subscription is really canceled
+ // so we don't get two done events ever - even if the
+ // class is used incorrectly.
+ super.cancel();
+ scheduleMicrotask(() {
+ if (_onDone != null) {
+ _onDone();
+ }
+ });
+ if (errorHandler is ZoneBinaryCallback) {
+ errorHandler(error, stackTrace);
+ } else {
+ errorHandler(error);
+ }
+ };
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698