Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: lib/src/error_group.dart

Issue 2184303002: Make pub strong-mode clean. (Closed) Base URL: git@github.com:dart-lang/pub.git@master
Patch Set: Code review changes Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/entrypoint.dart ('k') | lib/src/executable.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 6
7 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s 7 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s
8 /// with one another. 8 /// with one another.
9 /// 9 ///
10 /// This allows APIs to expose multiple [Future]s and [Stream]s that have 10 /// This allows APIs to expose multiple [Future]s and [Stream]s that have
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
58 ErrorGroup() { 58 ErrorGroup() {
59 this._done = new _ErrorGroupFuture(this, _doneCompleter.future); 59 this._done = new _ErrorGroupFuture(this, _doneCompleter.future);
60 } 60 }
61 61
62 /// Registers a [Future] as a member of [this]. 62 /// Registers a [Future] as a member of [this].
63 /// 63 ///
64 /// Returns a wrapped version of [future] that should be used in its place. 64 /// Returns a wrapped version of [future] that should be used in its place.
65 /// 65 ///
66 /// If all members of [this] have already completed successfully or with an 66 /// If all members of [this] have already completed successfully or with an
67 /// error, it's a [StateError] to try to register a new [Future]. 67 /// error, it's a [StateError] to try to register a new [Future].
68 Future registerFuture(Future future) { 68 Future/*<T>*/ registerFuture/*<T>*/(Future/*<T>*/ future) {
69 if (_isDone) { 69 if (_isDone) {
70 throw new StateError("Can't register new members on a complete " 70 throw new StateError("Can't register new members on a complete "
71 "ErrorGroup."); 71 "ErrorGroup.");
72 } 72 }
73 73
74 var wrapped = new _ErrorGroupFuture(this, future); 74 var wrapped = new _ErrorGroupFuture(this, future);
75 _futures.add(wrapped); 75 _futures.add(wrapped);
76 return wrapped; 76 return wrapped;
77 } 77 }
78 78
79 /// Registers a [Stream] as a member of [this]. 79 /// Registers a [Stream] as a member of [this].
80 /// 80 ///
81 /// Returns a wrapped version of [stream] that should be used in its place. 81 /// Returns a wrapped version of [stream] that should be used in its place.
82 /// The returned [Stream] will be multi-subscription if and only if [stream] 82 /// The returned [Stream] will be multi-subscription if and only if [stream]
83 /// is. 83 /// is.
84 /// 84 ///
85 /// Since all errors in a group are passed to all members, the returned 85 /// Since all errors in a group are passed to all members, the returned
86 /// [Stream] will automatically unsubscribe all its listeners when it 86 /// [Stream] will automatically unsubscribe all its listeners when it
87 /// encounters an error. 87 /// encounters an error.
88 /// 88 ///
89 /// If all members of [this] have already completed successfully or with an 89 /// If all members of [this] have already completed successfully or with an
90 /// error, it's a [StateError] to try to register a new [Stream]. 90 /// error, it's a [StateError] to try to register a new [Stream].
91 Stream registerStream(Stream stream) { 91 Stream/*<T>*/ registerStream/*<T>*/(Stream/*<T>*/ stream) {
92 if (_isDone) { 92 if (_isDone) {
93 throw new StateError("Can't register new members on a complete " 93 throw new StateError("Can't register new members on a complete "
94 "ErrorGroup."); 94 "ErrorGroup.");
95 } 95 }
96 96
97 var wrapped = new _ErrorGroupStream(this, stream); 97 var wrapped = new _ErrorGroupStream(this, stream);
98 _streams.add(wrapped); 98 _streams.add(wrapped);
99 return wrapped; 99 return wrapped;
100 } 100 }
101 101
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
154 _streams.every((stream) => stream._isDone); 154 _streams.every((stream) => stream._isDone);
155 if (_isDone) _doneCompleter.complete(); 155 if (_isDone) _doneCompleter.complete();
156 } 156 }
157 } 157 }
158 158
159 /// A [Future] wrapper that keeps track of whether it's been completed and 159 /// A [Future] wrapper that keeps track of whether it's been completed and
160 /// whether it has any listeners. 160 /// whether it has any listeners.
161 /// 161 ///
162 /// It also notifies its parent [ErrorGroup] when it completes successfully or 162 /// It also notifies its parent [ErrorGroup] when it completes successfully or
163 /// receives an error. 163 /// receives an error.
164 class _ErrorGroupFuture implements Future { 164 class _ErrorGroupFuture<T> implements Future<T> {
165 /// The parent [ErrorGroup]. 165 /// The parent [ErrorGroup].
166 final ErrorGroup _group; 166 final ErrorGroup _group;
167 167
168 /// Whether [this] has completed, either successfully or with an error. 168 /// Whether [this] has completed, either successfully or with an error.
169 var _isDone = false; 169 var _isDone = false;
170 170
171 /// The underlying [Completer] for [this]. 171 /// The underlying [Completer] for [this].
172 final _completer = new Completer(); 172 final _completer = new Completer<T>();
173 173
174 /// Whether [this] has any listeners. 174 /// Whether [this] has any listeners.
175 bool _hasListeners = false; 175 bool _hasListeners = false;
176 176
177 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps 177 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
178 /// [inner]. 178 /// [inner].
179 _ErrorGroupFuture(this._group, Future inner) { 179 _ErrorGroupFuture(this._group, Future<T> inner) {
180 inner.then((value) { 180 inner.then((value) {
181 if (!_isDone) _completer.complete(value); 181 if (!_isDone) _completer.complete(value);
182 _isDone = true; 182 _isDone = true;
183 _group._signalFutureComplete(this); 183 _group._signalFutureComplete(this);
184 }).catchError(_group._signalError); 184 }).catchError(_group._signalError);
185 185
186 // Make sure _completer.future doesn't automatically send errors to the 186 // Make sure _completer.future doesn't automatically send errors to the
187 // top-level. 187 // top-level.
188 _completer.future.catchError((_) {}); 188 _completer.future.catchError((_) {});
189 } 189 }
190 190
191 Future then(onValue(value), {Function onError}) { 191 Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(/*=T*/ value), {Function onError}) {
192 _hasListeners = true; 192 _hasListeners = true;
193 return _completer.future.then(onValue, onError: onError); 193 return _completer.future.then(onValue, onError: onError);
194 } 194 }
195 195
196 Future catchError(Function onError, {bool test(Object error)}) { 196 Future<T> catchError(Function onError, {bool test(Object error)}) {
197 _hasListeners = true; 197 _hasListeners = true;
198 return _completer.future.catchError(onError, test: test); 198 return _completer.future.catchError(onError, test: test);
199 } 199 }
200 200
201 Future whenComplete(void action()) { 201 Future<T> whenComplete(void action()) {
202 _hasListeners = true; 202 _hasListeners = true;
203 return _completer.future.whenComplete(action); 203 return _completer.future.whenComplete(action);
204 } 204 }
205 205
206 Future timeout(Duration timeLimit, {void onTimeout()}) { 206 Future<T> timeout(Duration timeLimit, {onTimeout()}) {
207 _hasListeners = true; 207 _hasListeners = true;
208 return _completer.future.timeout(timeLimit, onTimeout: onTimeout); 208 return _completer.future.timeout(timeLimit, onTimeout: onTimeout);
209 } 209 }
210 210
211 Stream asStream() { 211 Stream<T> asStream() {
212 _hasListeners = true; 212 _hasListeners = true;
213 return _completer.future.asStream(); 213 return _completer.future.asStream();
214 } 214 }
215 215
216 /// Signal that an error from [_group] should be propagated through [this], 216 /// Signal that an error from [_group] should be propagated through [this],
217 /// unless it's already complete. 217 /// unless it's already complete.
218 void _signalError(var error, [StackTrace stackTrace]) { 218 void _signalError(var error, [StackTrace stackTrace]) {
219 if (!_isDone) _completer.completeError(error, stackTrace); 219 if (!_isDone) _completer.completeError(error, stackTrace);
220 _isDone = true; 220 _isDone = true;
221 } 221 }
222 } 222 }
223 223
224 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). 224 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843).
225 // When this is fixed, this class will need to prevent such errors from being 225 // When this is fixed, this class will need to prevent such errors from being
226 // top-leveled. 226 // top-leveled.
227 /// A [Stream] wrapper that keeps track of whether it's been completed and 227 /// A [Stream] wrapper that keeps track of whether it's been completed and
228 /// whether it has any listeners. 228 /// whether it has any listeners.
229 /// 229 ///
230 /// It also notifies its parent [ErrorGroup] when it completes successfully or 230 /// It also notifies its parent [ErrorGroup] when it completes successfully or
231 /// receives an error. 231 /// receives an error.
232 class _ErrorGroupStream extends Stream { 232 class _ErrorGroupStream<T> extends Stream<T> {
233 /// The parent [ErrorGroup]. 233 /// The parent [ErrorGroup].
234 final ErrorGroup _group; 234 final ErrorGroup _group;
235 235
236 /// Whether [this] has completed, either successfully or with an error. 236 /// Whether [this] has completed, either successfully or with an error.
237 var _isDone = false; 237 var _isDone = false;
238 238
239 /// The underlying [StreamController] for [this]. 239 /// The underlying [StreamController] for [this].
240 final StreamController _controller; 240 final StreamController<T> _controller;
241 241
242 /// The controller's [Stream]. 242 /// The controller's [Stream].
243 /// 243 ///
244 /// May be different than `_controller.stream` if the wrapped stream is a 244 /// May be different than `_controller.stream` if the wrapped stream is a
245 /// broadcasting stream. 245 /// broadcasting stream.
246 Stream _stream; 246 Stream<T> _stream;
247 247
248 /// The [StreamSubscription] that connects the wrapped [Stream] to 248 /// The [StreamSubscription] that connects the wrapped [Stream] to
249 /// [_controller]. 249 /// [_controller].
250 StreamSubscription _subscription; 250 StreamSubscription<T> _subscription;
251 251
252 /// Whether [this] has any listeners. 252 /// Whether [this] has any listeners.
253 bool get _hasListeners => _controller.hasListener; 253 bool get _hasListeners => _controller.hasListener;
254 254
255 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps 255 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
256 /// [inner]. 256 /// [inner].
257 _ErrorGroupStream(this._group, Stream inner) 257 _ErrorGroupStream(this._group, Stream<T> inner)
258 : _controller = new StreamController(sync: true) { 258 : _controller = new StreamController(sync: true) {
259 // Use old-style asBroadcastStream behavior - cancel source _subscription 259 // Use old-style asBroadcastStream behavior - cancel source _subscription
260 // the first time the stream has no listeners. 260 // the first time the stream has no listeners.
261 _stream = inner.isBroadcast 261 _stream = inner.isBroadcast
262 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) 262 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel())
263 : _controller.stream; 263 : _controller.stream;
264 _subscription = inner.listen((v) { 264 _subscription = inner.listen((v) {
265 _controller.add(v); 265 _controller.add(v);
266 }, onError: (e, [stackTrace]) { 266 }, onError: (e, [stackTrace]) {
267 _group._signalError(e, stackTrace); 267 _group._signalError(e, stackTrace);
268 }, onDone: () { 268 }, onDone: () {
269 _isDone = true; 269 _isDone = true;
270 _group._signalStreamComplete(this); 270 _group._signalStreamComplete(this);
271 _controller.close(); 271 _controller.close();
272 }); 272 });
273 } 273 }
274 274
275 StreamSubscription listen(void onData(value), 275 StreamSubscription<T> listen(void onData(T value),
276 {Function onError, void onDone(), 276 {Function onError, void onDone(),
277 bool cancelOnError}) { 277 bool cancelOnError}) {
278 return _stream.listen(onData, 278 return _stream.listen(onData,
279 onError: onError, 279 onError: onError,
280 onDone: onDone, 280 onDone: onDone,
281 cancelOnError: true); 281 cancelOnError: true);
282 } 282 }
283 283
284 /// Signal that an error from [_group] should be propagated through [this], 284 /// Signal that an error from [_group] should be propagated through [this],
285 /// unless it's already complete. 285 /// unless it's already complete.
286 void _signalError(var e, [StackTrace stackTrace]) { 286 void _signalError(var e, [StackTrace stackTrace]) {
287 if (_isDone) return; 287 if (_isDone) return;
288 _subscription.cancel(); 288 _subscription.cancel();
289 // Call these asynchronously to work around issue 7913. 289 // Call these asynchronously to work around issue 7913.
290 new Future.value().then((_) { 290 new Future.value().then((_) {
291 _controller.addError(e, stackTrace); 291 _controller.addError(e, stackTrace);
292 _controller.close(); 292 _controller.close();
293 }); 293 });
294 } 294 }
295 } 295 }
OLDNEW
« no previous file with comments | « lib/src/entrypoint.dart ('k') | lib/src/executable.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698