OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 | 6 |
7 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s | 7 /// An [ErrorGroup] entangles the errors of multiple [Future]s and [Stream]s |
8 /// with one another. | 8 /// with one another. |
9 /// | 9 /// |
10 /// This allows APIs to expose multiple [Future]s and [Stream]s that have | 10 /// This allows APIs to expose multiple [Future]s and [Stream]s that have |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
58 ErrorGroup() { | 58 ErrorGroup() { |
59 this._done = new _ErrorGroupFuture(this, _doneCompleter.future); | 59 this._done = new _ErrorGroupFuture(this, _doneCompleter.future); |
60 } | 60 } |
61 | 61 |
62 /// Registers a [Future] as a member of [this]. | 62 /// Registers a [Future] as a member of [this]. |
63 /// | 63 /// |
64 /// Returns a wrapped version of [future] that should be used in its place. | 64 /// Returns a wrapped version of [future] that should be used in its place. |
65 /// | 65 /// |
66 /// If all members of [this] have already completed successfully or with an | 66 /// If all members of [this] have already completed successfully or with an |
67 /// error, it's a [StateError] to try to register a new [Future]. | 67 /// error, it's a [StateError] to try to register a new [Future]. |
68 Future registerFuture(Future future) { | 68 Future/*<T>*/ registerFuture/*<T>*/(Future/*<T>*/ future) { |
69 if (_isDone) { | 69 if (_isDone) { |
70 throw new StateError("Can't register new members on a complete " | 70 throw new StateError("Can't register new members on a complete " |
71 "ErrorGroup."); | 71 "ErrorGroup."); |
72 } | 72 } |
73 | 73 |
74 var wrapped = new _ErrorGroupFuture(this, future); | 74 var wrapped = new _ErrorGroupFuture(this, future); |
75 _futures.add(wrapped); | 75 _futures.add(wrapped); |
76 return wrapped; | 76 return wrapped; |
77 } | 77 } |
78 | 78 |
79 /// Registers a [Stream] as a member of [this]. | 79 /// Registers a [Stream] as a member of [this]. |
80 /// | 80 /// |
81 /// Returns a wrapped version of [stream] that should be used in its place. | 81 /// Returns a wrapped version of [stream] that should be used in its place. |
82 /// The returned [Stream] will be multi-subscription if and only if [stream] | 82 /// The returned [Stream] will be multi-subscription if and only if [stream] |
83 /// is. | 83 /// is. |
84 /// | 84 /// |
85 /// Since all errors in a group are passed to all members, the returned | 85 /// Since all errors in a group are passed to all members, the returned |
86 /// [Stream] will automatically unsubscribe all its listeners when it | 86 /// [Stream] will automatically unsubscribe all its listeners when it |
87 /// encounters an error. | 87 /// encounters an error. |
88 /// | 88 /// |
89 /// If all members of [this] have already completed successfully or with an | 89 /// If all members of [this] have already completed successfully or with an |
90 /// error, it's a [StateError] to try to register a new [Stream]. | 90 /// error, it's a [StateError] to try to register a new [Stream]. |
91 Stream registerStream(Stream stream) { | 91 Stream/*<T>*/ registerStream/*<T>*/(Stream/*<T>*/ stream) { |
92 if (_isDone) { | 92 if (_isDone) { |
93 throw new StateError("Can't register new members on a complete " | 93 throw new StateError("Can't register new members on a complete " |
94 "ErrorGroup."); | 94 "ErrorGroup."); |
95 } | 95 } |
96 | 96 |
97 var wrapped = new _ErrorGroupStream(this, stream); | 97 var wrapped = new _ErrorGroupStream(this, stream); |
98 _streams.add(wrapped); | 98 _streams.add(wrapped); |
99 return wrapped; | 99 return wrapped; |
100 } | 100 } |
101 | 101 |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
154 _streams.every((stream) => stream._isDone); | 154 _streams.every((stream) => stream._isDone); |
155 if (_isDone) _doneCompleter.complete(); | 155 if (_isDone) _doneCompleter.complete(); |
156 } | 156 } |
157 } | 157 } |
158 | 158 |
159 /// A [Future] wrapper that keeps track of whether it's been completed and | 159 /// A [Future] wrapper that keeps track of whether it's been completed and |
160 /// whether it has any listeners. | 160 /// whether it has any listeners. |
161 /// | 161 /// |
162 /// It also notifies its parent [ErrorGroup] when it completes successfully or | 162 /// It also notifies its parent [ErrorGroup] when it completes successfully or |
163 /// receives an error. | 163 /// receives an error. |
164 class _ErrorGroupFuture implements Future { | 164 class _ErrorGroupFuture<T> implements Future<T> { |
165 /// The parent [ErrorGroup]. | 165 /// The parent [ErrorGroup]. |
166 final ErrorGroup _group; | 166 final ErrorGroup _group; |
167 | 167 |
168 /// Whether [this] has completed, either successfully or with an error. | 168 /// Whether [this] has completed, either successfully or with an error. |
169 var _isDone = false; | 169 var _isDone = false; |
170 | 170 |
171 /// The underlying [Completer] for [this]. | 171 /// The underlying [Completer] for [this]. |
172 final _completer = new Completer(); | 172 final _completer = new Completer<T>(); |
173 | 173 |
174 /// Whether [this] has any listeners. | 174 /// Whether [this] has any listeners. |
175 bool _hasListeners = false; | 175 bool _hasListeners = false; |
176 | 176 |
177 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | 177 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps |
178 /// [inner]. | 178 /// [inner]. |
179 _ErrorGroupFuture(this._group, Future inner) { | 179 _ErrorGroupFuture(this._group, Future<T> inner) { |
180 inner.then((value) { | 180 inner.then((value) { |
181 if (!_isDone) _completer.complete(value); | 181 if (!_isDone) _completer.complete(value); |
182 _isDone = true; | 182 _isDone = true; |
183 _group._signalFutureComplete(this); | 183 _group._signalFutureComplete(this); |
184 }).catchError(_group._signalError); | 184 }).catchError(_group._signalError); |
185 | 185 |
186 // Make sure _completer.future doesn't automatically send errors to the | 186 // Make sure _completer.future doesn't automatically send errors to the |
187 // top-level. | 187 // top-level. |
188 _completer.future.catchError((_) {}); | 188 _completer.future.catchError((_) {}); |
189 } | 189 } |
190 | 190 |
191 Future then(onValue(value), {Function onError}) { | 191 Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(/*=T*/ value), {Function onError}) { |
192 _hasListeners = true; | 192 _hasListeners = true; |
193 return _completer.future.then(onValue, onError: onError); | 193 return _completer.future.then(onValue, onError: onError); |
194 } | 194 } |
195 | 195 |
196 Future catchError(Function onError, {bool test(Object error)}) { | 196 Future<T> catchError(Function onError, {bool test(Object error)}) { |
197 _hasListeners = true; | 197 _hasListeners = true; |
198 return _completer.future.catchError(onError, test: test); | 198 return _completer.future.catchError(onError, test: test); |
199 } | 199 } |
200 | 200 |
201 Future whenComplete(void action()) { | 201 Future<T> whenComplete(void action()) { |
202 _hasListeners = true; | 202 _hasListeners = true; |
203 return _completer.future.whenComplete(action); | 203 return _completer.future.whenComplete(action); |
204 } | 204 } |
205 | 205 |
206 Future timeout(Duration timeLimit, {void onTimeout()}) { | 206 Future<T> timeout(Duration timeLimit, {onTimeout()}) { |
207 _hasListeners = true; | 207 _hasListeners = true; |
208 return _completer.future.timeout(timeLimit, onTimeout: onTimeout); | 208 return _completer.future.timeout(timeLimit, onTimeout: onTimeout); |
209 } | 209 } |
210 | 210 |
211 Stream asStream() { | 211 Stream<T> asStream() { |
212 _hasListeners = true; | 212 _hasListeners = true; |
213 return _completer.future.asStream(); | 213 return _completer.future.asStream(); |
214 } | 214 } |
215 | 215 |
216 /// Signal that an error from [_group] should be propagated through [this], | 216 /// Signal that an error from [_group] should be propagated through [this], |
217 /// unless it's already complete. | 217 /// unless it's already complete. |
218 void _signalError(var error, [StackTrace stackTrace]) { | 218 void _signalError(var error, [StackTrace stackTrace]) { |
219 if (!_isDone) _completer.completeError(error, stackTrace); | 219 if (!_isDone) _completer.completeError(error, stackTrace); |
220 _isDone = true; | 220 _isDone = true; |
221 } | 221 } |
222 } | 222 } |
223 | 223 |
224 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). | 224 // TODO(nweiz): currently streams never top-level unhandled errors (issue 7843). |
225 // When this is fixed, this class will need to prevent such errors from being | 225 // When this is fixed, this class will need to prevent such errors from being |
226 // top-leveled. | 226 // top-leveled. |
227 /// A [Stream] wrapper that keeps track of whether it's been completed and | 227 /// A [Stream] wrapper that keeps track of whether it's been completed and |
228 /// whether it has any listeners. | 228 /// whether it has any listeners. |
229 /// | 229 /// |
230 /// It also notifies its parent [ErrorGroup] when it completes successfully or | 230 /// It also notifies its parent [ErrorGroup] when it completes successfully or |
231 /// receives an error. | 231 /// receives an error. |
232 class _ErrorGroupStream extends Stream { | 232 class _ErrorGroupStream<T> extends Stream<T> { |
233 /// The parent [ErrorGroup]. | 233 /// The parent [ErrorGroup]. |
234 final ErrorGroup _group; | 234 final ErrorGroup _group; |
235 | 235 |
236 /// Whether [this] has completed, either successfully or with an error. | 236 /// Whether [this] has completed, either successfully or with an error. |
237 var _isDone = false; | 237 var _isDone = false; |
238 | 238 |
239 /// The underlying [StreamController] for [this]. | 239 /// The underlying [StreamController] for [this]. |
240 final StreamController _controller; | 240 final StreamController<T> _controller; |
241 | 241 |
242 /// The controller's [Stream]. | 242 /// The controller's [Stream]. |
243 /// | 243 /// |
244 /// May be different than `_controller.stream` if the wrapped stream is a | 244 /// May be different than `_controller.stream` if the wrapped stream is a |
245 /// broadcasting stream. | 245 /// broadcasting stream. |
246 Stream _stream; | 246 Stream<T> _stream; |
247 | 247 |
248 /// The [StreamSubscription] that connects the wrapped [Stream] to | 248 /// The [StreamSubscription] that connects the wrapped [Stream] to |
249 /// [_controller]. | 249 /// [_controller]. |
250 StreamSubscription _subscription; | 250 StreamSubscription<T> _subscription; |
251 | 251 |
252 /// Whether [this] has any listeners. | 252 /// Whether [this] has any listeners. |
253 bool get _hasListeners => _controller.hasListener; | 253 bool get _hasListeners => _controller.hasListener; |
254 | 254 |
255 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps | 255 /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps |
256 /// [inner]. | 256 /// [inner]. |
257 _ErrorGroupStream(this._group, Stream inner) | 257 _ErrorGroupStream(this._group, Stream<T> inner) |
258 : _controller = new StreamController(sync: true) { | 258 : _controller = new StreamController(sync: true) { |
259 // Use old-style asBroadcastStream behavior - cancel source _subscription | 259 // Use old-style asBroadcastStream behavior - cancel source _subscription |
260 // the first time the stream has no listeners. | 260 // the first time the stream has no listeners. |
261 _stream = inner.isBroadcast | 261 _stream = inner.isBroadcast |
262 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) | 262 ? _controller.stream.asBroadcastStream(onCancel: (sub) => sub.cancel()) |
263 : _controller.stream; | 263 : _controller.stream; |
264 _subscription = inner.listen((v) { | 264 _subscription = inner.listen((v) { |
265 _controller.add(v); | 265 _controller.add(v); |
266 }, onError: (e, [stackTrace]) { | 266 }, onError: (e, [stackTrace]) { |
267 _group._signalError(e, stackTrace); | 267 _group._signalError(e, stackTrace); |
268 }, onDone: () { | 268 }, onDone: () { |
269 _isDone = true; | 269 _isDone = true; |
270 _group._signalStreamComplete(this); | 270 _group._signalStreamComplete(this); |
271 _controller.close(); | 271 _controller.close(); |
272 }); | 272 }); |
273 } | 273 } |
274 | 274 |
275 StreamSubscription listen(void onData(value), | 275 StreamSubscription<T> listen(void onData(T value), |
276 {Function onError, void onDone(), | 276 {Function onError, void onDone(), |
277 bool cancelOnError}) { | 277 bool cancelOnError}) { |
278 return _stream.listen(onData, | 278 return _stream.listen(onData, |
279 onError: onError, | 279 onError: onError, |
280 onDone: onDone, | 280 onDone: onDone, |
281 cancelOnError: true); | 281 cancelOnError: true); |
282 } | 282 } |
283 | 283 |
284 /// Signal that an error from [_group] should be propagated through [this], | 284 /// Signal that an error from [_group] should be propagated through [this], |
285 /// unless it's already complete. | 285 /// unless it's already complete. |
286 void _signalError(var e, [StackTrace stackTrace]) { | 286 void _signalError(var e, [StackTrace stackTrace]) { |
287 if (_isDone) return; | 287 if (_isDone) return; |
288 _subscription.cancel(); | 288 _subscription.cancel(); |
289 // Call these asynchronously to work around issue 7913. | 289 // Call these asynchronously to work around issue 7913. |
290 new Future.value().then((_) { | 290 new Future.value().then((_) { |
291 _controller.addError(e, stackTrace); | 291 _controller.addError(e, stackTrace); |
292 _controller.close(); | 292 _controller.close(); |
293 }); | 293 }); |
294 } | 294 } |
295 } | 295 } |
OLD | NEW |