Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(269)

Side by Side Diff: utils/pub/error_group.dart

Issue 11941003: Add an ErrorGroup class to Pub. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library error_group;
6
7 import 'dart:async';
8
9 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s
10 /// with one another. This allows APIs to expose multiple [Future]s and
11 /// [Stream]s that have identical error conditions without forcing API consumers
12 /// to attach error handling to objects they don't care about.
13 ///
14 /// To use an [ErrorGroup], register [Future]s and [Stream]s with it using
15 /// [registerFuture] and [registerStream]. These methods return wrapped versions
16 /// of the [Future]s and [Stream]s, which should then be used in place of the
17 /// originals. For example:
18 ///
19 /// var errorGroup = new ErrorGroup();
20 /// future = errorGroup.registerFuture(future);
21 /// stream = errorGroup.registerStream(stream);
22 ///
23 /// An [ErrorGroup] has two major effects on its wrapped members:
24 ///
25 /// * An error in any member of the group will be propagated to every member
26 /// that hasn't already completed. If those members later complete, their
27 /// values will be ignored.
28 /// * If any member of this group has a listener, errors on members without
29 /// listeners won't get passed to the top-level error handler.
30 class ErrorGroup {
31 /// The [Future]s that are members of [this].
32 final _futures = <_ErrorGroupFuture>[];
33
34 /// The [Stream]s that are members of [this].
35 final _streams = <_ErrorGroupStream>[];
36
37 /// Whether [this] has completed, either successfully or with an error.
38 var _isComplete = false;
Bob Nystrom 2013/01/15 22:49:47 var -> bool
nweiz 2013/01/16 00:12:10 You've told me in the past not to use type declara
Bob Nystrom 2013/01/16 18:03:44 I think the rough guideline we've followed is to d
nweiz 2013/01/16 19:29:11 ...why? It's no less inferable in the "var" case t
39
40 /// The [Completer] for [complete].
41 final _completeCompleter = new Completer();
42
43 /// The underlying [Future] for [complete]. We need to be able to access it
44 /// internally as an [_ErrorGroupFuture] so we can check if it has listeners
45 /// and signal errors on it.
46 _ErrorGroupFuture _complete;
47
48 /// Returns a [Future] that completes successully when all members of [this]
49 /// are complete, or with an error if any member receives an error.
50 ///
51 /// This [Future] is effectively in the group in that an error on it won't be
52 /// passed to the top-level error handler unless no members of the group have
53 /// listeners attached.
54 Future get complete => _complete;
Bob Nystrom 2013/01/15 22:49:47 "completed"? The name "complete" seems odd to me h
nweiz 2013/01/16 00:12:10 Changed to "done" as per offline discussion.
55
56 /// Creates a new group with no members.
57 ErrorGroup() {
58 this._complete = new _ErrorGroupFuture(this, _completeCompleter.future);
59 }
60
61 /// Registers a [Future] as a member of [this]. Returns a wrapped version of
62 /// [future] that should be used in its place.
Bob Nystrom 2013/01/15 22:49:47 Document that it's an error to register an already
nweiz 2013/01/16 00:12:10 It's not. It's a little silly, but it'll work fine
63 ///
64 /// If all members of [this] have already completed successfully or with an
65 /// error, it's a [StateError] to try to register a new [Future].
66 Future registerFuture(Future future) {
Bob Nystrom 2013/01/15 22:49:47 What do you think of just "addFuture" instead of "
nweiz 2013/01/16 00:12:10 I think that makes it sound a little too much like
67 if (_isComplete) {
68 throw new StateError("Can't register new members on a complete "
69 "ErrorGroup.");
70 }
71
72 var wrapped = new _ErrorGroupFuture(this, future);
73 _futures.add(wrapped);
74 return wrapped;
75 }
76
77 /// Registers a [Stream] as a member of [this]. Returns a wrapped version of
78 /// [stream] that should be used in its place. The returned [Stream] will be
79 /// multi-subscription if and only if [stream] is.
80 ///
81 /// Since all errors in a group are passed to all members, the returned
82 /// [Stream] will automatically unsubscribe all its listeners when it
83 /// encounters an error.
84 ///
85 /// If all members of [this] have already completed successfully or with an
86 /// error, it's a [StateError] to try to register a new [Stream].
Bob Nystrom 2013/01/15 22:49:47 Document that it's an error to register an already
nweiz 2013/01/16 00:12:10 See above.
87 Stream registerStream(Stream stream) {
88 if (_isComplete) {
89 throw new StateError("Can't register new members on a complete "
90 "ErrorGroup.");
91 }
92
93 var wrapped = new _ErrorGroupStream(this, stream);
94 _streams.add(wrapped);
95 return wrapped;
96 }
97
98 /// Sends [error] to all members of [this]. Like errors that come from
99 /// members, this will only be passed to the top-level error handler if no
100 /// members have listeners.
101 ///
102 /// If all members of [this] have already completed successfully or with an
103 /// error, it's a [StateError] to try to signal an error.
104 void signalError(AsyncError error) {
105 if (_isComplete) {
106 throw new StateError("Can't signal errors on a complete ErrorGroup.");
107 }
108
109 _signalError(error);
110 }
111
112 /// Signal an error internally. This is just like [signalError], but instead
113 /// of throwing an error if [this] is complete, it just does nothing.
114 void _signalError(AsyncError error) {
115 if (_isComplete) return;
116
117 var caught = false;
118 for (var future in _futures) {
119 if (future._isComplete || future._hasListeners) caught = true;
120 future._signalError(error);
121 }
122
123 for (var stream in _streams) {
124 if (stream._isComplete || stream._hasListeners) caught = true;
125 stream._signalError(error);
126 }
127
128 _isComplete = true;
129 _complete._signalError(error);
130 if (!caught && !_complete._hasListeners) error.throwDelayed();
131 }
132
133 /// Notifies [this] that one of its member [Future]s is complete.
134 void _signalFutureComplete(_ErrorGroupFuture future) {
135 if (_isComplete) return;
136
137 _isComplete = _futures.every((future) => future._isComplete) &&
138 _streams.every((stream) => stream._isComplete);
Bob Nystrom 2013/01/15 22:49:47 Indent another 2.
nweiz 2013/01/16 00:12:10 Done.
139 if (_isComplete) _completeCompleter.complete();
140 }
141
142 /// Notifies [this] that one of its member [Stream]s is complete.
143 void _signalStreamComplete(_ErrorGroupStream stream) {
144 if (_isComplete) return;
145
146 _isComplete = _futures.every((future) => future._isComplete) &&
147 _streams.every((stream) => stream._isComplete);
Bob Nystrom 2013/01/15 22:49:47 Ditto.
nweiz 2013/01/16 00:12:10 Done.
148 if (_isComplete) _completeCompleter.complete();
149 }
150 }
151
152 /// A [Future] wrapper that keeps track of whether it's been completed and
153 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when
154 /// it completes successfully or receives an error.
155 class _ErrorGroupFuture implements Future {
156 /// The parent [ErrorGroup].
157 final ErrorGroup _group;
158
159 /// Whether [this] has completed, either successfully or with an error.
160 var _isComplete = false;
161
162 /// The underlying [Completer] for [this].
163 final _completer = new Completer();
164
165 /// Whether [this] has any listeners.
166 bool _hasListeners = false;
167
168 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
169 /// [inner].
170 _ErrorGroupFuture(this._group, Future inner) {
171 inner.then((value) {
172 if (!_isComplete) _completer.complete(value);
173 _isComplete = true;
174 _group._signalFutureComplete(this);
175 }).catchError((error) => _group._signalError(error));
176
177 // Make sure _completer.future doesn't automatically send errors to the
178 // top-level.
179 _completer.future.catchError((_) {});
180 }
181
182 Future then(onValue(T value), {onError(AsyncError asyncError)}) {
183 _hasListeners = true;
184 return _completer.future.then(onValue, onError: onError);
185 }
186
187 Future catchError(onError(AsyncError asyncError), {bool test(Object error)}) {
188 _hasListeners = true;
189 return _completer.future.catchError(onError, test: test);
190 }
191
192 Future whenComplete(void action()) {
193 _hasListeners = true;
194 return _completer.future.whenComplete(action);
195 }
196
197 Stream<T> asStream() {
198 _hasListeners = true;
199 return _completer.future.asStream();
200 }
201
202 /// Signal that an error from [_group] should be propagated through [this],
203 /// unless it's already complete.
204 void _signalError(AsyncError error) {
205 if (!_isComplete) _completer.completeError(error.error, error.stackTrace);
206 _isComplete = true;
207 }
208 }
209
210 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843).
211 // When this is fixed, this class will need to prevent such errors from being
212 // top-leveled.
213 /// A [Stream] wrapper that keeps track of whether it's been completed and
214 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when
215 /// it completes successfully or receives an error.
216 class _ErrorGroupStream extends Stream {
217 /// The parent [ErrorGroup].
218 final ErrorGroup _group;
219
220 /// Whether [this] has completed, either successfully or with an error.
221 var _isComplete = false;
222
223 /// The underlying [StreamController] for [this].
224 final StreamController _controller;
225
226 /// The [StreamSubscription] that connects the wrapped [Stream] to
227 /// [_controller].
228 StreamSubscription _subscription;
229
230 /// Whether [this] has any listeners.
231 bool get _hasListeners => _controller.hasSubscribers;
232
233 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
234 /// [inner].
235 _ErrorGroupStream(this._group, Stream inner)
236 : _controller = inner.isSingleSubscription ?
237 new StreamController() :
238 new StreamController.multiSubscription() {
239 _subscription = inner.listen(_controller.add,
240 onError: (e) => _group._signalError(e),
241 onDone: () {
242 _isComplete = true;
243 _group._signalStreamComplete(this);
244 _controller.close();
245 });
246 }
247
248 StreamSubscription listen(void onData(value),
249 {void onError(AsyncError error), void onDone(),
250 bool unsubscribeOnError}) {
251 return _controller.listen(onData,
252 onError: onError,
253 onDone: onDone,
254 unsubscribeOnError: true);
255 }
256
257 /// Signal that an error from [_group] should be propagated through [this],
258 /// unless it's already complete.
259 void _signalError(AsyncError e) {
260 if (_isComplete) return;
261 _subscription.cancel();
262 // Call these asynchronously to work around issue 7913.
263 new Future.immediate(null).then((_) {
264 _controller.signalError(e.error, e.stackTrace);
265 _controller.close();
266 });
267 }
268 }
OLDNEW
« no previous file with comments | « no previous file | utils/tests/pub/error_group_test.dart » ('j') | utils/tests/pub/error_group_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698