OLD | NEW |
1 // Copyright 2013 Google Inc. All Rights Reserved. | 1 // Copyright 2013 Google Inc. All Rights Reserved. |
2 // | 2 // |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
6 // | 6 // |
7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
8 // | 8 // |
9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
13 // limitations under the License. | 13 // limitations under the License. |
14 | 14 |
15 part of quiver.async; | 15 part of quiver.async; |
16 | 16 |
17 /** | 17 /// An asynchronous callback that returns a value. |
18 * An asynchronous callback that returns a value. | |
19 */ | |
20 typedef Future<T> AsyncAction<T>(e); | 18 typedef Future<T> AsyncAction<T>(e); |
21 | 19 |
22 /** | 20 /// An asynchronous funcuntion that combines an element [e] with a previous |
23 * An asynchronous funcuntion that combines an element [e] with a previous value | 21 /// value [previous], for use with [reduceAsync]. |
24 * [previous], for use with [reduceAsync]. | |
25 */ | |
26 typedef Future<T> AsyncCombiner<T>(T previous, e); | 22 typedef Future<T> AsyncCombiner<T>(T previous, e); |
27 | 23 |
28 /** | 24 /// Calls [action] for each item in [iterable] in turn, waiting for the Future |
29 * Calls [action] for each item in [iterable] in turn, waiting for the Future | 25 /// returned by action to complete. |
30 * returned by action to complete. | 26 /// |
31 * | 27 /// If the Future completes to [true], iteration continues. |
32 * If the Future completes to [true], iteration continues. | 28 /// |
33 * | 29 /// The Future returned completes to [true] if the entire iterable was |
34 * The Future returned completes to [true] if the entire iterable was processed, | 30 /// processed, otherwise [false]. |
35 * otherwise [false]. | |
36 */ | |
37 Future doWhileAsync(Iterable iterable, AsyncAction<bool> action) => | 31 Future doWhileAsync(Iterable iterable, AsyncAction<bool> action) => |
38 _doWhileAsync(iterable.iterator, action); | 32 _doWhileAsync(iterable.iterator, action); |
39 | 33 |
40 Future _doWhileAsync( | 34 Future _doWhileAsync(Iterator iterator, AsyncAction<bool> action) async { |
41 Iterator iterator, AsyncAction<bool> action) => (iterator.moveNext()) | 35 if (iterator.moveNext()) { |
42 ? action(iterator.current).then((bool result) => | 36 return await action(iterator.current) |
43 (result) ? _doWhileAsync(iterator, action) : new Future.value(false)) | 37 ? _doWhileAsync(iterator, action) |
44 : new Future.value(true); | 38 : false; |
| 39 } |
| 40 return true; |
| 41 } |
45 | 42 |
46 /** | 43 /// Reduces a collection to a single value by iteratively combining elements of |
47 * Reduces a collection to a single value by iteratively combining elements | 44 /// the collection using the provided [combine] function. Similar to |
48 * of the collection using the provided [combine] function. Similar to | 45 /// [Iterable.reduce], except that [combine] is an async function that returns |
49 * [Iterable.reduce], except that [combine] is an async function that returns a | 46 /// a [Future]. |
50 * [Future]. | |
51 */ | |
52 Future reduceAsync(Iterable iterable, initialValue, AsyncCombiner combine) => | 47 Future reduceAsync(Iterable iterable, initialValue, AsyncCombiner combine) => |
53 _reduceAsync(iterable.iterator, initialValue, combine); | 48 _reduceAsync(iterable.iterator, initialValue, combine); |
54 | 49 |
55 Future _reduceAsync(Iterator iterator, currentValue, AsyncCombiner combine) { | 50 Future _reduceAsync(Iterator iterator, current, AsyncCombiner combine) async { |
56 if (iterator.moveNext()) { | 51 if (iterator.moveNext()) { |
57 return combine(currentValue, iterator.current) | 52 var result = await combine(current, iterator.current); |
58 .then((result) => _reduceAsync(iterator, result, combine)); | 53 return _reduceAsync(iterator, result, combine); |
59 } | 54 } |
60 return new Future.value(currentValue); | 55 return current; |
61 } | 56 } |
62 | 57 |
63 /** | 58 /// Schedules calls to [action] for each element in [iterable]. No more than |
64 * Schedules calls to [action] for each element in [iterable]. No more than | 59 /// [maxTasks] calls to [action] will be pending at once. |
65 * [maxTasks] calls to [action] will be pending at once. | |
66 */ | |
67 Future forEachAsync(Iterable iterable, AsyncAction action, {int maxTasks: 1}) { | 60 Future forEachAsync(Iterable iterable, AsyncAction action, {int maxTasks: 1}) { |
68 if (maxTasks == null || maxTasks < 1) { | 61 if (maxTasks == null || maxTasks < 1) { |
69 throw new ArgumentError("maxTasks must be greater than 0, was: $maxTasks"); | 62 throw new ArgumentError("maxTasks must be greater than 0, was: $maxTasks"); |
70 } | 63 } |
71 | 64 |
72 if (iterable == null) { | 65 if (iterable == null) { |
73 throw new ArgumentError("iterable must not be null"); | 66 throw new ArgumentError("iterable must not be null"); |
74 } | 67 } |
75 | 68 |
76 if (iterable.isEmpty) return new Future.value(); | 69 if (iterable.isEmpty) return new Future.value(); |
77 | 70 |
78 var completer = new Completer(); | 71 var completer = new Completer(); |
79 var iterator = iterable.iterator; | 72 var iterator = iterable.iterator; |
80 int pending = 0; | 73 int pending = 0; |
81 bool failed = false; | 74 bool failed = false; |
82 | 75 |
83 bool scheduleTask() { | 76 bool scheduleTask() { |
84 if (pending < maxTasks && iterator.moveNext()) { | 77 if (pending < maxTasks && iterator.moveNext()) { |
85 pending++; | 78 pending++; |
86 var item = iterator.current; | 79 var item = iterator.current; |
87 scheduleMicrotask(() { | 80 scheduleMicrotask(() { |
88 var task = action(item); | 81 var task = action(item); |
89 task.then((_) { | 82 task.then((_) { |
90 pending--; | 83 pending--; |
91 if (failed) return; | 84 if (failed) return; |
92 if (!scheduleTask() && pending == 0) { | 85 if (!scheduleTask() && pending == 0) { |
93 completer.complete(); | 86 completer.complete(); |
94 } | 87 } |
95 }).catchError((e) { | 88 }).catchError((e, stack) { |
96 if (failed) return; | 89 if (failed) return; |
97 failed = true; | 90 failed = true; |
98 completer.completeError(e); | 91 completer.completeError(e, stack); |
99 }); | 92 }); |
100 }); | 93 }); |
101 return true; | 94 return true; |
102 } | 95 } |
103 return false; | 96 return false; |
104 } | 97 } |
| 98 |
105 while (scheduleTask()) {} | 99 while (scheduleTask()) {} |
106 return completer.future; | 100 return completer.future; |
107 } | 101 } |
OLD | NEW |