OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.subscription_stream; | |
6 | |
7 import 'dart:async'; | 5 import 'dart:async'; |
8 | 6 |
9 import "delegate/stream_subscription.dart"; | 7 import "delegate/stream_subscription.dart"; |
10 | 8 |
11 /// A [Stream] adapter for a [StreamSubscription]. | 9 /// A [Stream] adapter for a [StreamSubscription]. |
12 /// | 10 /// |
13 /// This class allows as `StreamSubscription` to be treated as a `Stream`. | 11 /// This class allows a `StreamSubscription` to be treated as a `Stream`. |
14 /// | 12 /// |
15 /// The subscription is paused until the stream is listened to, | 13 /// The subscription is paused until the stream is listened to, |
16 /// then it is resumed and the events are passed on to the | 14 /// then it is resumed and the events are passed on to the |
17 /// stream's new subscription. | 15 /// stream's new subscription. |
18 /// | 16 /// |
19 /// This class assumes that is has control over the original subscription. | 17 /// This class assumes that is has control over the original subscription. |
20 /// If other code is accessing the subscription, results may be unpredictable. | 18 /// If other code is accessing the subscription, results may be unpredictable. |
21 class SubscriptionStream<T> extends Stream<T> { | 19 class SubscriptionStream<T> extends Stream<T> { |
22 /// The subscription providing the events for this stream. | 20 /// The subscription providing the events for this stream. |
23 StreamSubscription _source; | 21 StreamSubscription<T> _source; |
24 | 22 |
25 /// Create a single-subscription `Stream` from [subscription]. | 23 /// Create a single-subscription `Stream` from [subscription]. |
26 /// | 24 /// |
27 /// The `subscription` should not be paused. This class will not resume prior | 25 /// The `subscription` should not be paused. This class will not resume prior |
28 /// pauses, so being paused is indistinguishable from not providing any | 26 /// pauses, so being paused is indistinguishable from not providing any |
29 /// events. | 27 /// events. |
30 /// | 28 /// |
31 /// If the `subscription` doesn't send any `done` events, neither will this | 29 /// If the `subscription` doesn't send any `done` events, neither will this |
32 /// stream. That may be an issue if `subscription` was made to cancel on | 30 /// stream. That may be an issue if `subscription` was made to cancel on |
33 /// an error. | 31 /// an error. |
34 SubscriptionStream(StreamSubscription subscription) | 32 SubscriptionStream(StreamSubscription<T> subscription) |
35 : _source = subscription { | 33 : _source = subscription { |
36 _source.pause(); | 34 _source.pause(); |
37 // Clear callbacks to avoid keeping them alive unnecessarily. | 35 // Clear callbacks to avoid keeping them alive unnecessarily. |
38 _source.onData(null); | 36 _source.onData(null); |
39 _source.onError(null); | 37 _source.onError(null); |
40 _source.onDone(null); | 38 _source.onDone(null); |
41 } | 39 } |
42 | 40 |
43 StreamSubscription<T> listen(void onData(T event), | 41 StreamSubscription<T> listen(void onData(T event), |
44 {Function onError, | 42 {Function onError, void onDone(), bool cancelOnError}) { |
45 void onDone(), | |
46 bool cancelOnError}) { | |
47 if (_source == null) { | 43 if (_source == null) { |
48 throw new StateError("Stream has already been listened to."); | 44 throw new StateError("Stream has already been listened to."); |
49 } | 45 } |
50 cancelOnError = (true == cancelOnError); | 46 cancelOnError = (true == cancelOnError); |
51 var subscription = _source; | 47 var subscription = _source; |
52 _source = null; | 48 _source = null; |
53 var result; | 49 |
54 if (cancelOnError) { | 50 var result = cancelOnError |
55 result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); | 51 ? new _CancelOnErrorSubscriptionWrapper<T>(subscription) |
56 } else { | 52 : subscription; |
57 // Wrap the subscription to ensure correct type parameter. | |
58 result = new DelegatingStreamSubscription<T>(subscription); | |
59 } | |
60 result.onData(onData); | 53 result.onData(onData); |
61 result.onError(onError); | 54 result.onError(onError); |
62 result.onDone(onDone); | 55 result.onDone(onDone); |
63 subscription.resume(); | 56 subscription.resume(); |
64 return result; | 57 return result; |
65 } | 58 } |
66 } | 59 } |
67 | 60 |
68 /// Subscription wrapper that cancels on error. | 61 /// Subscription wrapper that cancels on error. |
69 /// | 62 /// |
70 /// Used by [SubscriptionStream] when forwarding a subscription | 63 /// Used by [SubscriptionStream] when forwarding a subscription |
71 /// created with `cancelOnError` as `true` to one with (assumed) | 64 /// created with `cancelOnError` as `true` to one with (assumed) |
72 /// `cancelOnError` as `false`. It automatically cancels the | 65 /// `cancelOnError` as `false`. It automatically cancels the |
73 /// source subscription on the first error. | 66 /// source subscription on the first error. |
74 class _CancelOnErrorSubscriptionWrapper<T> | 67 class _CancelOnErrorSubscriptionWrapper<T> |
75 extends DelegatingStreamSubscription<T> { | 68 extends DelegatingStreamSubscription<T> { |
76 _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription) | 69 _CancelOnErrorSubscriptionWrapper(StreamSubscription<T> subscription) |
77 : super(subscription); | 70 : super(subscription); |
78 | 71 |
79 void onError(Function handleError) { | 72 void onError(Function handleError) { |
80 // Cancel when receiving an error. | 73 // Cancel when receiving an error. |
81 super.onError((error, StackTrace stackTrace) { | 74 super.onError((error, StackTrace stackTrace) { |
82 var cancelFuture = super.cancel(); | 75 var cancelFuture = super.cancel(); |
83 if (cancelFuture != null) { | 76 if (cancelFuture != null) { |
84 // Wait for the cancel to complete before sending the error event. | 77 // Wait for the cancel to complete before sending the error event. |
85 cancelFuture.whenComplete(() { | 78 cancelFuture.whenComplete(() { |
86 if (handleError is ZoneBinaryCallback) { | 79 if (handleError is ZoneBinaryCallback) { |
87 handleError(error, stackTrace); | 80 handleError(error, stackTrace); |
88 } else { | 81 } else { |
89 handleError(error); | 82 handleError(error); |
90 } | 83 } |
91 }); | 84 }); |
92 } else { | 85 } else { |
93 if (handleError is ZoneBinaryCallback) { | 86 if (handleError is ZoneBinaryCallback) { |
94 handleError(error, stackTrace); | 87 handleError(error, stackTrace); |
95 } else { | 88 } else { |
96 handleError(error); | 89 handleError(error); |
97 } | 90 } |
98 } | 91 } |
99 }); | 92 }); |
100 } | 93 } |
101 } | 94 } |
OLD | NEW |