OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library error_group; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 import 'utils.dart'; | |
10 | |
11 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s | |
12 /// with one another. This allows APIs to expose multiple [Future]s and | |
13 /// [Stream]s that have identical error conditions without forcing API consumers | |
14 /// to attach error handling to objects they don't care about. | |
15 /// | |
16 /// To use an [ErrorGroup], register [Future]s and [Stream]s with it using | |
17 /// [registerFuture] and [registerStream]. These methods return wrapped versions | |
18 /// of the [Future]s and [Stream]s, which should then be used in place of the | |
19 /// originals. For example: | |
20 /// | |
21 /// var errorGroup = new ErrorGroup(); | |
22 /// future = errorGroup.registerFuture(future); | |
23 /// stream = errorGroup.registerStream(stream); | |
24 /// | |
25 /// An [ErrorGroup] has two major effects on its wrapped members: | |
26 /// | |
27 /// * An error in any member of the group will be propagated to every member | |
28 /// that hasn't already completed. If those members later complete, their | |
29 /// values will be ignored. | |
30 /// * If any member of this group has a listener, errors on members without | |
31 /// listeners won't get passed to the top-level error handler. | |
32 class ErrorGroup { | |
33 /// The [Future]s that are members of [this]. | |
34 final _futures = <_ErrorGroupFuture>[]; | |
35 | |
36 /// The [Stream]s that are members of [this]. | |
37 final _streams = <_ErrorGroupStream>[]; | |
38 | |
39 /// Whether [this] has completed, either successfully or with an error. | |
40 var _isDone = false; | |
41 | |
42 /// The [Completer] for [done]. | |
43 final _doneCompleter = new Completer(); | |
44 | |
45 /// The underlying [Future] for [done]. We need to be able to access it | |
46 /// internally as an [_ErrorGroupFuture] so we can check if it has listeners | |
47 /// and signal errors on it. | |
48 _ErrorGroupFuture _done; | |
49 | |
50 /// Returns a [Future] that completes successully when all members of [this] | |
51 /// are complete, or with an error if any member receives an error. | |
52 /// | |
53 /// This [Future] is effectively in the group in that an error on it won't be | |
54 /// passed to the top-level error handler unless no members of the group have | |
55 /// listeners attached. | |
56 Future get done => _done; | |
57 | |
58 /// Creates a new group with no members. | |
59 ErrorGroup() { | |
60 this._done = new _ErrorGroupFuture(this, _doneCompleter.future); | |
61 } | |
62 | |
63 /// Registers a [Future] as a member of [this]. Returns a wrapped version of | |
64 /// [future] that should be used in its place. | |
65 /// | |
66 /// If all members of [this] have already completed successfully or with an | |
67 /// error, it's a [StateError] to try to register a new [Future]. | |
68 Future registerFuture(Future future) { | |
69 if (_isDone) { | |
70 throw new StateError("Can't register new members on a complete " | |
71 "ErrorGroup."); | |
72 } | |
73 | |
74 var wrapped = new _ErrorGroupFuture(this, future); | |
75 _futures.add(wrapped); | |
76 return wrapped; | |
77 } | |
78 | |
79 /// Registers a [Stream] as a member of [this]. Returns a wrapped version of | |
80 /// [stream] that should be used in its place. The returned [Stream] will be | |
81 /// multi-subscription if and only if [stream] is. | |
82 /// | |
83 /// Since all errors in a group are passed to all members, the returned | |
84 /// [Stream] will automatically unsubscribe all its listeners when it | |
85 /// encounters an error. | |
86 /// | |
87 /// If all members of [this] have already completed successfully or with an | |
88 /// error, it's a [StateError] to try to register a new [Stream]. | |
89 Stream registerStream(Stream stream) { | |
90 if (_isDone) { | |
91 throw new StateError("Can't register new members on a complete " | |
92 "ErrorGroup."); | |
93 } | |
94 | |
95 var wrapped = new _ErrorGroupStream(this, stream); | |
96 _streams.add(wrapped); | |
97 return wrapped; | |
98 } | |
99 | |
100 /// Sends [error] to all members of [this]. Like errors that come from | |
101 /// members, this will only be passed to the top-level error handler if no | |
102 /// members have listeners. | |
103 /// | |
104 /// If all members of [this] have already completed successfully or with an | |
105 /// error, it's a [StateError] to try to signal an error. | |
106 void signalError(var error) { | |
107 if (_isDone) { | |
108 throw new StateError("Can't signal errors on a complete ErrorGroup."); | |
109 } | |
110 | |
111 _signalError(error); | |
112 } | |
113 | |
114 /// Signal an error internally. This is just like [signalError], but instead | |
115 /// of throwing an error if [this] is complete, it just does nothing. | |
116 void _signalError(var error) { | |
117 if (_isDone) return; | |
118 | |
119 var caught = false; | |
120 for (var future in _futures) { | |
121 if (future._isDone || future._hasListeners) caught = true; | |
122 future._signalError(error); | |
123 } | |
124 | |
125 for (var stream in _streams) { | |
126 if (stream._isDone || stream._hasListeners) caught = true; | |
127 stream._signalError(error); | |
128 } | |
129 | |
130 _isDone = true; | |
131 _done._signalError(error); | |
132 if (!caught && !_done._hasListeners) error.throwDelayed(); | |
133 } | |
134 | |
135 /// Notifies [this] that one of its member [Future]s is complete. | |
136 void _signalFutureComplete(_ErrorGroupFuture future) { | |
137 if (_isDone) return; | |
138 | |
139 _isDone = _futures.every((future) => future._isDone) && | |
140 _streams.every((stream) => stream._isDone); | |
141 if (_isDone) _doneCompleter.complete(); | |
142 } | |
143 | |
144 /// Notifies [this] that one of its member [Stream]s is complete. | |
145 void _signalStreamComplete(_ErrorGroupStream stream) { | |
146 if (_isDone) return; | |
147 | |
148 _isDone = _futures.every((future) => future._isDone) && | |
149 _streams.every((stream) => stream._isDone); | |
150 if (_isDone) _doneCompleter.complete(); | |
151 } | |
152 } | |
153 | |
154 /// A [Future] wrapper that keeps track of whether it's been completed and | |
155 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when | |
156 /// it completes successfully or receives an error. | |
157 class _ErrorGroupFuture implements Future { | |
158 /// The parent [ErrorGroup]. | |
159 final ErrorGroup _group; | |
160 | |
161 /// Whether [this] has completed, either successfully or with an error. | |
162 var _isDone = false; | |
163 | |
164 /// The underlying [Completer] for [this]. | |
165 final _completer = new Completer(); | |
166 | |
167 /// Whether [this] has any listeners. | |
168 bool _hasListeners = false; | |
169 | |
170 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | |
171 /// [inner]. | |
172 _ErrorGroupFuture(this._group, Future inner) { | |
173 inner.then((value) { | |
174 if (!_isDone) _completer.complete(value); | |
175 _isDone = true; | |
176 _group._signalFutureComplete(this); | |
177 }).catchError((error) => _group._signalError(error)); | |
178 | |
179 // Make sure _completer.future doesn't automatically send errors to the | |
180 // top-level. | |
181 _completer.future.catchError((_) {}); | |
182 } | |
183 | |
184 Future then(onValue(value), {onError(error)}) { | |
185 _hasListeners = true; | |
186 return _completer.future.then(onValue, onError: onError); | |
187 } | |
188 | |
189 Future catchError(onError(error), {bool test(Object error)}) { | |
190 _hasListeners = true; | |
191 return _completer.future.catchError(onError, test: test); | |
192 } | |
193 | |
194 Future whenComplete(void action()) { | |
195 _hasListeners = true; | |
196 return _completer.future.whenComplete(action); | |
197 } | |
198 | |
199 Stream asStream() { | |
200 _hasListeners = true; | |
201 return _completer.future.asStream(); | |
202 } | |
203 | |
204 /// Signal that an error from [_group] should be propagated through [this], | |
205 /// unless it's already complete. | |
206 void _signalError(var error) { | |
207 if (!_isDone) _completer.completeError(error); | |
208 _isDone = true; | |
209 } | |
210 } | |
211 | |
212 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). | |
213 // When this is fixed, this class will need to prevent such errors from being | |
214 // top-leveled. | |
215 /// A [Stream] wrapper that keeps track of whether it's been completed and | |
216 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when | |
217 /// it completes successfully or receives an error. | |
218 class _ErrorGroupStream extends Stream { | |
219 /// The parent [ErrorGroup]. | |
220 final ErrorGroup _group; | |
221 | |
222 /// Whether [this] has completed, either successfully or with an error. | |
223 var _isDone = false; | |
224 | |
225 /// The underlying [StreamController] for [this]. | |
226 final StreamController _controller; | |
227 | |
228 /// The controller's [Stream]. May be different than `_controller.stream` if | |
229 /// the wrapped stream is a broadcasting stream. | |
230 Stream _stream; | |
231 | |
232 /// The [StreamSubscription] that connects the wrapped [Stream] to | |
233 /// [_controller]. | |
234 StreamSubscription _subscription; | |
235 | |
236 /// Whether [this] has any listeners. | |
237 bool get _hasListeners => _controller.hasListener; | |
238 | |
239 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | |
240 /// [inner]. | |
241 _ErrorGroupStream(this._group, Stream inner) | |
242 : _controller = new StreamController() { | |
243 this._stream = inner.isBroadcast | |
244 ? _controller.stream.asBroadcastStream() | |
245 : _controller.stream; | |
246 _subscription = inner.listen((v) { | |
247 _controller.add(v); | |
248 }, onError: (e) { | |
249 _group._signalError(e); | |
250 }, onDone: () { | |
251 _isDone = true; | |
252 _group._signalStreamComplete(this); | |
253 _controller.close(); | |
254 }); | |
255 } | |
256 | |
257 StreamSubscription listen(void onData(value), | |
258 {void onError(var error), void onDone(), | |
259 bool cancelOnError}) { | |
260 return _stream.listen(onData, | |
261 onError: onError, | |
262 onDone: onDone, | |
263 cancelOnError: true); | |
264 } | |
265 | |
266 /// Signal that an error from [_group] should be propagated through [this], | |
267 /// unless it's already complete. | |
268 void _signalError(var e) { | |
269 if (_isDone) return; | |
270 _subscription.cancel(); | |
271 // Call these asynchronously to work around issue 7913. | |
272 new Future.value().then((_) { | |
273 _controller.addError(e); | |
274 _controller.close(); | |
275 }); | |
276 } | |
277 } | |
OLD | NEW |