Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(505)

Side by Side Diff: sdk/lib/_internal/pub/lib/src/error_group.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove types in closures. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/_internal/pub/lib/src/command/serve.dart ('k') | sdk/lib/_internal/pub/lib/src/io.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library pub.error_group; 5 library pub.error_group;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 8
9 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s 9 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s
10 /// with one another. This allows APIs to expose multiple [Future]s and 10 /// with one another. This allows APIs to expose multiple [Future]s and
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
94 _streams.add(wrapped); 94 _streams.add(wrapped);
95 return wrapped; 95 return wrapped;
96 } 96 }
97 97
98 /// Sends [error] to all members of [this]. Like errors that come from 98 /// Sends [error] to all members of [this]. Like errors that come from
99 /// members, this will only be passed to the top-level error handler if no 99 /// members, this will only be passed to the top-level error handler if no
100 /// members have listeners. 100 /// members have listeners.
101 /// 101 ///
102 /// If all members of [this] have already completed successfully or with an 102 /// If all members of [this] have already completed successfully or with an
103 /// error, it's a [StateError] to try to signal an error. 103 /// error, it's a [StateError] to try to signal an error.
104 void signalError(var error) { 104 void signalError(var error, [StackTrace stackTrace]) {
105 if (_isDone) { 105 if (_isDone) {
106 throw new StateError("Can't signal errors on a complete ErrorGroup."); 106 throw new StateError("Can't signal errors on a complete ErrorGroup.");
107 } 107 }
108 108
109 _signalError(error); 109 _signalError(error, stackTrace);
110 } 110 }
111 111
112 /// Signal an error internally. This is just like [signalError], but instead 112 /// Signal an error internally. This is just like [signalError], but instead
113 /// of throwing an error if [this] is complete, it just does nothing. 113 /// of throwing an error if [this] is complete, it just does nothing.
114 void _signalError(var error) { 114 void _signalError(var error, [StackTrace stackTrace]) {
115 if (_isDone) return; 115 if (_isDone) return;
116 116
117 var caught = false; 117 var caught = false;
118 for (var future in _futures) { 118 for (var future in _futures) {
119 if (future._isDone || future._hasListeners) caught = true; 119 if (future._isDone || future._hasListeners) caught = true;
120 future._signalError(error); 120 future._signalError(error, stackTrace);
121 } 121 }
122 122
123 for (var stream in _streams) { 123 for (var stream in _streams) {
124 if (stream._isDone || stream._hasListeners) caught = true; 124 if (stream._isDone || stream._hasListeners) caught = true;
125 stream._signalError(error); 125 stream._signalError(error, stackTrace);
126 } 126 }
127 127
128 _isDone = true; 128 _isDone = true;
129 _done._signalError(error); 129 _done._signalError(error, stackTrace);
130 if (!caught && !_done._hasListeners) runAsync((){ throw error; }); 130 if (!caught && !_done._hasListeners) runAsync((){ throw error; });
131 } 131 }
132 132
133 /// Notifies [this] that one of its member [Future]s is complete. 133 /// Notifies [this] that one of its member [Future]s is complete.
134 void _signalFutureComplete(_ErrorGroupFuture future) { 134 void _signalFutureComplete(_ErrorGroupFuture future) {
135 if (_isDone) return; 135 if (_isDone) return;
136 136
137 _isDone = _futures.every((future) => future._isDone) && 137 _isDone = _futures.every((future) => future._isDone) &&
138 _streams.every((stream) => stream._isDone); 138 _streams.every((stream) => stream._isDone);
139 if (_isDone) _doneCompleter.complete(); 139 if (_isDone) _doneCompleter.complete();
(...skipping 25 matching lines...) Expand all
165 /// Whether [this] has any listeners. 165 /// Whether [this] has any listeners.
166 bool _hasListeners = false; 166 bool _hasListeners = false;
167 167
168 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps 168 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
169 /// [inner]. 169 /// [inner].
170 _ErrorGroupFuture(this._group, Future inner) { 170 _ErrorGroupFuture(this._group, Future inner) {
171 inner.then((value) { 171 inner.then((value) {
172 if (!_isDone) _completer.complete(value); 172 if (!_isDone) _completer.complete(value);
173 _isDone = true; 173 _isDone = true;
174 _group._signalFutureComplete(this); 174 _group._signalFutureComplete(this);
175 }).catchError((error) => _group._signalError(error)); 175 }).catchError(_group._signalError);
176 176
177 // Make sure _completer.future doesn't automatically send errors to the 177 // Make sure _completer.future doesn't automatically send errors to the
178 // top-level. 178 // top-level.
179 _completer.future.catchError((_) {}); 179 _completer.future.catchError((_) {});
180 } 180 }
181 181
182 Future then(onValue(value), {Function onError}) { 182 Future then(onValue(value), {Function onError}) {
183 _hasListeners = true; 183 _hasListeners = true;
184 return _completer.future.then(onValue, onError: onError); 184 return _completer.future.then(onValue, onError: onError);
185 } 185 }
186 186
187 Future catchError(Function onError, {bool test(Object error)}) { 187 Future catchError(Function onError, {bool test(Object error)}) {
188 _hasListeners = true; 188 _hasListeners = true;
189 return _completer.future.catchError(onError, test: test); 189 return _completer.future.catchError(onError, test: test);
190 } 190 }
191 191
192 Future whenComplete(void action()) { 192 Future whenComplete(void action()) {
193 _hasListeners = true; 193 _hasListeners = true;
194 return _completer.future.whenComplete(action); 194 return _completer.future.whenComplete(action);
195 } 195 }
196 196
197 Stream asStream() { 197 Stream asStream() {
198 _hasListeners = true; 198 _hasListeners = true;
199 return _completer.future.asStream(); 199 return _completer.future.asStream();
200 } 200 }
201 201
202 /// Signal that an error from [_group] should be propagated through [this], 202 /// Signal that an error from [_group] should be propagated through [this],
203 /// unless it's already complete. 203 /// unless it's already complete.
204 void _signalError(var error) { 204 void _signalError(var error, [StackTrace stackTrace]) {
205 if (!_isDone) _completer.completeError(error); 205 if (!_isDone) _completer.completeError(error, stackTrace);
206 _isDone = true; 206 _isDone = true;
207 } 207 }
208 } 208 }
209 209
210 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). 210 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843).
211 // When this is fixed, this class will need to prevent such errors from being 211 // When this is fixed, this class will need to prevent such errors from being
212 // top-leveled. 212 // top-leveled.
213 /// A [Stream] wrapper that keeps track of whether it's been completed and 213 /// A [Stream] wrapper that keeps track of whether it's been completed and
214 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when 214 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when
215 /// it completes successfully or receives an error. 215 /// it completes successfully or receives an error.
(...skipping 22 matching lines...) Expand all
238 /// [inner]. 238 /// [inner].
239 _ErrorGroupStream(this._group, Stream inner) 239 _ErrorGroupStream(this._group, Stream inner)
240 : _controller = new StreamController(sync: true) { 240 : _controller = new StreamController(sync: true) {
241 // Use old-style asBroadcastStream behavior - cancel source _subscription 241 // Use old-style asBroadcastStream behavior - cancel source _subscription
242 // the first time the stream has no listeners. 242 // the first time the stream has no listeners.
243 _stream = inner.isBroadcast 243 _stream = inner.isBroadcast
244 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) 244 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel())
245 : _controller.stream; 245 : _controller.stream;
246 _subscription = inner.listen((v) { 246 _subscription = inner.listen((v) {
247 _controller.add(v); 247 _controller.add(v);
248 }, onError: (e) { 248 }, onError: (e, [stackTrace]) {
249 _group._signalError(e); 249 _group._signalError(e, stackTrace);
250 }, onDone: () { 250 }, onDone: () {
251 _isDone = true; 251 _isDone = true;
252 _group._signalStreamComplete(this); 252 _group._signalStreamComplete(this);
253 _controller.close(); 253 _controller.close();
254 }); 254 });
255 } 255 }
256 256
257 StreamSubscription listen(void onData(value), 257 StreamSubscription listen(void onData(value),
258 {void onError(var error), void onDone(), 258 {Function onError, void onDone(),
259 bool cancelOnError}) { 259 bool cancelOnError}) {
260 return _stream.listen(onData, 260 return _stream.listen(onData,
261 onError: onError, 261 onError: onError,
262 onDone: onDone, 262 onDone: onDone,
263 cancelOnError: true); 263 cancelOnError: true);
264 } 264 }
265 265
266 /// Signal that an error from [_group] should be propagated through [this], 266 /// Signal that an error from [_group] should be propagated through [this],
267 /// unless it's already complete. 267 /// unless it's already complete.
268 void _signalError(var e) { 268 void _signalError(var e, [StackTrace stackTrace]) {
269 if (_isDone) return; 269 if (_isDone) return;
270 _subscription.cancel(); 270 _subscription.cancel();
271 // Call these asynchronously to work around issue 7913. 271 // Call these asynchronously to work around issue 7913.
272 new Future.value().then((_) { 272 new Future.value().then((_) {
273 _controller.addError(e); 273 _controller.addError(e, stackTrace);
274 _controller.close(); 274 _controller.close();
275 }); 275 });
276 } 276 }
277 } 277 }
OLDNEW
« no previous file with comments | « sdk/lib/_internal/pub/lib/src/command/serve.dart ('k') | sdk/lib/_internal/pub/lib/src/io.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698