OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library async.future_group; |
| 6 |
| 7 import 'dart:async'; |
| 8 |
| 9 /// A collection of futures waits until all added [Future]s complete. |
| 10 /// |
| 11 /// Futures are added to the group with [add]. Once you're finished adding |
| 12 /// futures, signal that by calling [close]. Then, once all added futures have |
| 13 /// completed, [future] will complete with a list of values from the futures in |
| 14 /// the group, in the order they were added. |
| 15 /// |
| 16 /// If any added future completes with an error, [future] will emit that error |
| 17 /// and the group will be closed, regardless of the state of other futures in |
| 18 /// the group. |
| 19 /// |
| 20 /// This is similar to [Future.wait] with `eagerError` set to `true`, except |
| 21 /// that a [FutureGroup] can have futures added gradually over time rather than |
| 22 /// needing them all at once. |
| 23 class FutureGroup<T> implements Sink<Future<T>> { |
| 24 /// The number of futures that have yet to complete. |
| 25 var _pending = 0; |
| 26 |
| 27 /// Whether [close] has been called. |
| 28 var _closed = false; |
| 29 |
| 30 /// The future that fires once [close] has been called and all futures in the |
| 31 /// group have completed. |
| 32 /// |
| 33 /// This will also complete with an error if any of the futures in the group |
| 34 /// fails, regardless of whether [close] was called. |
| 35 Future<List<T>> get future => _completer.future; |
| 36 final _completer = new Completer<List<T>>(); |
| 37 |
| 38 /// Whether this group is waiting on any futures. |
| 39 bool get isIdle => _pending == 0; |
| 40 |
| 41 /// A broadcast stream that emits a `null` event whenever the last pending |
| 42 /// future in this group completes. |
| 43 /// |
| 44 /// Once this group isn't waiting on any futures *and* [close] has been |
| 45 /// called, this stream will close. |
| 46 Stream get onIdle { |
| 47 if (_onIdleController == null) { |
| 48 _onIdleController = new StreamController.broadcast(sync: true); |
| 49 } |
| 50 return _onIdleController.stream; |
| 51 } |
| 52 StreamController _onIdleController; |
| 53 |
| 54 /// The values emitted by the futures that have been added to the group, in |
| 55 /// the order they were added. |
| 56 /// |
| 57 /// The slots for futures that haven't completed yet are `null`. |
| 58 final _values = new List<T>(); |
| 59 |
| 60 /// Wait for [task] to complete. |
| 61 void add(Future<T> task) { |
| 62 if (_closed) throw new StateError("The FutureGroup is closed."); |
| 63 |
| 64 // Ensure that future values are put into [values] in the same order they're |
| 65 // added to the group by pre-allocating a slot for them and recording its |
| 66 // index. |
| 67 var index = _values.length; |
| 68 _values.add(null); |
| 69 |
| 70 _pending++; |
| 71 task.then((value) { |
| 72 if (_completer.isCompleted) return; |
| 73 |
| 74 _pending--; |
| 75 _values[index] = value; |
| 76 |
| 77 if (_pending != 0) return; |
| 78 if (_onIdleController != null) _onIdleController.add(null); |
| 79 |
| 80 if (!_closed) return; |
| 81 if (_onIdleController != null) _onIdleController.close(); |
| 82 _completer.complete(_values); |
| 83 }).catchError((error, stackTrace) { |
| 84 if (_completer.isCompleted) return; |
| 85 _completer.completeError(error, stackTrace); |
| 86 }); |
| 87 } |
| 88 |
| 89 /// Signals to the group that the caller is done adding futures, and so |
| 90 /// [future] should fire once all added futures have completed. |
| 91 void close() { |
| 92 _closed = true; |
| 93 if (_pending != 0) return; |
| 94 if (_completer.isCompleted) return; |
| 95 _completer.complete(_values); |
| 96 } |
| 97 } |
| 98 |
OLD | NEW |