Index: packages/async/lib/src/subscription_stream.dart |
diff --git a/packages/async/lib/src/subscription_stream.dart b/packages/async/lib/src/subscription_stream.dart |
index e9e7d77b96955735b03d21dc65ae2cd976046f5a..a2356631aa7a973ea943b3b19bdc15e37f853b8d 100644 |
--- a/packages/async/lib/src/subscription_stream.dart |
+++ b/packages/async/lib/src/subscription_stream.dart |
@@ -2,15 +2,13 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-library async.subscription_stream; |
- |
import 'dart:async'; |
import "delegate/stream_subscription.dart"; |
/// A [Stream] adapter for a [StreamSubscription]. |
/// |
-/// This class allows as `StreamSubscription` to be treated as a `Stream`. |
+/// This class allows a `StreamSubscription` to be treated as a `Stream`. |
/// |
/// The subscription is paused until the stream is listened to, |
/// then it is resumed and the events are passed on to the |
@@ -20,7 +18,7 @@ import "delegate/stream_subscription.dart"; |
/// If other code is accessing the subscription, results may be unpredictable. |
class SubscriptionStream<T> extends Stream<T> { |
/// The subscription providing the events for this stream. |
- StreamSubscription _source; |
+ StreamSubscription<T> _source; |
/// Create a single-subscription `Stream` from [subscription]. |
/// |
@@ -31,8 +29,8 @@ class SubscriptionStream<T> extends Stream<T> { |
/// If the `subscription` doesn't send any `done` events, neither will this |
/// stream. That may be an issue if `subscription` was made to cancel on |
/// an error. |
- SubscriptionStream(StreamSubscription subscription) |
- : _source = subscription { |
+ SubscriptionStream(StreamSubscription<T> subscription) |
+ : _source = subscription { |
_source.pause(); |
// Clear callbacks to avoid keeping them alive unnecessarily. |
_source.onData(null); |
@@ -41,22 +39,17 @@ class SubscriptionStream<T> extends Stream<T> { |
} |
StreamSubscription<T> listen(void onData(T event), |
- {Function onError, |
- void onDone(), |
- bool cancelOnError}) { |
+ {Function onError, void onDone(), bool cancelOnError}) { |
if (_source == null) { |
throw new StateError("Stream has already been listened to."); |
} |
cancelOnError = (true == cancelOnError); |
var subscription = _source; |
_source = null; |
- var result; |
- if (cancelOnError) { |
- result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); |
- } else { |
- // Wrap the subscription to ensure correct type parameter. |
- result = new DelegatingStreamSubscription<T>(subscription); |
- } |
+ |
+ var result = cancelOnError |
+ ? new _CancelOnErrorSubscriptionWrapper<T>(subscription) |
+ : subscription; |
result.onData(onData); |
result.onError(onError); |
result.onDone(onDone); |
@@ -73,7 +66,7 @@ class SubscriptionStream<T> extends Stream<T> { |
/// source subscription on the first error. |
class _CancelOnErrorSubscriptionWrapper<T> |
extends DelegatingStreamSubscription<T> { |
- _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription) |
+ _CancelOnErrorSubscriptionWrapper(StreamSubscription<T> subscription) |
: super(subscription); |
void onError(Function handleError) { |