Index: lib/src/subscription_stream.dart |
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart |
index 4f58c76f8af7dc012c953e6e3e25c89f9147ab42..c448620c5edc6a256aa5e72f50ffc7ad0c555555 100644 |
--- a/lib/src/subscription_stream.dart |
+++ b/lib/src/subscription_stream.dart |
@@ -18,7 +18,7 @@ import "delegate/stream_subscription.dart"; |
/// If other code is accessing the subscription, results may be unpredictable. |
class SubscriptionStream<T> extends Stream<T> { |
/// The subscription providing the events for this stream. |
- StreamSubscription _source; |
+ StreamSubscription<T> _source; |
/// Create a single-subscription `Stream` from [subscription]. |
/// |
@@ -29,7 +29,7 @@ class SubscriptionStream<T> extends Stream<T> { |
/// If the `subscription` doesn't send any `done` events, neither will this |
/// stream. That may be an issue if `subscription` was made to cancel on |
/// an error. |
- SubscriptionStream(StreamSubscription subscription) |
+ SubscriptionStream(StreamSubscription<T> subscription) |
: _source = subscription { |
_source.pause(); |
// Clear callbacks to avoid keeping them alive unnecessarily. |
@@ -48,13 +48,10 @@ class SubscriptionStream<T> extends Stream<T> { |
cancelOnError = (true == cancelOnError); |
var subscription = _source; |
_source = null; |
- var result; |
- if (cancelOnError) { |
- result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); |
- } else { |
- // Wrap the subscription to ensure correct type parameter. |
- result = new DelegatingStreamSubscription<T>(subscription); |
- } |
+ |
+ var result = cancelOnError |
+ ? new _CancelOnErrorSubscriptionWrapper<T>(subscription) |
+ : subscription; |
result.onData(onData); |
result.onError(onError); |
result.onDone(onDone); |