OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library error_group; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s | |
10 /// with one another. This allows APIs to expose multiple [Future]s and | |
11 /// [Stream]s that have identical error conditions without forcing API consumers | |
12 /// to attach error handling to objects they don't care about. | |
13 /// | |
14 /// To use an [ErrorGroup], register [Future]s and [Stream]s with it using | |
15 /// [registerFuture] and [registerStream]. These methods return wrapped versions | |
16 /// of the [Future]s and [Stream]s, which should then be used in place of the | |
17 /// originals. For example: | |
18 /// | |
19 /// var errorGroup = new ErrorGroup(); | |
20 /// future = errorGroup.registerFuture(future); | |
21 /// stream = errorGroup.registerStream(stream); | |
22 /// | |
23 /// An [ErrorGroup] has two major effects on its wrapped members: | |
24 /// | |
25 /// * An error in any member of the group will be propagated to every member | |
26 /// that hasn't already completed. If those members later complete, their | |
27 /// values will be ignored. | |
28 /// * If any member of this group has a listener, errors on members without | |
29 /// listeners won't get passed to the top-level error handler. | |
30 class ErrorGroup { | |
31 /// The [Future]s that are members of [this]. | |
32 final _futures = <_ErrorGroupFuture>[]; | |
33 | |
34 /// The [Stream]s that are members of [this]. | |
35 final _streams = <_ErrorGroupStream>[]; | |
36 | |
37 /// Whether [this] has completed, either successfully or with an error. | |
38 var _isComplete = false; | |
Bob Nystrom
2013/01/15 22:49:47
var -> bool
nweiz
2013/01/16 00:12:10
You've told me in the past not to use type declara
Bob Nystrom
2013/01/16 18:03:44
I think the rough guideline we've followed is to d
nweiz
2013/01/16 19:29:11
...why? It's no less inferable in the "var" case t
| |
39 | |
40 /// The [Completer] for [complete]. | |
41 final _completeCompleter = new Completer(); | |
42 | |
43 /// The underlying [Future] for [complete]. We need to be able to access it | |
44 /// internally as an [_ErrorGroupFuture] so we can check if it has listeners | |
45 /// and signal errors on it. | |
46 _ErrorGroupFuture _complete; | |
47 | |
48 /// Returns a [Future] that completes successully when all members of [this] | |
49 /// are complete, or with an error if any member receives an error. | |
50 /// | |
51 /// This [Future] is effectively in the group in that an error on it won't be | |
52 /// passed to the top-level error handler unless no members of the group have | |
53 /// listeners attached. | |
54 Future get complete => _complete; | |
Bob Nystrom
2013/01/15 22:49:47
"completed"? The name "complete" seems odd to me h
nweiz
2013/01/16 00:12:10
Changed to "done" as per offline discussion.
| |
55 | |
56 /// Creates a new group with no members. | |
57 ErrorGroup() { | |
58 this._complete = new _ErrorGroupFuture(this, _completeCompleter.future); | |
59 } | |
60 | |
61 /// Registers a [Future] as a member of [this]. Returns a wrapped version of | |
62 /// [future] that should be used in its place. | |
Bob Nystrom
2013/01/15 22:49:47
Document that it's an error to register an already
nweiz
2013/01/16 00:12:10
It's not. It's a little silly, but it'll work fine
| |
63 /// | |
64 /// If all members of [this] have already completed successfully or with an | |
65 /// error, it's a [StateError] to try to register a new [Future]. | |
66 Future registerFuture(Future future) { | |
Bob Nystrom
2013/01/15 22:49:47
What do you think of just "addFuture" instead of "
nweiz
2013/01/16 00:12:10
I think that makes it sound a little too much like
| |
67 if (_isComplete) { | |
68 throw new StateError("Can't register new members on a complete " | |
69 "ErrorGroup."); | |
70 } | |
71 | |
72 var wrapped = new _ErrorGroupFuture(this, future); | |
73 _futures.add(wrapped); | |
74 return wrapped; | |
75 } | |
76 | |
77 /// Registers a [Stream] as a member of [this]. Returns a wrapped version of | |
78 /// [stream] that should be used in its place. The returned [Stream] will be | |
79 /// multi-subscription if and only if [stream] is. | |
80 /// | |
81 /// Since all errors in a group are passed to all members, the returned | |
82 /// [Stream] will automatically unsubscribe all its listeners when it | |
83 /// encounters an error. | |
84 /// | |
85 /// If all members of [this] have already completed successfully or with an | |
86 /// error, it's a [StateError] to try to register a new [Stream]. | |
Bob Nystrom
2013/01/15 22:49:47
Document that it's an error to register an already
nweiz
2013/01/16 00:12:10
See above.
| |
87 Stream registerStream(Stream stream) { | |
88 if (_isComplete) { | |
89 throw new StateError("Can't register new members on a complete " | |
90 "ErrorGroup."); | |
91 } | |
92 | |
93 var wrapped = new _ErrorGroupStream(this, stream); | |
94 _streams.add(wrapped); | |
95 return wrapped; | |
96 } | |
97 | |
98 /// Sends [error] to all members of [this]. Like errors that come from | |
99 /// members, this will only be passed to the top-level error handler if no | |
100 /// members have listeners. | |
101 /// | |
102 /// If all members of [this] have already completed successfully or with an | |
103 /// error, it's a [StateError] to try to signal an error. | |
104 void signalError(AsyncError error) { | |
105 if (_isComplete) { | |
106 throw new StateError("Can't signal errors on a complete ErrorGroup."); | |
107 } | |
108 | |
109 _signalError(error); | |
110 } | |
111 | |
112 /// Signal an error internally. This is just like [signalError], but instead | |
113 /// of throwing an error if [this] is complete, it just does nothing. | |
114 void _signalError(AsyncError error) { | |
115 if (_isComplete) return; | |
116 | |
117 var caught = false; | |
118 for (var future in _futures) { | |
119 if (future._isComplete || future._hasListeners) caught = true; | |
120 future._signalError(error); | |
121 } | |
122 | |
123 for (var stream in _streams) { | |
124 if (stream._isComplete || stream._hasListeners) caught = true; | |
125 stream._signalError(error); | |
126 } | |
127 | |
128 _isComplete = true; | |
129 _complete._signalError(error); | |
130 if (!caught && !_complete._hasListeners) error.throwDelayed(); | |
131 } | |
132 | |
133 /// Notifies [this] that one of its member [Future]s is complete. | |
134 void _signalFutureComplete(_ErrorGroupFuture future) { | |
135 if (_isComplete) return; | |
136 | |
137 _isComplete = _futures.every((future) => future._isComplete) && | |
138 _streams.every((stream) => stream._isComplete); | |
Bob Nystrom
2013/01/15 22:49:47
Indent another 2.
nweiz
2013/01/16 00:12:10
Done.
| |
139 if (_isComplete) _completeCompleter.complete(); | |
140 } | |
141 | |
142 /// Notifies [this] that one of its member [Stream]s is complete. | |
143 void _signalStreamComplete(_ErrorGroupStream stream) { | |
144 if (_isComplete) return; | |
145 | |
146 _isComplete = _futures.every((future) => future._isComplete) && | |
147 _streams.every((stream) => stream._isComplete); | |
Bob Nystrom
2013/01/15 22:49:47
Ditto.
nweiz
2013/01/16 00:12:10
Done.
| |
148 if (_isComplete) _completeCompleter.complete(); | |
149 } | |
150 } | |
151 | |
152 /// A [Future] wrapper that keeps track of whether it's been completed and | |
153 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when | |
154 /// it completes successfully or receives an error. | |
155 class _ErrorGroupFuture implements Future { | |
156 /// The parent [ErrorGroup]. | |
157 final ErrorGroup _group; | |
158 | |
159 /// Whether [this] has completed, either successfully or with an error. | |
160 var _isComplete = false; | |
161 | |
162 /// The underlying [Completer] for [this]. | |
163 final _completer = new Completer(); | |
164 | |
165 /// Whether [this] has any listeners. | |
166 bool _hasListeners = false; | |
167 | |
168 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | |
169 /// [inner]. | |
170 _ErrorGroupFuture(this._group, Future inner) { | |
171 inner.then((value) { | |
172 if (!_isComplete) _completer.complete(value); | |
173 _isComplete = true; | |
174 _group._signalFutureComplete(this); | |
175 }).catchError((error) => _group._signalError(error)); | |
176 | |
177 // Make sure _completer.future doesn't automatically send errors to the | |
178 // top-level. | |
179 _completer.future.catchError((_) {}); | |
180 } | |
181 | |
182 Future then(onValue(T value), {onError(AsyncError asyncError)}) { | |
183 _hasListeners = true; | |
184 return _completer.future.then(onValue, onError: onError); | |
185 } | |
186 | |
187 Future catchError(onError(AsyncError asyncError), {bool test(Object error)}) { | |
188 _hasListeners = true; | |
189 return _completer.future.catchError(onError, test: test); | |
190 } | |
191 | |
192 Future whenComplete(void action()) { | |
193 _hasListeners = true; | |
194 return _completer.future.whenComplete(action); | |
195 } | |
196 | |
197 Stream<T> asStream() { | |
198 _hasListeners = true; | |
199 return _completer.future.asStream(); | |
200 } | |
201 | |
202 /// Signal that an error from [_group] should be propagated through [this], | |
203 /// unless it's already complete. | |
204 void _signalError(AsyncError error) { | |
205 if (!_isComplete) _completer.completeError(error.error, error.stackTrace); | |
206 _isComplete = true; | |
207 } | |
208 } | |
209 | |
210 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). | |
211 // When this is fixed, this class will need to prevent such errors from being | |
212 // top-leveled. | |
213 /// A [Stream] wrapper that keeps track of whether it's been completed and | |
214 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when | |
215 /// it completes successfully or receives an error. | |
216 class _ErrorGroupStream extends Stream { | |
217 /// The parent [ErrorGroup]. | |
218 final ErrorGroup _group; | |
219 | |
220 /// Whether [this] has completed, either successfully or with an error. | |
221 var _isComplete = false; | |
222 | |
223 /// The underlying [StreamController] for [this]. | |
224 final StreamController _controller; | |
225 | |
226 /// The [StreamSubscription] that connects the wrapped [Stream] to | |
227 /// [_controller]. | |
228 StreamSubscription _subscription; | |
229 | |
230 /// Whether [this] has any listeners. | |
231 bool get _hasListeners => _controller.hasSubscribers; | |
232 | |
233 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | |
234 /// [inner]. | |
235 _ErrorGroupStream(this._group, Stream inner) | |
236 : _controller = inner.isSingleSubscription ? | |
237 new StreamController() : | |
238 new StreamController.multiSubscription() { | |
239 _subscription = inner.listen(_controller.add, | |
240 onError: (e) => _group._signalError(e), | |
241 onDone: () { | |
242 _isComplete = true; | |
243 _group._signalStreamComplete(this); | |
244 _controller.close(); | |
245 }); | |
246 } | |
247 | |
248 StreamSubscription listen(void onData(value), | |
249 {void onError(AsyncError error), void onDone(), | |
250 bool unsubscribeOnError}) { | |
251 return _controller.listen(onData, | |
252 onError: onError, | |
253 onDone: onDone, | |
254 unsubscribeOnError: true); | |
255 } | |
256 | |
257 /// Signal that an error from [_group] should be propagated through [this], | |
258 /// unless it's already complete. | |
259 void _signalError(AsyncError e) { | |
260 if (_isComplete) return; | |
261 _subscription.cancel(); | |
262 // Call these asynchronously to work around issue 7913. | |
263 new Future.immediate(null).then((_) { | |
264 _controller.signalError(e.error, e.stackTrace); | |
265 _controller.close(); | |
266 }); | |
267 } | |
268 } | |
OLD | NEW |