OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.subscription_stream; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 import "delegating_stream_subscription.dart"; | |
10 | |
11 /// A [Stream] adapter for a [StreamSubscription]. | |
12 /// | |
13 /// This class allows as `StreamSubscription` to be treated as a `Stream`. | |
14 /// | |
15 /// The subscription is paused until the stream is listened to, | |
16 /// then it is resumed and the events are passed on to the | |
17 /// stream's new subscription. | |
18 /// | |
19 /// This class assumes that is has control over the original subscription. | |
20 /// If other code is accessing the subscription, results may be unpredictable. | |
21 class SubscriptionStream<T> extends Stream<T> { | |
22 /// The subscription providing the events for this stream. | |
23 StreamSubscription _source; | |
24 | |
25 /// Whether [_source] was created with `cancelOnError` set to `true.` | |
26 final bool _sourceCancelsOnError; | |
27 | |
28 /// Create a single-subscription `Stream` from [subscription]. | |
29 /// | |
30 /// The `subscription` should not be paused. This class will not resume prior | |
31 /// pauses, so being paused is indistinguishable from not providing any | |
32 /// events. | |
33 /// | |
34 /// If the original `cancelOnError` value used when creating `subscription` | |
35 /// is known, it can be passed as the [isCancelOnError] parameter. | |
36 /// | |
37 /// The [iscancelOnError] argument doesn't need to match the original | |
38 /// subscription's `cancelOnError` value; | |
39 /// however, if [isCancelOnError] is `false` | |
40 /// and the original subscription's `cancelOnError` value is `true`, | |
41 /// this stream will never emit a done event after an error event. | |
42 /// On the other hand, if [isCancelOnError] is true, this stream | |
43 /// will emit a done event and stop after the first error | |
44 /// regardless of both the original subscription's value | |
45 /// and the `cancelOnError` value used when listening to this stream. | |
46 SubscriptionStream(StreamSubscription subscription, | |
47 {bool isCancelOnError: false}) | |
nweiz
2015/06/18 23:44:27
I thought you were getting rid of this?
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Ack, I was. Got lost in the other changes, I guess
| |
48 : _source = subscription, | |
49 _sourceCancelsOnError = isCancelOnError { | |
50 _source.pause(); | |
51 // Clear callbacks to avoid keeping them alive unnecessarily. | |
52 _source.onData(null); | |
53 _source.onError(null); | |
54 _source.onDone(null); | |
55 } | |
56 | |
57 StreamSubscription<T> listen(void onData(T event), | |
58 {Function onError, | |
59 void onDone(), | |
60 bool cancelOnError}) { | |
61 if (_source == null) { | |
62 throw new StateError("Stream has already been listened to."); | |
63 } | |
64 cancelOnError = (true == cancelOnError); | |
65 var subscription = _source; | |
66 _source = null; | |
67 var result; | |
68 if (cancelOnError == _sourceCancelsOnError) { | |
69 // Type parameters may not match - cast by wrapping. | |
70 result = new DelegatingStreamSubscription<T>(subscription); | |
71 } else if (cancelOnError) { | |
72 assert(!_sourceCancelsOnError); | |
73 result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); | |
74 } else { | |
75 assert(!cancelOnError && _sourceCancelsOnError); | |
76 result = new _DoneAfterErrorSubscriptionWrapper(subscription); | |
77 } | |
78 result.onData(onData); | |
79 result.onError(onError); | |
80 result.onDone(onDone); | |
81 subscription.resume(); | |
82 return result; | |
83 } | |
84 } | |
85 | |
86 /// Subscription wrapper that cancels on error. | |
87 /// | |
88 /// Used by [SubscriptionStream] when forwarding a subscription | |
89 /// created with `cancelOnError` as `true` to one with (assumed) | |
90 /// `cancelOnError` as `false`. It automatically cancels the | |
91 /// source subscription on the first error. | |
92 class _CancelOnErrorSubscriptionWrapper<T> | |
93 extends DelegatingStreamSubscription<T> { | |
94 _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription) | |
95 : super(subscription); | |
96 | |
97 void onError(Function handleError) { | |
98 // Cancel when receiving an error. | |
99 super.onError((error, StackTrace stackTrace) { | |
100 super.cancel(); | |
101 if (handleError is ZoneBinaryCallback) { | |
102 handleError(error, stackTrace); | |
103 } else { | |
104 handleError(error); | |
105 } | |
106 }); | |
107 } | |
108 } | |
109 | |
110 /// Subscription wrapper that sends a done event after an error. | |
111 /// | |
112 /// If the source subscription was created with `cancelOnError` as true, | |
113 /// this subscription will look like a non-`cancelOnError` subscription | |
114 /// that happens to end normally after the first error. | |
115 /// | |
116 /// If the source subscription isn't `cancelOnError` then it's canceled | |
117 /// after the first error anyway, to ensure consistent behavior. | |
118 class _DoneAfterErrorSubscriptionWrapper<T> | |
119 extends DelegatingStreamSubscription<T> { | |
120 // Stores the done handler so it can be called after an error. | |
121 var _onDone; | |
122 | |
123 _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription) | |
124 : super(subscription); | |
125 | |
126 void onDone(void handleDone()) { | |
127 super.onDone(handleDone); | |
128 _onDone = handleDone; | |
129 } | |
130 | |
131 void onError(Function errorHandler) { | |
132 errorHandler = _doneAfterError(errorHandler); | |
133 super.onError(errorHandler); | |
134 } | |
135 | |
136 Function _doneAfterError(Function errorHandler) { | |
137 return (error, StackTrace stackTrace) { | |
138 // Ensure that the subscription is really canceled | |
139 // so we don't get two done events ever - even if the | |
140 // class is used incorrectly. | |
141 super.cancel(); | |
142 scheduleMicrotask(() { | |
143 if (_onDone != null) { | |
144 _onDone(); | |
145 } | |
146 }); | |
147 if (errorHandler is ZoneBinaryCallback) { | |
148 errorHandler(error, stackTrace); | |
149 } else { | |
150 errorHandler(error); | |
151 } | |
152 }; | |
153 } | |
154 } | |
OLD | NEW |