Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Side by Side Diff: packages/async/lib/src/future_group.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.future_group;
6
7 import 'dart:async';
8
9 /// A collection of futures waits until all added [Future]s complete.
10 ///
11 /// Futures are added to the group with [add]. Once you're finished adding
12 /// futures, signal that by calling [close]. Then, once all added futures have
13 /// completed, [future] will complete with a list of values from the futures in
14 /// the group, in the order they were added.
15 ///
16 /// If any added future completes with an error, [future] will emit that error
17 /// and the group will be closed, regardless of the state of other futures in
18 /// the group.
19 ///
20 /// This is similar to [Future.wait] with `eagerError` set to `true`, except
21 /// that a [FutureGroup] can have futures added gradually over time rather than
22 /// needing them all at once.
23 class FutureGroup<T> implements Sink<Future<T>> {
24 /// The number of futures that have yet to complete.
25 var _pending = 0;
26
27 /// Whether [close] has been called.
28 var _closed = false;
29
30 /// The future that fires once [close] has been called and all futures in the
31 /// group have completed.
32 ///
33 /// This will also complete with an error if any of the futures in the group
34 /// fails, regardless of whether [close] was called.
35 Future<List<T>> get future => _completer.future;
36 final _completer = new Completer<List<T>>();
37
38 /// Whether this group is waiting on any futures.
39 bool get isIdle => _pending == 0;
40
41 /// A broadcast stream that emits a `null` event whenever the last pending
42 /// future in this group completes.
43 ///
44 /// Once this group isn't waiting on any futures *and* [close] has been
45 /// called, this stream will close.
46 Stream get onIdle {
47 if (_onIdleController == null) {
48 _onIdleController = new StreamController.broadcast(sync: true);
49 }
50 return _onIdleController.stream;
51 }
52 StreamController _onIdleController;
53
54 /// The values emitted by the futures that have been added to the group, in
55 /// the order they were added.
56 ///
57 /// The slots for futures that haven't completed yet are `null`.
58 final _values = new List<T>();
59
60 /// Wait for [task] to complete.
61 void add(Future<T> task) {
62 if (_closed) throw new StateError("The FutureGroup is closed.");
63
64 // Ensure that future values are put into [values] in the same order they're
65 // added to the group by pre-allocating a slot for them and recording its
66 // index.
67 var index = _values.length;
68 _values.add(null);
69
70 _pending++;
71 task.then((value) {
72 if (_completer.isCompleted) return;
73
74 _pending--;
75 _values[index] = value;
76
77 if (_pending != 0) return;
78 if (_onIdleController != null) _onIdleController.add(null);
79
80 if (!_closed) return;
81 if (_onIdleController != null) _onIdleController.close();
82 _completer.complete(_values);
83 }).catchError((error, stackTrace) {
84 if (_completer.isCompleted) return;
85 _completer.completeError(error, stackTrace);
86 });
87 }
88
89 /// Signals to the group that the caller is done adding futures, and so
90 /// [future] should fire once all added futures have completed.
91 void close() {
92 _closed = true;
93 if (_pending != 0) return;
94 if (_completer.isCompleted) return;
95 _completer.complete(_values);
96 }
97 }
98
OLDNEW
« no previous file with comments | « packages/async/lib/src/delegate/stream_subscription.dart ('k') | packages/async/lib/src/result_future.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698