Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(790)

Unified Diff: lib/src/subscription_stream.dart

Issue 1149563010: Add new features to package:async. (Closed) Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/subscription_stream.dart
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
new file mode 100644
index 0000000000000000000000000000000000000000..d22679c9e421e2d97b41df5928e9a1997f21c796
--- /dev/null
+++ b/lib/src/subscription_stream.dart
@@ -0,0 +1,165 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.streams.stream_subscription;
+
+import 'dart:async';
+
+import "delegates.dart";
+
+/// A [Stream] adapter for a [StreamSubscription].
+///
+/// This class allows as `StreamSubscription` to be treated as a `Stream`.
+///
+/// The subscription is paused until the stream is listened to,
+/// then it is resumed and the events are passed on to the
+/// stream's new subscription.
+///
+/// This class assumes that is has control over the original subscription.
+/// If other code is accessing the subscription, results may be unpredictable.
+class SubscriptionStream<T> extends Stream<T> {
+ StreamSubscription _source;
+ final bool _isCancelOnError;
+ bool get isBroadcastStream => false;
+
+ /// Create a `Stream` from [subscription].
+ ///
+ /// The subscription should not be paused. This class will not resume prior
+ /// pauses, so being paused is indistinguishable from not providing any
+ /// events.
+ ///
+ /// If the original `isCancelOnError` value for the subscription is known,
+ /// it can be passed as the [isCancelOnError] parameter.
+ ///
+ /// If the original subscription cancels on an error, this stream's
+ /// subscription will also do so, whether it's requested in the [listen] call
+ /// or not.
+ ///
+ /// If the `SubscriptionStream` knows this the original subscription's
+ /// error behavior, it can adapt the new stream's subscription.
+ ///
+ /// If the original cancels on an error and the new subscription shouldn't,
+ /// an extra done event is added after the first error, when the
+ /// original subscription is being cancelled.
+ ///
+ /// If the original doesn't cancel on an error and the new subscripton does,
+ /// a cancel is added on the first error.
+ ///
+ /// The default is to assume the subscription will not stop after an error.
+ /// If that assumption is wrong, the result is a stream that stops providing
+ /// events after the first error, including done events.
+ SubscriptionStream(StreamSubscription subscription,
+ {bool isCancelOnError: false})
+ : _source = subscription,
+ _isCancelOnError = isCancelOnError {
+ _source.pause();
+ // Clear callbacks to avoid keeping them alive unnecessarily.
+ _source.onData(null);
+ _source.onError(null);
+ _source.onDone(null);
+ }
+
+ StreamSubscription<T> listen(void onData(T event),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ if (_source == null) {
+ throw new StateError("Stream has already been listened to.");
+ }
+ cancelOnError = (true == cancelOnError);
+ var subscription = _source;
+ _source = null;
+ var result;
+ if (cancelOnError == _isCancelOnError) {
+ if (subscription is StreamSubscription<T>) {
+ result = subscription;
+ } else {
+ // Type parameters don't match - cast by wrapping.
+ result = new DelegatingStreamSubscription<T>(subscription);
+ }
+ } else if (cancelOnError) {
+ assert(!_isCancelOnError);
+ result = new _CancelOnErrorSubscriptionWrapper<T>(subscription);
+ } else {
+ assert(!cancelOnError && _isCancelOnError);
+ result = new _DoneAfterErrorSubscriptionWrapper(subscription);
+ }
+ result.onData(onData);
+ result.onError(onError);
+ result.onDone(onDone);
+ subscription.resume();
+ return result;
+ }
+}
+
+class _CancelOnErrorSubscriptionWrapper<T>
+ extends DelegatingStreamSubscription<T> {
+ _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription)
+ : super(subscription);
+
+ void onError(Function handleError) {
+ handleError = _cancelBeforeError(handleError);
+ super.onError(handleError);
+ }
+
+ /// Helper function used by [onError].
+ ///
+ /// Returns an error handler which cancels the stream when it receives an
+ /// error.
+ Function _cancelBeforeError(Function handleError) {
+ return (e, s) {
+ super.cancel();
+ if (handleError is _BinaryCallback) {
+ handleError(e, s);
+ } else {
+ handleError(e);
+ }
+ };
+ }
+}
+
+/// Subscription wrapper that assumes wrapped subscription is cancel-on-error.
+///
+/// This adapts a cancel-on-error subscription as non-cancel-on-error,
+/// by introducing a done event after the error event that terminated
+/// the wrapped subscription.
+class _DoneAfterErrorSubscriptionWrapper<T>
+ extends DelegatingStreamSubscription<T> {
+ // Store the done handler so it can be called after an error.
+
+ var _onDone;
+ _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription)
+ : super(subscription);
+
+ void onDone(void handleDone()) {
+ super.onDone(handleDone);
+ _onDone = handleDone;
+ }
+
+ void onError(Function errorHandler) {
+ errorHandler = _doneAfterError(errorHandler);
+ super.onError(errorHandler);
+ }
+
+ Function _doneAfterError(Function errorHandler) {
+ return (e, s) {
+ // Ensure that the subscription is really cancelled
+ // so we don't get two done events ever - even if the
+ // class is used incorrectly.
+ super.cancel();
+ scheduleMicrotask(() {
+ if (_onDone != null) {
+ _onDone();
+ }
+ });
+ if (errorHandler is _BinaryCallback) {
+ errorHandler(e, s);
+ } else {
+ errorHandler(e);
+ }
+ };
+ }
+}
+
+typedef _BinaryCallback(e, s);

Powered by Google App Engine
This is Rietveld 408576698