| Index: lib/src/subscription_stream.dart
|
| diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
|
| index 4f58c76f8af7dc012c953e6e3e25c89f9147ab42..c448620c5edc6a256aa5e72f50ffc7ad0c555555 100644
|
| --- a/lib/src/subscription_stream.dart
|
| +++ b/lib/src/subscription_stream.dart
|
| @@ -18,7 +18,7 @@ import "delegate/stream_subscription.dart";
|
| /// If other code is accessing the subscription, results may be unpredictable.
|
| class SubscriptionStream<T> extends Stream<T> {
|
| /// The subscription providing the events for this stream.
|
| - StreamSubscription _source;
|
| + StreamSubscription<T> _source;
|
|
|
| /// Create a single-subscription `Stream` from [subscription].
|
| ///
|
| @@ -29,7 +29,7 @@ class SubscriptionStream<T> extends Stream<T> {
|
| /// If the `subscription` doesn't send any `done` events, neither will this
|
| /// stream. That may be an issue if `subscription` was made to cancel on
|
| /// an error.
|
| - SubscriptionStream(StreamSubscription subscription)
|
| + SubscriptionStream(StreamSubscription<T> subscription)
|
| : _source = subscription {
|
| _source.pause();
|
| // Clear callbacks to avoid keeping them alive unnecessarily.
|
| @@ -48,13 +48,10 @@ class SubscriptionStream<T> extends Stream<T> {
|
| cancelOnError = (true == cancelOnError);
|
| var subscription = _source;
|
| _source = null;
|
| - var result;
|
| - if (cancelOnError) {
|
| - result = new _CancelOnErrorSubscriptionWrapper<T>(subscription);
|
| - } else {
|
| - // Wrap the subscription to ensure correct type parameter.
|
| - result = new DelegatingStreamSubscription<T>(subscription);
|
| - }
|
| +
|
| + var result = cancelOnError
|
| + ? new _CancelOnErrorSubscriptionWrapper<T>(subscription)
|
| + : subscription;
|
| result.onData(onData);
|
| result.onError(onError);
|
| result.onDone(onDone);
|
|
|