Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(285)

Side by Side Diff: utils/pub/error_group.dart

Issue 14297021: Move pub into sdk/lib/_internal. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Disallow package: imports of pub. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « utils/pub/entrypoint.dart ('k') | utils/pub/exit_codes.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library error_group;
6
7 import 'dart:async';
8
9 import 'utils.dart';
10
11 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s
12 /// with one another. This allows APIs to expose multiple [Future]s and
13 /// [Stream]s that have identical error conditions without forcing API consumers
14 /// to attach error handling to objects they don't care about.
15 ///
16 /// To use an [ErrorGroup], register [Future]s and [Stream]s with it using
17 /// [registerFuture] and [registerStream]. These methods return wrapped versions
18 /// of the [Future]s and [Stream]s, which should then be used in place of the
19 /// originals. For example:
20 ///
21 /// var errorGroup = new ErrorGroup();
22 /// future = errorGroup.registerFuture(future);
23 /// stream = errorGroup.registerStream(stream);
24 ///
25 /// An [ErrorGroup] has two major effects on its wrapped members:
26 ///
27 /// * An error in any member of the group will be propagated to every member
28 /// that hasn't already completed. If those members later complete, their
29 /// values will be ignored.
30 /// * If any member of this group has a listener, errors on members without
31 /// listeners won't get passed to the top-level error handler.
32 class ErrorGroup {
33 /// The [Future]s that are members of [this].
34 final _futures = <_ErrorGroupFuture>[];
35
36 /// The [Stream]s that are members of [this].
37 final _streams = <_ErrorGroupStream>[];
38
39 /// Whether [this] has completed, either successfully or with an error.
40 var _isDone = false;
41
42 /// The [Completer] for [done].
43 final _doneCompleter = new Completer();
44
45 /// The underlying [Future] for [done]. We need to be able to access it
46 /// internally as an [_ErrorGroupFuture] so we can check if it has listeners
47 /// and signal errors on it.
48 _ErrorGroupFuture _done;
49
50 /// Returns a [Future] that completes successully when all members of [this]
51 /// are complete, or with an error if any member receives an error.
52 ///
53 /// This [Future] is effectively in the group in that an error on it won't be
54 /// passed to the top-level error handler unless no members of the group have
55 /// listeners attached.
56 Future get done => _done;
57
58 /// Creates a new group with no members.
59 ErrorGroup() {
60 this._done = new _ErrorGroupFuture(this, _doneCompleter.future);
61 }
62
63 /// Registers a [Future] as a member of [this]. Returns a wrapped version of
64 /// [future] that should be used in its place.
65 ///
66 /// If all members of [this] have already completed successfully or with an
67 /// error, it's a [StateError] to try to register a new [Future].
68 Future registerFuture(Future future) {
69 if (_isDone) {
70 throw new StateError("Can't register new members on a complete "
71 "ErrorGroup.");
72 }
73
74 var wrapped = new _ErrorGroupFuture(this, future);
75 _futures.add(wrapped);
76 return wrapped;
77 }
78
79 /// Registers a [Stream] as a member of [this]. Returns a wrapped version of
80 /// [stream] that should be used in its place. The returned [Stream] will be
81 /// multi-subscription if and only if [stream] is.
82 ///
83 /// Since all errors in a group are passed to all members, the returned
84 /// [Stream] will automatically unsubscribe all its listeners when it
85 /// encounters an error.
86 ///
87 /// If all members of [this] have already completed successfully or with an
88 /// error, it's a [StateError] to try to register a new [Stream].
89 Stream registerStream(Stream stream) {
90 if (_isDone) {
91 throw new StateError("Can't register new members on a complete "
92 "ErrorGroup.");
93 }
94
95 var wrapped = new _ErrorGroupStream(this, stream);
96 _streams.add(wrapped);
97 return wrapped;
98 }
99
100 /// Sends [error] to all members of [this]. Like errors that come from
101 /// members, this will only be passed to the top-level error handler if no
102 /// members have listeners.
103 ///
104 /// If all members of [this] have already completed successfully or with an
105 /// error, it's a [StateError] to try to signal an error.
106 void signalError(var error) {
107 if (_isDone) {
108 throw new StateError("Can't signal errors on a complete ErrorGroup.");
109 }
110
111 _signalError(error);
112 }
113
114 /// Signal an error internally. This is just like [signalError], but instead
115 /// of throwing an error if [this] is complete, it just does nothing.
116 void _signalError(var error) {
117 if (_isDone) return;
118
119 var caught = false;
120 for (var future in _futures) {
121 if (future._isDone || future._hasListeners) caught = true;
122 future._signalError(error);
123 }
124
125 for (var stream in _streams) {
126 if (stream._isDone || stream._hasListeners) caught = true;
127 stream._signalError(error);
128 }
129
130 _isDone = true;
131 _done._signalError(error);
132 if (!caught && !_done._hasListeners) error.throwDelayed();
133 }
134
135 /// Notifies [this] that one of its member [Future]s is complete.
136 void _signalFutureComplete(_ErrorGroupFuture future) {
137 if (_isDone) return;
138
139 _isDone = _futures.every((future) => future._isDone) &&
140 _streams.every((stream) => stream._isDone);
141 if (_isDone) _doneCompleter.complete();
142 }
143
144 /// Notifies [this] that one of its member [Stream]s is complete.
145 void _signalStreamComplete(_ErrorGroupStream stream) {
146 if (_isDone) return;
147
148 _isDone = _futures.every((future) => future._isDone) &&
149 _streams.every((stream) => stream._isDone);
150 if (_isDone) _doneCompleter.complete();
151 }
152 }
153
154 /// A [Future] wrapper that keeps track of whether it's been completed and
155 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when
156 /// it completes successfully or receives an error.
157 class _ErrorGroupFuture implements Future {
158 /// The parent [ErrorGroup].
159 final ErrorGroup _group;
160
161 /// Whether [this] has completed, either successfully or with an error.
162 var _isDone = false;
163
164 /// The underlying [Completer] for [this].
165 final _completer = new Completer();
166
167 /// Whether [this] has any listeners.
168 bool _hasListeners = false;
169
170 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
171 /// [inner].
172 _ErrorGroupFuture(this._group, Future inner) {
173 inner.then((value) {
174 if (!_isDone) _completer.complete(value);
175 _isDone = true;
176 _group._signalFutureComplete(this);
177 }).catchError((error) => _group._signalError(error));
178
179 // Make sure _completer.future doesn't automatically send errors to the
180 // top-level.
181 _completer.future.catchError((_) {});
182 }
183
184 Future then(onValue(value), {onError(error)}) {
185 _hasListeners = true;
186 return _completer.future.then(onValue, onError: onError);
187 }
188
189 Future catchError(onError(error), {bool test(Object error)}) {
190 _hasListeners = true;
191 return _completer.future.catchError(onError, test: test);
192 }
193
194 Future whenComplete(void action()) {
195 _hasListeners = true;
196 return _completer.future.whenComplete(action);
197 }
198
199 Stream asStream() {
200 _hasListeners = true;
201 return _completer.future.asStream();
202 }
203
204 /// Signal that an error from [_group] should be propagated through [this],
205 /// unless it's already complete.
206 void _signalError(var error) {
207 if (!_isDone) _completer.completeError(error);
208 _isDone = true;
209 }
210 }
211
212 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843).
213 // When this is fixed, this class will need to prevent such errors from being
214 // top-leveled.
215 /// A [Stream] wrapper that keeps track of whether it's been completed and
216 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when
217 /// it completes successfully or receives an error.
218 class _ErrorGroupStream extends Stream {
219 /// The parent [ErrorGroup].
220 final ErrorGroup _group;
221
222 /// Whether [this] has completed, either successfully or with an error.
223 var _isDone = false;
224
225 /// The underlying [StreamController] for [this].
226 final StreamController _controller;
227
228 /// The controller's [Stream]. May be different than `_controller.stream` if
229 /// the wrapped stream is a broadcasting stream.
230 Stream _stream;
231
232 /// The [StreamSubscription] that connects the wrapped [Stream] to
233 /// [_controller].
234 StreamSubscription _subscription;
235
236 /// Whether [this] has any listeners.
237 bool get _hasListeners => _controller.hasListener;
238
239 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
240 /// [inner].
241 _ErrorGroupStream(this._group, Stream inner)
242 : _controller = new StreamController() {
243 this._stream = inner.isBroadcast
244 ? _controller.stream.asBroadcastStream()
245 : _controller.stream;
246 _subscription = inner.listen((v) {
247 _controller.add(v);
248 }, onError: (e) {
249 _group._signalError(e);
250 }, onDone: () {
251 _isDone = true;
252 _group._signalStreamComplete(this);
253 _controller.close();
254 });
255 }
256
257 StreamSubscription listen(void onData(value),
258 {void onError(var error), void onDone(),
259 bool cancelOnError}) {
260 return _stream.listen(onData,
261 onError: onError,
262 onDone: onDone,
263 cancelOnError: true);
264 }
265
266 /// Signal that an error from [_group] should be propagated through [this],
267 /// unless it's already complete.
268 void _signalError(var e) {
269 if (_isDone) return;
270 _subscription.cancel();
271 // Call these asynchronously to work around issue 7913.
272 new Future.value().then((_) {
273 _controller.addError(e);
274 _controller.close();
275 });
276 }
277 }
OLDNEW
« no previous file with comments | « utils/pub/entrypoint.dart ('k') | utils/pub/exit_codes.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698