OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.streams.subscription_stream; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 import "delegates.dart"; | |
10 | |
11 /// A [Stream] adapter for a [StreamSubscription]. | |
12 /// | |
13 /// This class allows as `StreamSubscription` to be treated as a `Stream`. | |
14 /// | |
15 /// The subscription is paused until the stream is listened to, | |
16 /// then it is resumed and the events are passed on to the | |
17 /// stream's new subscription. | |
18 /// | |
19 /// This class assumes that is has control over the original subscription. | |
20 /// If other code is accessing the subscription, results may be unpredictable. | |
21 class SubscriptionStream<T> extends Stream<T> { | |
22 StreamSubscription _source; | |
23 final bool _isCancelOnError; | |
24 bool get isBroadcastStream => false; | |
25 | |
26 /// Create a `Stream` from [subscription]. | |
27 /// | |
28 /// The subscription should not be paused. This class will not resume prior | |
29 /// pauses, so being paused is indistinguishable from not providing any | |
30 /// events. | |
31 /// | |
32 /// If the original `isCancelOnError` value for the subscription is known, | |
nweiz
2015/06/12 01:24:26
"isCancelOnError" -> "cancelOnError"
Lasse Reichstein Nielsen
2015/06/15 15:46:24
I considered that, but it looks too much like a re
| |
33 /// it can be passed as the [isCancelOnError] parameter. | |
34 /// | |
35 /// If the original subscription cancels on an error, this stream's | |
36 /// subscription will also do so, whether it's requested in the [listen] call | |
37 /// or not. | |
38 /// | |
39 /// If the `SubscriptionStream` knows this the original subscription's | |
40 /// error behavior, it can adapt the new stream's subscription. | |
41 /// | |
42 /// If the original cancels on an error and the new subscription shouldn't, | |
43 /// an extra done event is added after the first error, when the | |
44 /// original subscription is being cancelled. | |
45 /// | |
46 /// If the original doesn't cancel on an error and the new subscripton does, | |
47 /// a cancel is added on the first error. | |
48 /// | |
49 /// The default is to assume the subscription will not stop after an error. | |
50 /// If that assumption is wrong, the result is a stream that stops providing | |
51 /// events after the first error, including done events. | |
Søren Gjesse
2015/06/11 07:57:07
I find that the requirement to know the cancel on
nweiz
2015/06/12 01:24:26
I also found this pretty hard to follow. If [Strea
Lasse Reichstein Nielsen
2015/06/12 13:04:23
It's slightly annoying. The alternative is to not
Lasse Reichstein Nielsen
2015/06/15 15:46:24
I adapted the paragraph, it's more readable than t
nweiz
2015/06/16 01:05:23
The more I think about this, the more I think the
Lasse Reichstein Nielsen
2015/06/16 13:05:45
I tend to agree. It just creates ugliness all the
| |
52 SubscriptionStream(StreamSubscription subscription, | |
nweiz
2015/06/12 01:24:26
Shouldn't this be a StreamSubscription<T>?
Lasse Reichstein Nielsen
2015/06/15 15:46:23
Like Iterable.from, the argument is more permissiv
| |
53 {bool isCancelOnError: false}) | |
nweiz
2015/06/12 01:24:26
I'd call this "cancelOnError" to match the paramet
Lasse Reichstein Nielsen
2015/06/15 15:46:24
And I want it to not match because it's not really
| |
54 : _source = subscription, | |
55 _isCancelOnError = isCancelOnError { | |
56 _source.pause(); | |
57 // Clear callbacks to avoid keeping them alive unnecessarily. | |
58 _source.onData(null); | |
59 _source.onError(null); | |
60 _source.onDone(null); | |
61 } | |
62 | |
63 StreamSubscription<T> listen(void onData(T event), | |
64 {Function onError, | |
65 void onDone(), | |
66 bool cancelOnError}) { | |
67 if (_source == null) { | |
68 throw new StateError("Stream has already been listened to."); | |
69 } | |
70 cancelOnError = (true == cancelOnError); | |
71 var subscription = _source; | |
72 _source = null; | |
73 var result; | |
74 if (cancelOnError == _isCancelOnError) { | |
75 if (subscription is StreamSubscription<T>) { | |
nweiz
2015/06/12 01:24:26
I don't understand why [subscription] isn't typed
Lasse Reichstein Nielsen
2015/06/15 15:46:24
The problem is that we promise to return a StreamS
| |
76 result = subscription; | |
77 } else { | |
78 // Type parameters don't match - cast by wrapping. | |
79 result = new DelegatingStreamSubscription<T>(subscription); | |
80 } | |
81 } else if (cancelOnError) { | |
82 assert(!_isCancelOnError); | |
83 result = new _CancelOnErrorSubscriptionWrapper<T>(subscription); | |
84 } else { | |
85 assert(!cancelOnError && _isCancelOnError); | |
86 result = new _DoneAfterErrorSubscriptionWrapper(subscription); | |
87 } | |
88 result.onData(onData); | |
89 result.onError(onError); | |
90 result.onDone(onDone); | |
91 subscription.resume(); | |
92 return result; | |
93 } | |
94 } | |
95 | |
96 class _CancelOnErrorSubscriptionWrapper<T> | |
nweiz
2015/06/12 01:24:26
Document this.
Lasse Reichstein Nielsen
2015/06/15 15:46:24
Done.
| |
97 extends DelegatingStreamSubscription<T> { | |
98 _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription) | |
99 : super(subscription); | |
100 | |
101 void onError(Function handleError) { | |
102 handleError = _cancelBeforeError(handleError); | |
103 super.onError(handleError); | |
104 } | |
105 | |
106 /// Helper function used by [onError]. | |
107 /// | |
108 /// Returns an error handler which cancels the stream when it receives an | |
109 /// error. | |
110 Function _cancelBeforeError(Function handleError) { | |
nweiz
2015/06/12 01:24:26
Why not just inline this?
Lasse Reichstein Nielsen
2015/06/15 15:46:24
I generally like to put functionality in separate
nweiz
2015/06/16 01:05:23
That's interesting... I tend to prefer keeping the
| |
111 return (e, s) { | |
nweiz
2015/06/12 01:24:26
Use full words for variables.
Lasse Reichstein Nielsen
2015/06/15 15:46:24
Done.
| |
112 super.cancel(); | |
113 if (handleError is _BinaryCallback) { | |
114 handleError(e, s); | |
115 } else { | |
116 handleError(e); | |
117 } | |
118 }; | |
119 } | |
120 } | |
121 | |
122 /// Subscription wrapper that assumes wrapped subscription is cancel-on-error. | |
nweiz
2015/06/12 01:24:27
"assumes wrapped" -> "assumes the wrapped"
Lasse Reichstein Nielsen
2015/06/15 15:46:24
But then it doesn't fit on one line?!? :)
Rewritte
| |
123 /// | |
124 /// This adapts a cancel-on-error subscription as non-cancel-on-error, | |
125 /// by introducing a done event after the error event that terminated | |
126 /// the wrapped subscription. | |
127 class _DoneAfterErrorSubscriptionWrapper<T> | |
128 extends DelegatingStreamSubscription<T> { | |
129 // Store the done handler so it can be called after an error. | |
130 | |
131 var _onDone; | |
nweiz
2015/06/12 01:24:26
Nit: swap this with the previous blank line.
Lasse Reichstein Nielsen
2015/06/15 15:46:24
Done.
| |
132 _DoneAfterErrorSubscriptionWrapper(StreamSubscription subscription) | |
133 : super(subscription); | |
134 | |
135 void onDone(void handleDone()) { | |
136 super.onDone(handleDone); | |
137 _onDone = handleDone; | |
138 } | |
139 | |
140 void onError(Function errorHandler) { | |
141 errorHandler = _doneAfterError(errorHandler); | |
142 super.onError(errorHandler); | |
143 } | |
144 | |
145 Function _doneAfterError(Function errorHandler) { | |
nweiz
2015/06/12 01:24:26
This also seems like it would be clearer if it wer
Lasse Reichstein Nielsen
2015/06/15 15:46:23
Done.
| |
146 return (e, s) { | |
147 // Ensure that the subscription is really cancelled | |
148 // so we don't get two done events ever - even if the | |
149 // class is used incorrectly. | |
150 super.cancel(); | |
151 scheduleMicrotask(() { | |
152 if (_onDone != null) { | |
153 _onDone(); | |
154 } | |
155 }); | |
156 if (errorHandler is _BinaryCallback) { | |
157 errorHandler(e, s); | |
158 } else { | |
159 errorHandler(e); | |
160 } | |
161 }; | |
162 } | |
163 } | |
164 | |
165 typedef _BinaryCallback(e, s); | |
Søren Gjesse
2015/06/11 07:57:07
See comment in other file.
Lasse Reichstein Nielsen
2015/06/12 13:04:23
Acknowledged.
| |
OLD | NEW |