| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 | 6 |
| 7 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s | 7 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s |
| 8 /// with one another. | 8 /// with one another. |
| 9 /// | 9 /// |
| 10 /// This allows APIs to expose multiple [Future]s and [Stream]s that have | 10 /// This allows APIs to expose multiple [Future]s and [Stream]s that have |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 58 ErrorGroup() { | 58 ErrorGroup() { |
| 59 this._done = new _ErrorGroupFuture(this, _doneCompleter.future); | 59 this._done = new _ErrorGroupFuture(this, _doneCompleter.future); |
| 60 } | 60 } |
| 61 | 61 |
| 62 /// Registers a [Future] as a member of [this]. | 62 /// Registers a [Future] as a member of [this]. |
| 63 /// | 63 /// |
| 64 /// Returns a wrapped version of [future] that should be used in its place. | 64 /// Returns a wrapped version of [future] that should be used in its place. |
| 65 /// | 65 /// |
| 66 /// If all members of [this] have already completed successfully or with an | 66 /// If all members of [this] have already completed successfully or with an |
| 67 /// error, it's a [StateError] to try to register a new [Future]. | 67 /// error, it's a [StateError] to try to register a new [Future]. |
| 68 Future registerFuture(Future future) { | 68 Future/*<T>*/ registerFuture/*<T>*/(Future/*<T>*/ future) { |
| 69 if (_isDone) { | 69 if (_isDone) { |
| 70 throw new StateError("Can't register new members on a complete " | 70 throw new StateError("Can't register new members on a complete " |
| 71 "ErrorGroup."); | 71 "ErrorGroup."); |
| 72 } | 72 } |
| 73 | 73 |
| 74 var wrapped = new _ErrorGroupFuture(this, future); | 74 var wrapped = new _ErrorGroupFuture(this, future); |
| 75 _futures.add(wrapped); | 75 _futures.add(wrapped); |
| 76 return wrapped; | 76 return wrapped; |
| 77 } | 77 } |
| 78 | 78 |
| 79 /// Registers a [Stream] as a member of [this]. | 79 /// Registers a [Stream] as a member of [this]. |
| 80 /// | 80 /// |
| 81 /// Returns a wrapped version of [stream] that should be used in its place. | 81 /// Returns a wrapped version of [stream] that should be used in its place. |
| 82 /// The returned [Stream] will be multi-subscription if and only if [stream] | 82 /// The returned [Stream] will be multi-subscription if and only if [stream] |
| 83 /// is. | 83 /// is. |
| 84 /// | 84 /// |
| 85 /// Since all errors in a group are passed to all members, the returned | 85 /// Since all errors in a group are passed to all members, the returned |
| 86 /// [Stream] will automatically unsubscribe all its listeners when it | 86 /// [Stream] will automatically unsubscribe all its listeners when it |
| 87 /// encounters an error. | 87 /// encounters an error. |
| 88 /// | 88 /// |
| 89 /// If all members of [this] have already completed successfully or with an | 89 /// If all members of [this] have already completed successfully or with an |
| 90 /// error, it's a [StateError] to try to register a new [Stream]. | 90 /// error, it's a [StateError] to try to register a new [Stream]. |
| 91 Stream registerStream(Stream stream) { | 91 Stream/*<T>*/ registerStream/*<T>*/(Stream/*<T>*/ stream) { |
| 92 if (_isDone) { | 92 if (_isDone) { |
| 93 throw new StateError("Can't register new members on a complete " | 93 throw new StateError("Can't register new members on a complete " |
| 94 "ErrorGroup."); | 94 "ErrorGroup."); |
| 95 } | 95 } |
| 96 | 96 |
| 97 var wrapped = new _ErrorGroupStream(this, stream); | 97 var wrapped = new _ErrorGroupStream(this, stream); |
| 98 _streams.add(wrapped); | 98 _streams.add(wrapped); |
| 99 return wrapped; | 99 return wrapped; |
| 100 } | 100 } |
| 101 | 101 |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 154 _streams.every((stream) => stream._isDone); | 154 _streams.every((stream) => stream._isDone); |
| 155 if (_isDone) _doneCompleter.complete(); | 155 if (_isDone) _doneCompleter.complete(); |
| 156 } | 156 } |
| 157 } | 157 } |
| 158 | 158 |
| 159 /// A [Future] wrapper that keeps track of whether it's been completed and | 159 /// A [Future] wrapper that keeps track of whether it's been completed and |
| 160 /// whether it has any listeners. | 160 /// whether it has any listeners. |
| 161 /// | 161 /// |
| 162 /// It also notifies its parent [ErrorGroup] when it completes successfully or | 162 /// It also notifies its parent [ErrorGroup] when it completes successfully or |
| 163 /// receives an error. | 163 /// receives an error. |
| 164 class _ErrorGroupFuture implements Future { | 164 class _ErrorGroupFuture<T> implements Future<T> { |
| 165 /// The parent [ErrorGroup]. | 165 /// The parent [ErrorGroup]. |
| 166 final ErrorGroup _group; | 166 final ErrorGroup _group; |
| 167 | 167 |
| 168 /// Whether [this] has completed, either successfully or with an error. | 168 /// Whether [this] has completed, either successfully or with an error. |
| 169 var _isDone = false; | 169 var _isDone = false; |
| 170 | 170 |
| 171 /// The underlying [Completer] for [this]. | 171 /// The underlying [Completer] for [this]. |
| 172 final _completer = new Completer(); | 172 final _completer = new Completer<T>(); |
| 173 | 173 |
| 174 /// Whether [this] has any listeners. | 174 /// Whether [this] has any listeners. |
| 175 bool _hasListeners = false; | 175 bool _hasListeners = false; |
| 176 | 176 |
| 177 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | 177 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps |
| 178 /// [inner]. | 178 /// [inner]. |
| 179 _ErrorGroupFuture(this._group, Future inner) { | 179 _ErrorGroupFuture(this._group, Future<T> inner) { |
| 180 inner.then((value) { | 180 inner.then((value) { |
| 181 if (!_isDone) _completer.complete(value); | 181 if (!_isDone) _completer.complete(value); |
| 182 _isDone = true; | 182 _isDone = true; |
| 183 _group._signalFutureComplete(this); | 183 _group._signalFutureComplete(this); |
| 184 }).catchError(_group._signalError); | 184 }).catchError(_group._signalError); |
| 185 | 185 |
| 186 // Make sure _completer.future doesn't automatically send errors to the | 186 // Make sure _completer.future doesn't automatically send errors to the |
| 187 // top-level. | 187 // top-level. |
| 188 _completer.future.catchError((_) {}); | 188 _completer.future.catchError((_) {}); |
| 189 } | 189 } |
| 190 | 190 |
| 191 Future then(onValue(value), {Function onError}) { | 191 Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(/*=T*/ value), {Function onError}) { |
| 192 _hasListeners = true; | 192 _hasListeners = true; |
| 193 return _completer.future.then(onValue, onError: onError); | 193 return _completer.future.then(onValue, onError: onError); |
| 194 } | 194 } |
| 195 | 195 |
| 196 Future catchError(Function onError, {bool test(Object error)}) { | 196 Future<T> catchError(Function onError, {bool test(Object error)}) { |
| 197 _hasListeners = true; | 197 _hasListeners = true; |
| 198 return _completer.future.catchError(onError, test: test); | 198 return _completer.future.catchError(onError, test: test); |
| 199 } | 199 } |
| 200 | 200 |
| 201 Future whenComplete(void action()) { | 201 Future<T> whenComplete(void action()) { |
| 202 _hasListeners = true; | 202 _hasListeners = true; |
| 203 return _completer.future.whenComplete(action); | 203 return _completer.future.whenComplete(action); |
| 204 } | 204 } |
| 205 | 205 |
| 206 Future timeout(Duration timeLimit, {void onTimeout()}) { | 206 Future<T> timeout(Duration timeLimit, {onTimeout()}) { |
| 207 _hasListeners = true; | 207 _hasListeners = true; |
| 208 return _completer.future.timeout(timeLimit, onTimeout: onTimeout); | 208 return _completer.future.timeout(timeLimit, onTimeout: onTimeout); |
| 209 } | 209 } |
| 210 | 210 |
| 211 Stream asStream() { | 211 Stream<T> asStream() { |
| 212 _hasListeners = true; | 212 _hasListeners = true; |
| 213 return _completer.future.asStream(); | 213 return _completer.future.asStream(); |
| 214 } | 214 } |
| 215 | 215 |
| 216 /// Signal that an error from [_group] should be propagated through [this], | 216 /// Signal that an error from [_group] should be propagated through [this], |
| 217 /// unless it's already complete. | 217 /// unless it's already complete. |
| 218 void _signalError(var error, [StackTrace stackTrace]) { | 218 void _signalError(var error, [StackTrace stackTrace]) { |
| 219 if (!_isDone) _completer.completeError(error, stackTrace); | 219 if (!_isDone) _completer.completeError(error, stackTrace); |
| 220 _isDone = true; | 220 _isDone = true; |
| 221 } | 221 } |
| 222 } | 222 } |
| 223 | 223 |
| 224 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). | 224 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). |
| 225 // When this is fixed, this class will need to prevent such errors from being | 225 // When this is fixed, this class will need to prevent such errors from being |
| 226 // top-leveled. | 226 // top-leveled. |
| 227 /// A [Stream] wrapper that keeps track of whether it's been completed and | 227 /// A [Stream] wrapper that keeps track of whether it's been completed and |
| 228 /// whether it has any listeners. | 228 /// whether it has any listeners. |
| 229 /// | 229 /// |
| 230 /// It also notifies its parent [ErrorGroup] when it completes successfully or | 230 /// It also notifies its parent [ErrorGroup] when it completes successfully or |
| 231 /// receives an error. | 231 /// receives an error. |
| 232 class _ErrorGroupStream extends Stream { | 232 class _ErrorGroupStream<T> extends Stream<T> { |
| 233 /// The parent [ErrorGroup]. | 233 /// The parent [ErrorGroup]. |
| 234 final ErrorGroup _group; | 234 final ErrorGroup _group; |
| 235 | 235 |
| 236 /// Whether [this] has completed, either successfully or with an error. | 236 /// Whether [this] has completed, either successfully or with an error. |
| 237 var _isDone = false; | 237 var _isDone = false; |
| 238 | 238 |
| 239 /// The underlying [StreamController] for [this]. | 239 /// The underlying [StreamController] for [this]. |
| 240 final StreamController _controller; | 240 final StreamController<T> _controller; |
| 241 | 241 |
| 242 /// The controller's [Stream]. | 242 /// The controller's [Stream]. |
| 243 /// | 243 /// |
| 244 /// May be different than `_controller.stream` if the wrapped stream is a | 244 /// May be different than `_controller.stream` if the wrapped stream is a |
| 245 /// broadcasting stream. | 245 /// broadcasting stream. |
| 246 Stream _stream; | 246 Stream<T> _stream; |
| 247 | 247 |
| 248 /// The [StreamSubscription] that connects the wrapped [Stream] to | 248 /// The [StreamSubscription] that connects the wrapped [Stream] to |
| 249 /// [_controller]. | 249 /// [_controller]. |
| 250 StreamSubscription _subscription; | 250 StreamSubscription<T> _subscription; |
| 251 | 251 |
| 252 /// Whether [this] has any listeners. | 252 /// Whether [this] has any listeners. |
| 253 bool get _hasListeners => _controller.hasListener; | 253 bool get _hasListeners => _controller.hasListener; |
| 254 | 254 |
| 255 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | 255 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps |
| 256 /// [inner]. | 256 /// [inner]. |
| 257 _ErrorGroupStream(this._group, Stream inner) | 257 _ErrorGroupStream(this._group, Stream<T> inner) |
| 258 : _controller = new StreamController(sync: true) { | 258 : _controller = new StreamController(sync: true) { |
| 259 // Use old-style asBroadcastStream behavior - cancel source _subscription | 259 // Use old-style asBroadcastStream behavior - cancel source _subscription |
| 260 // the first time the stream has no listeners. | 260 // the first time the stream has no listeners. |
| 261 _stream = inner.isBroadcast | 261 _stream = inner.isBroadcast |
| 262 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) | 262 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) |
| 263 : _controller.stream; | 263 : _controller.stream; |
| 264 _subscription = inner.listen((v) { | 264 _subscription = inner.listen((v) { |
| 265 _controller.add(v); | 265 _controller.add(v); |
| 266 }, onError: (e, [stackTrace]) { | 266 }, onError: (e, [stackTrace]) { |
| 267 _group._signalError(e, stackTrace); | 267 _group._signalError(e, stackTrace); |
| 268 }, onDone: () { | 268 }, onDone: () { |
| 269 _isDone = true; | 269 _isDone = true; |
| 270 _group._signalStreamComplete(this); | 270 _group._signalStreamComplete(this); |
| 271 _controller.close(); | 271 _controller.close(); |
| 272 }); | 272 }); |
| 273 } | 273 } |
| 274 | 274 |
| 275 StreamSubscription listen(void onData(value), | 275 StreamSubscription<T> listen(void onData(T value), |
| 276 {Function onError, void onDone(), | 276 {Function onError, void onDone(), |
| 277 bool cancelOnError}) { | 277 bool cancelOnError}) { |
| 278 return _stream.listen(onData, | 278 return _stream.listen(onData, |
| 279 onError: onError, | 279 onError: onError, |
| 280 onDone: onDone, | 280 onDone: onDone, |
| 281 cancelOnError: true); | 281 cancelOnError: true); |
| 282 } | 282 } |
| 283 | 283 |
| 284 /// Signal that an error from [_group] should be propagated through [this], | 284 /// Signal that an error from [_group] should be propagated through [this], |
| 285 /// unless it's already complete. | 285 /// unless it's already complete. |
| 286 void _signalError(var e, [StackTrace stackTrace]) { | 286 void _signalError(var e, [StackTrace stackTrace]) { |
| 287 if (_isDone) return; | 287 if (_isDone) return; |
| 288 _subscription.cancel(); | 288 _subscription.cancel(); |
| 289 // Call these asynchronously to work around issue 7913. | 289 // Call these asynchronously to work around issue 7913. |
| 290 new Future.value().then((_) { | 290 new Future.value().then((_) { |
| 291 _controller.addError(e, stackTrace); | 291 _controller.addError(e, stackTrace); |
| 292 _controller.close(); | 292 _controller.close(); |
| 293 }); | 293 }); |
| 294 } | 294 } |
| 295 } | 295 } |
| OLD | NEW |