Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(472)

Side by Side Diff: sdk/lib/_internal/pub_generated/lib/src/error_group.dart

Issue 887223007: Revert "Use native async/await support in pub." (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library pub.error_group; 5 library pub.error_group;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 8
9 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s 9 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s
10 /// with one another. 10 /// with one another.
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
62 } 62 }
63 63
64 /// Registers a [Future] as a member of [this]. 64 /// Registers a [Future] as a member of [this].
65 /// 65 ///
66 /// Returns a wrapped version of [future] that should be used in its place. 66 /// Returns a wrapped version of [future] that should be used in its place.
67 /// 67 ///
68 /// If all members of [this] have already completed successfully or with an 68 /// If all members of [this] have already completed successfully or with an
69 /// error, it's a [StateError] to try to register a new [Future]. 69 /// error, it's a [StateError] to try to register a new [Future].
70 Future registerFuture(Future future) { 70 Future registerFuture(Future future) {
71 if (_isDone) { 71 if (_isDone) {
72 throw new StateError("Can't register new members on a complete " 72 throw new StateError(
73 "ErrorGroup."); 73 "Can't register new members on a complete " "ErrorGroup.");
74 } 74 }
75 75
76 var wrapped = new _ErrorGroupFuture(this, future); 76 var wrapped = new _ErrorGroupFuture(this, future);
77 _futures.add(wrapped); 77 _futures.add(wrapped);
78 return wrapped; 78 return wrapped;
79 } 79 }
80 80
81 /// Registers a [Stream] as a member of [this]. 81 /// Registers a [Stream] as a member of [this].
82 /// 82 ///
83 /// Returns a wrapped version of [stream] that should be used in its place. 83 /// Returns a wrapped version of [stream] that should be used in its place.
84 /// The returned [Stream] will be multi-subscription if and only if [stream] 84 /// The returned [Stream] will be multi-subscription if and only if [stream]
85 /// is. 85 /// is.
86 /// 86 ///
87 /// Since all errors in a group are passed to all members, the returned 87 /// Since all errors in a group are passed to all members, the returned
88 /// [Stream] will automatically unsubscribe all its listeners when it 88 /// [Stream] will automatically unsubscribe all its listeners when it
89 /// encounters an error. 89 /// encounters an error.
90 /// 90 ///
91 /// If all members of [this] have already completed successfully or with an 91 /// If all members of [this] have already completed successfully or with an
92 /// error, it's a [StateError] to try to register a new [Stream]. 92 /// error, it's a [StateError] to try to register a new [Stream].
93 Stream registerStream(Stream stream) { 93 Stream registerStream(Stream stream) {
94 if (_isDone) { 94 if (_isDone) {
95 throw new StateError("Can't register new members on a complete " 95 throw new StateError(
96 "ErrorGroup."); 96 "Can't register new members on a complete " "ErrorGroup.");
97 } 97 }
98 98
99 var wrapped = new _ErrorGroupStream(this, stream); 99 var wrapped = new _ErrorGroupStream(this, stream);
100 _streams.add(wrapped); 100 _streams.add(wrapped);
101 return wrapped; 101 return wrapped;
102 } 102 }
103 103
104 /// Sends [error] to all members of [this]. 104 /// Sends [error] to all members of [this].
105 /// 105 ///
106 /// Like errors that come from members, this will only be passed to the 106 /// Like errors that come from members, this will only be passed to the
(...skipping 22 matching lines...) Expand all
129 future._signalError(error, stackTrace); 129 future._signalError(error, stackTrace);
130 } 130 }
131 131
132 for (var stream in _streams) { 132 for (var stream in _streams) {
133 if (stream._isDone || stream._hasListeners) caught = true; 133 if (stream._isDone || stream._hasListeners) caught = true;
134 stream._signalError(error, stackTrace); 134 stream._signalError(error, stackTrace);
135 } 135 }
136 136
137 _isDone = true; 137 _isDone = true;
138 _done._signalError(error, stackTrace); 138 _done._signalError(error, stackTrace);
139 if (!caught && !_done._hasListeners) scheduleMicrotask((){ throw error; }); 139 if (!caught && !_done._hasListeners) scheduleMicrotask(() {
140 throw error;
141 });
140 } 142 }
141 143
142 /// Notifies [this] that one of its member [Future]s is complete. 144 /// Notifies [this] that one of its member [Future]s is complete.
143 void _signalFutureComplete(_ErrorGroupFuture future) { 145 void _signalFutureComplete(_ErrorGroupFuture future) {
144 if (_isDone) return; 146 if (_isDone) return;
145 147
146 _isDone = _futures.every((future) => future._isDone) && 148 _isDone = _futures.every((future) => future._isDone) &&
147 _streams.every((stream) => stream._isDone); 149 _streams.every((stream) => stream._isDone);
148 if (_isDone) _doneCompleter.complete(); 150 if (_isDone) _doneCompleter.complete();
149 } 151 }
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
250 /// The [StreamSubscription] that connects the wrapped [Stream] to 252 /// The [StreamSubscription] that connects the wrapped [Stream] to
251 /// [_controller]. 253 /// [_controller].
252 StreamSubscription _subscription; 254 StreamSubscription _subscription;
253 255
254 /// Whether [this] has any listeners. 256 /// Whether [this] has any listeners.
255 bool get _hasListeners => _controller.hasListener; 257 bool get _hasListeners => _controller.hasListener;
256 258
257 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps 259 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
258 /// [inner]. 260 /// [inner].
259 _ErrorGroupStream(this._group, Stream inner) 261 _ErrorGroupStream(this._group, Stream inner)
260 : _controller = new StreamController(sync: true) { 262 : _controller = new StreamController(sync: true) {
261 // Use old-style asBroadcastStream behavior - cancel source _subscription 263 // Use old-style asBroadcastStream behavior - cancel source _subscription
262 // the first time the stream has no listeners. 264 // the first time the stream has no listeners.
263 _stream = inner.isBroadcast 265 _stream = inner.isBroadcast ?
264 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) 266 _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) :
265 : _controller.stream; 267 _controller.stream;
266 _subscription = inner.listen((v) { 268 _subscription = inner.listen((v) {
267 _controller.add(v); 269 _controller.add(v);
268 }, onError: (e, [stackTrace]) { 270 }, onError: (e, [stackTrace]) {
269 _group._signalError(e, stackTrace); 271 _group._signalError(e, stackTrace);
270 }, onDone: () { 272 }, onDone: () {
271 _isDone = true; 273 _isDone = true;
272 _group._signalStreamComplete(this); 274 _group._signalStreamComplete(this);
273 _controller.close(); 275 _controller.close();
274 }); 276 });
275 } 277 }
276 278
277 StreamSubscription listen(void onData(value), 279 StreamSubscription listen(void onData(value), {Function onError, void
278 {Function onError, void onDone(), 280 onDone(), bool cancelOnError}) {
279 bool cancelOnError}) { 281 return _stream.listen(
280 return _stream.listen(onData, 282 onData,
281 onError: onError, 283 onError: onError,
282 onDone: onDone, 284 onDone: onDone,
283 cancelOnError: true); 285 cancelOnError: true);
284 } 286 }
285 287
286 /// Signal that an error from [_group] should be propagated through [this], 288 /// Signal that an error from [_group] should be propagated through [this],
287 /// unless it's already complete. 289 /// unless it's already complete.
288 void _signalError(var e, [StackTrace stackTrace]) { 290 void _signalError(var e, [StackTrace stackTrace]) {
289 if (_isDone) return; 291 if (_isDone) return;
290 _subscription.cancel(); 292 _subscription.cancel();
291 // Call these asynchronously to work around issue 7913. 293 // Call these asynchronously to work around issue 7913.
292 new Future.value().then((_) { 294 new Future.value().then((_) {
293 _controller.addError(e, stackTrace); 295 _controller.addError(e, stackTrace);
294 _controller.close(); 296 _controller.close();
295 }); 297 });
296 } 298 }
297 } 299 }
OLDNEW
« no previous file with comments | « sdk/lib/_internal/pub_generated/lib/src/entrypoint.dart ('k') | sdk/lib/_internal/pub_generated/lib/src/exceptions.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698