| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library barback.utils; | 5 library barback.utils; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 | 8 |
| 9 /// A pair of values. | 9 /// A pair of values. |
| 10 class Pair<E, F> { | 10 class Pair<E, F> { |
| (...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 189 stream.listen( | 189 stream.listen( |
| 190 controller.add, | 190 controller.add, |
| 191 onError: (error) => controller.addError(error), | 191 onError: (error) => controller.addError(error), |
| 192 onDone: controller.close); | 192 onDone: controller.close); |
| 193 }).catchError((e) { | 193 }).catchError((e) { |
| 194 controller.addError(e); | 194 controller.addError(e); |
| 195 controller.close(); | 195 controller.close(); |
| 196 }); | 196 }); |
| 197 return controller.stream; | 197 return controller.stream; |
| 198 } | 198 } |
| 199 |
| 200 typedef Future<T> _FutureThing<T>(dynamic value); |
| 201 |
| 202 class _FuturePool<T> { |
| 203 static const int _GROUP_SIZE = 10; |
| 204 |
| 205 final Iterator<T> _iterator; |
| 206 final _FutureThing<T> _factory; |
| 207 final Completer _completer = new Completer(); |
| 208 int _runningCount = 0; |
| 209 |
| 210 _FuturePool(this._iterator, this._factory) { |
| 211 // TODO: runAsync? |
| 212 _populate(); |
| 213 } |
| 214 |
| 215 Future get future => _completer.future; |
| 216 |
| 217 void _populate() { |
| 218 bool itemsLeft = false; |
| 219 while(_runningCount < _GROUP_SIZE && !_completer.isCompleted) { |
| 220 itemsLeft = _iterator.moveNext(); |
| 221 if(!itemsLeft) break; |
| 222 |
| 223 T item = _iterator.current; |
| 224 Future future = _factory(item); |
| 225 if(future != null) { |
| 226 _runningCount++; |
| 227 future.then(_onSuccess, onError: _onError); |
| 228 } |
| 229 } |
| 230 |
| 231 assert(_runningCount >= 0); |
| 232 |
| 233 if(!itemsLeft && _runningCount == 0 && !_completer.isCompleted) { |
| 234 _completer.complete(); |
| 235 } |
| 236 } |
| 237 |
| 238 void _onSuccess(T value) { |
| 239 assert(_runningCount > 0); |
| 240 _runningCount--; |
| 241 // TODO: runAsync? |
| 242 _populate(); |
| 243 } |
| 244 |
| 245 void _onError(Object err) { |
| 246 assert(_runningCount > 0); |
| 247 _runningCount--; |
| 248 if(!_completer.isCompleted) _completer.completeError(err); |
| 249 } |
| 250 } |
| 251 |
| 252 Future forEachPooledFuture(Iterable source, Future action(dynamic item)) { |
| 253 var pool = new _FuturePool(source.iterator, action); |
| 254 return pool.future; |
| 255 } |
| OLD | NEW |