| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library pub.error_group; | 5 library pub.error_group; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 | 8 |
| 9 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s | 9 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s |
| 10 /// with one another. This allows APIs to expose multiple [Future]s and | 10 /// with one another. This allows APIs to expose multiple [Future]s and |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 94 _streams.add(wrapped); | 94 _streams.add(wrapped); |
| 95 return wrapped; | 95 return wrapped; |
| 96 } | 96 } |
| 97 | 97 |
| 98 /// Sends [error] to all members of [this]. Like errors that come from | 98 /// Sends [error] to all members of [this]. Like errors that come from |
| 99 /// members, this will only be passed to the top-level error handler if no | 99 /// members, this will only be passed to the top-level error handler if no |
| 100 /// members have listeners. | 100 /// members have listeners. |
| 101 /// | 101 /// |
| 102 /// If all members of [this] have already completed successfully or with an | 102 /// If all members of [this] have already completed successfully or with an |
| 103 /// error, it's a [StateError] to try to signal an error. | 103 /// error, it's a [StateError] to try to signal an error. |
| 104 void signalError(var error) { | 104 void signalError(var error, [StackTrace stackTrace]) { |
| 105 if (_isDone) { | 105 if (_isDone) { |
| 106 throw new StateError("Can't signal errors on a complete ErrorGroup."); | 106 throw new StateError("Can't signal errors on a complete ErrorGroup."); |
| 107 } | 107 } |
| 108 | 108 |
| 109 _signalError(error); | 109 _signalError(error, stackTrace); |
| 110 } | 110 } |
| 111 | 111 |
| 112 /// Signal an error internally. This is just like [signalError], but instead | 112 /// Signal an error internally. This is just like [signalError], but instead |
| 113 /// of throwing an error if [this] is complete, it just does nothing. | 113 /// of throwing an error if [this] is complete, it just does nothing. |
| 114 void _signalError(var error) { | 114 void _signalError(var error, [StackTrace stackTrace]) { |
| 115 if (_isDone) return; | 115 if (_isDone) return; |
| 116 | 116 |
| 117 var caught = false; | 117 var caught = false; |
| 118 for (var future in _futures) { | 118 for (var future in _futures) { |
| 119 if (future._isDone || future._hasListeners) caught = true; | 119 if (future._isDone || future._hasListeners) caught = true; |
| 120 future._signalError(error); | 120 future._signalError(error, stackTrace); |
| 121 } | 121 } |
| 122 | 122 |
| 123 for (var stream in _streams) { | 123 for (var stream in _streams) { |
| 124 if (stream._isDone || stream._hasListeners) caught = true; | 124 if (stream._isDone || stream._hasListeners) caught = true; |
| 125 stream._signalError(error); | 125 stream._signalError(error, stackTrace); |
| 126 } | 126 } |
| 127 | 127 |
| 128 _isDone = true; | 128 _isDone = true; |
| 129 _done._signalError(error); | 129 _done._signalError(error, stackTrace); |
| 130 if (!caught && !_done._hasListeners) runAsync((){ throw error; }); | 130 if (!caught && !_done._hasListeners) runAsync((){ throw error; }); |
| 131 } | 131 } |
| 132 | 132 |
| 133 /// Notifies [this] that one of its member [Future]s is complete. | 133 /// Notifies [this] that one of its member [Future]s is complete. |
| 134 void _signalFutureComplete(_ErrorGroupFuture future) { | 134 void _signalFutureComplete(_ErrorGroupFuture future) { |
| 135 if (_isDone) return; | 135 if (_isDone) return; |
| 136 | 136 |
| 137 _isDone = _futures.every((future) => future._isDone) && | 137 _isDone = _futures.every((future) => future._isDone) && |
| 138 _streams.every((stream) => stream._isDone); | 138 _streams.every((stream) => stream._isDone); |
| 139 if (_isDone) _doneCompleter.complete(); | 139 if (_isDone) _doneCompleter.complete(); |
| (...skipping 25 matching lines...) Expand all Loading... |
| 165 /// Whether [this] has any listeners. | 165 /// Whether [this] has any listeners. |
| 166 bool _hasListeners = false; | 166 bool _hasListeners = false; |
| 167 | 167 |
| 168 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | 168 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps |
| 169 /// [inner]. | 169 /// [inner]. |
| 170 _ErrorGroupFuture(this._group, Future inner) { | 170 _ErrorGroupFuture(this._group, Future inner) { |
| 171 inner.then((value) { | 171 inner.then((value) { |
| 172 if (!_isDone) _completer.complete(value); | 172 if (!_isDone) _completer.complete(value); |
| 173 _isDone = true; | 173 _isDone = true; |
| 174 _group._signalFutureComplete(this); | 174 _group._signalFutureComplete(this); |
| 175 }).catchError((error) => _group._signalError(error)); | 175 }).catchError(_group._signalError); |
| 176 | 176 |
| 177 // Make sure _completer.future doesn't automatically send errors to the | 177 // Make sure _completer.future doesn't automatically send errors to the |
| 178 // top-level. | 178 // top-level. |
| 179 _completer.future.catchError((_) {}); | 179 _completer.future.catchError((_) {}); |
| 180 } | 180 } |
| 181 | 181 |
| 182 Future then(onValue(value), {Function onError}) { | 182 Future then(onValue(value), {Function onError}) { |
| 183 _hasListeners = true; | 183 _hasListeners = true; |
| 184 return _completer.future.then(onValue, onError: onError); | 184 return _completer.future.then(onValue, onError: onError); |
| 185 } | 185 } |
| 186 | 186 |
| 187 Future catchError(Function onError, {bool test(Object error)}) { | 187 Future catchError(Function onError, {bool test(Object error)}) { |
| 188 _hasListeners = true; | 188 _hasListeners = true; |
| 189 return _completer.future.catchError(onError, test: test); | 189 return _completer.future.catchError(onError, test: test); |
| 190 } | 190 } |
| 191 | 191 |
| 192 Future whenComplete(void action()) { | 192 Future whenComplete(void action()) { |
| 193 _hasListeners = true; | 193 _hasListeners = true; |
| 194 return _completer.future.whenComplete(action); | 194 return _completer.future.whenComplete(action); |
| 195 } | 195 } |
| 196 | 196 |
| 197 Stream asStream() { | 197 Stream asStream() { |
| 198 _hasListeners = true; | 198 _hasListeners = true; |
| 199 return _completer.future.asStream(); | 199 return _completer.future.asStream(); |
| 200 } | 200 } |
| 201 | 201 |
| 202 /// Signal that an error from [_group] should be propagated through [this], | 202 /// Signal that an error from [_group] should be propagated through [this], |
| 203 /// unless it's already complete. | 203 /// unless it's already complete. |
| 204 void _signalError(var error) { | 204 void _signalError(var error, [StackTrace stackTrace]) { |
| 205 if (!_isDone) _completer.completeError(error); | 205 if (!_isDone) _completer.completeError(error, stackTrace); |
| 206 _isDone = true; | 206 _isDone = true; |
| 207 } | 207 } |
| 208 } | 208 } |
| 209 | 209 |
| 210 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). | 210 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). |
| 211 // When this is fixed, this class will need to prevent such errors from being | 211 // When this is fixed, this class will need to prevent such errors from being |
| 212 // top-leveled. | 212 // top-leveled. |
| 213 /// A [Stream] wrapper that keeps track of whether it's been completed and | 213 /// A [Stream] wrapper that keeps track of whether it's been completed and |
| 214 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when | 214 /// whether it has any listeners. It also notifies its parent [ErrorGroup] when |
| 215 /// it completes successfully or receives an error. | 215 /// it completes successfully or receives an error. |
| (...skipping 22 matching lines...) Expand all Loading... |
| 238 /// [inner]. | 238 /// [inner]. |
| 239 _ErrorGroupStream(this._group, Stream inner) | 239 _ErrorGroupStream(this._group, Stream inner) |
| 240 : _controller = new StreamController(sync: true) { | 240 : _controller = new StreamController(sync: true) { |
| 241 // Use old-style asBroadcastStream behavior - cancel source _subscription | 241 // Use old-style asBroadcastStream behavior - cancel source _subscription |
| 242 // the first time the stream has no listeners. | 242 // the first time the stream has no listeners. |
| 243 _stream = inner.isBroadcast | 243 _stream = inner.isBroadcast |
| 244 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) | 244 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) |
| 245 : _controller.stream; | 245 : _controller.stream; |
| 246 _subscription = inner.listen((v) { | 246 _subscription = inner.listen((v) { |
| 247 _controller.add(v); | 247 _controller.add(v); |
| 248 }, onError: (e) { | 248 }, onError: (e, [stackTrace]) { |
| 249 _group._signalError(e); | 249 _group._signalError(e, stackTrace); |
| 250 }, onDone: () { | 250 }, onDone: () { |
| 251 _isDone = true; | 251 _isDone = true; |
| 252 _group._signalStreamComplete(this); | 252 _group._signalStreamComplete(this); |
| 253 _controller.close(); | 253 _controller.close(); |
| 254 }); | 254 }); |
| 255 } | 255 } |
| 256 | 256 |
| 257 StreamSubscription listen(void onData(value), | 257 StreamSubscription listen(void onData(value), |
| 258 {void onError(var error), void onDone(), | 258 {Function onError, void onDone(), |
| 259 bool cancelOnError}) { | 259 bool cancelOnError}) { |
| 260 return _stream.listen(onData, | 260 return _stream.listen(onData, |
| 261 onError: onError, | 261 onError: onError, |
| 262 onDone: onDone, | 262 onDone: onDone, |
| 263 cancelOnError: true); | 263 cancelOnError: true); |
| 264 } | 264 } |
| 265 | 265 |
| 266 /// Signal that an error from [_group] should be propagated through [this], | 266 /// Signal that an error from [_group] should be propagated through [this], |
| 267 /// unless it's already complete. | 267 /// unless it's already complete. |
| 268 void _signalError(var e) { | 268 void _signalError(var e, [StackTrace stackTrace]) { |
| 269 if (_isDone) return; | 269 if (_isDone) return; |
| 270 _subscription.cancel(); | 270 _subscription.cancel(); |
| 271 // Call these asynchronously to work around issue 7913. | 271 // Call these asynchronously to work around issue 7913. |
| 272 new Future.value().then((_) { | 272 new Future.value().then((_) { |
| 273 _controller.addError(e); | 273 _controller.addError(e, stackTrace); |
| 274 _controller.close(); | 274 _controller.close(); |
| 275 }); | 275 }); |
| 276 } | 276 } |
| 277 } | 277 } |
| OLD | NEW |