| Index: packages/async/lib/src/subscription_stream.dart
|
| diff --git a/packages/async/lib/src/subscription_stream.dart b/packages/async/lib/src/subscription_stream.dart
|
| index e9e7d77b96955735b03d21dc65ae2cd976046f5a..a2356631aa7a973ea943b3b19bdc15e37f853b8d 100644
|
| --- a/packages/async/lib/src/subscription_stream.dart
|
| +++ b/packages/async/lib/src/subscription_stream.dart
|
| @@ -2,15 +2,13 @@
|
| // for details. All rights reserved. Use of this source code is governed by a
|
| // BSD-style license that can be found in the LICENSE file.
|
|
|
| -library async.subscription_stream;
|
| -
|
| import 'dart:async';
|
|
|
| import "delegate/stream_subscription.dart";
|
|
|
| /// A [Stream] adapter for a [StreamSubscription].
|
| ///
|
| -/// This class allows as `StreamSubscription` to be treated as a `Stream`.
|
| +/// This class allows a `StreamSubscription` to be treated as a `Stream`.
|
| ///
|
| /// The subscription is paused until the stream is listened to,
|
| /// then it is resumed and the events are passed on to the
|
| @@ -20,7 +18,7 @@ import "delegate/stream_subscription.dart";
|
| /// If other code is accessing the subscription, results may be unpredictable.
|
| class SubscriptionStream<T> extends Stream<T> {
|
| /// The subscription providing the events for this stream.
|
| - StreamSubscription _source;
|
| + StreamSubscription<T> _source;
|
|
|
| /// Create a single-subscription `Stream` from [subscription].
|
| ///
|
| @@ -31,8 +29,8 @@ class SubscriptionStream<T> extends Stream<T> {
|
| /// If the `subscription` doesn't send any `done` events, neither will this
|
| /// stream. That may be an issue if `subscription` was made to cancel on
|
| /// an error.
|
| - SubscriptionStream(StreamSubscription subscription)
|
| - : _source = subscription {
|
| + SubscriptionStream(StreamSubscription<T> subscription)
|
| + : _source = subscription {
|
| _source.pause();
|
| // Clear callbacks to avoid keeping them alive unnecessarily.
|
| _source.onData(null);
|
| @@ -41,22 +39,17 @@ class SubscriptionStream<T> extends Stream<T> {
|
| }
|
|
|
| StreamSubscription<T> listen(void onData(T event),
|
| - {Function onError,
|
| - void onDone(),
|
| - bool cancelOnError}) {
|
| + {Function onError, void onDone(), bool cancelOnError}) {
|
| if (_source == null) {
|
| throw new StateError("Stream has already been listened to.");
|
| }
|
| cancelOnError = (true == cancelOnError);
|
| var subscription = _source;
|
| _source = null;
|
| - var result;
|
| - if (cancelOnError) {
|
| - result = new _CancelOnErrorSubscriptionWrapper<T>(subscription);
|
| - } else {
|
| - // Wrap the subscription to ensure correct type parameter.
|
| - result = new DelegatingStreamSubscription<T>(subscription);
|
| - }
|
| +
|
| + var result = cancelOnError
|
| + ? new _CancelOnErrorSubscriptionWrapper<T>(subscription)
|
| + : subscription;
|
| result.onData(onData);
|
| result.onError(onError);
|
| result.onDone(onDone);
|
| @@ -73,7 +66,7 @@ class SubscriptionStream<T> extends Stream<T> {
|
| /// source subscription on the first error.
|
| class _CancelOnErrorSubscriptionWrapper<T>
|
| extends DelegatingStreamSubscription<T> {
|
| - _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription)
|
| + _CancelOnErrorSubscriptionWrapper(StreamSubscription<T> subscription)
|
| : super(subscription);
|
|
|
| void onError(Function handleError) {
|
|
|