Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(66)

Side by Side Diff: pkg/barback/lib/src/utils.dart

Issue 26273003: Pool future creation to ensure no more than 10 ops are in flight (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: using in polymer Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/barback/lib/src/phase_input.dart ('k') | pkg/polymer/lib/src/build/runner.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library barback.utils; 5 library barback.utils;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 8
9 /// A pair of values. 9 /// A pair of values.
10 class Pair<E, F> { 10 class Pair<E, F> {
(...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after
189 stream.listen( 189 stream.listen(
190 controller.add, 190 controller.add,
191 onError: (error) => controller.addError(error), 191 onError: (error) => controller.addError(error),
192 onDone: controller.close); 192 onDone: controller.close);
193 }).catchError((e) { 193 }).catchError((e) {
194 controller.addError(e); 194 controller.addError(e);
195 controller.close(); 195 controller.close();
196 }); 196 });
197 return controller.stream; 197 return controller.stream;
198 } 198 }
199
200 typedef Future<T> _FutureThing<T>(dynamic value);
201
202 class _FuturePool<T> {
203 static const int _GROUP_SIZE = 10;
204
205 final Iterator<T> _iterator;
206 final _FutureThing<T> _factory;
207 final Completer _completer = new Completer();
208 int _runningCount = 0;
209
210 _FuturePool(this._iterator, this._factory) {
211 // TODO: runAsync?
212 _populate();
213 }
214
215 Future get future => _completer.future;
216
217 void _populate() {
218 bool itemsLeft = false;
219 while(_runningCount < _GROUP_SIZE && !_completer.isCompleted) {
220 itemsLeft = _iterator.moveNext();
221 if(!itemsLeft) break;
222
223 T item = _iterator.current;
224 Future future = _factory(item);
225 if(future != null) {
226 _runningCount++;
227 future.then(_onSuccess, onError: _onError);
228 }
229 }
230
231 assert(_runningCount >= 0);
232
233 if(!itemsLeft && _runningCount == 0 && !_completer.isCompleted) {
234 _completer.complete();
235 }
236 }
237
238 void _onSuccess(T value) {
239 assert(_runningCount > 0);
240 _runningCount--;
241 // TODO: runAsync?
242 _populate();
243 }
244
245 void _onError(Object err) {
246 assert(_runningCount > 0);
247 _runningCount--;
248 if(!_completer.isCompleted) _completer.completeError(err);
249 }
250 }
251
252 Future forEachPooledFuture(Iterable source, Future action(dynamic item)) {
253 var pool = new _FuturePool(source.iterator, action);
254 return pool.future;
255 }
OLDNEW
« no previous file with comments | « pkg/barback/lib/src/phase_input.dart ('k') | pkg/polymer/lib/src/build/runner.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698