Chromium Code Reviews| Index: lib/pool.dart |
| diff --git a/lib/pool.dart b/lib/pool.dart |
| index e8ee99cab190783c9f0b9e97a89720c165fa5637..ac55388a55cd1a76324f7af668342f5a22f3362d 100644 |
| --- a/lib/pool.dart |
| +++ b/lib/pool.dart |
| @@ -7,6 +7,7 @@ library pool; |
| import 'dart:async'; |
| import 'dart:collection'; |
| +import 'package:async/async.dart'; |
| import 'package:stack_trace/stack_trace.dart'; |
| /// Manages an abstract pool of resources with a limit on how many may be in use |
| @@ -55,6 +56,15 @@ class Pool { |
| /// The amount of time to wait before timing out the pending resources. |
| final Duration _timeout; |
| + /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources |
| + /// that have been marked releasable. |
| + /// |
| + /// This is `null` until [close] is called. |
| + FutureGroup _closeGroup; |
| + |
| + /// Whether [close] has been called. |
| + bool get isClosed => _closeGroup != null; |
| + |
| /// Creates a new pool with the given limit on how many resources may be |
| /// allocated at once. |
| /// |
| @@ -69,6 +79,10 @@ class Pool { |
| /// If the maximum number of resources is already allocated, this will delay |
| /// until one of them is released. |
| Future<PoolResource> request() { |
| + if (isClosed) { |
| + throw new StateError("request() may not be called on a closed Pool."); |
| + } |
| + |
| if (_allocatedResources < _maxAllocatedResources) { |
| _allocatedResources++; |
| return new Future.value(new PoolResource._(this)); |
| @@ -87,20 +101,53 @@ class Pool { |
| /// |
| /// The return value of [callback] is piped to the returned Future. |
| Future withResource(callback()) { |
| - return request().then((resource) => new Future.sync(callback).whenComplete(resource.release)); |
| + if (isClosed) { |
| + throw new StateError( |
| + "withResource() may not be called on a closed Pool."); |
| + } |
| + |
| + return request().then((resource) { |
|
Bob Nystrom
2015/10/09 00:25:25
Use async/await?
nweiz
2015/10/09 00:46:19
This is used in test, which means we have to work
|
| + return new Future.sync(callback).whenComplete(resource.release); |
| + }); |
| + } |
| + |
| + /// Closes the pool so that no more resources are requested. |
| + /// |
| + /// Existing resource requests remain unchanged. |
| + /// |
| + /// Any resources that are marked as releasable using |
| + /// [PoolResource.allowRelease] are released immediately. Once all resources |
| + /// have been released and any `onRelease` callbacks have completed, the |
| + /// returned future completes successfully. If any `onRelease` callback throws |
| + /// an error, the returned future completes with that error. |
| + Future close() { |
| + if (_closeGroup != null) return _closeGroup.future; |
|
Bob Nystrom
2015/10/09 00:25:25
Document that it can be called more than once.
nweiz
2015/10/09 00:46:19
Done.
|
| + |
| + _resetTimer(); |
| + |
| + _closeGroup = new FutureGroup(); |
| + for (var callback in _onReleaseCallbacks) { |
| + _closeGroup.add(new Future.sync(callback)); |
| + } |
| + |
| + _allocatedResources -= _onReleaseCallbacks.length; |
| + _onReleaseCallbacks.clear(); |
| + |
| + if (_allocatedResources == 0) _closeGroup.close(); |
| + return _closeGroup.future; |
| } |
| /// If there are any pending requests, this will fire the oldest one. |
| void _onResourceReleased() { |
| _resetTimer(); |
| - if (_requestedResources.isEmpty) { |
| + if (_requestedResources.isNotEmpty) { |
| + var pending = _requestedResources.removeFirst(); |
| + pending.complete(new PoolResource._(this)); |
| + } else { |
| _allocatedResources--; |
| - return; |
| + if (isClosed && _allocatedResources == 0) _closeGroup.close(); |
| } |
| - |
| - var pending = _requestedResources.removeFirst(); |
| - pending.complete(new PoolResource._(this)); |
| } |
| /// If there are any pending requests, this will fire the oldest one after |
| @@ -108,14 +155,17 @@ class Pool { |
| void _onResourceReleaseAllowed(onRelease()) { |
| _resetTimer(); |
| - if (_requestedResources.isEmpty) { |
| + if (_requestedResources.isNotEmpty) { |
| + var pending = _requestedResources.removeFirst(); |
| + pending.complete(_runOnRelease(onRelease)); |
| + } else if (isClosed) { |
| + _closeGroup.add(new Future.sync(onRelease)); |
| + _allocatedResources--; |
| + if (_allocatedResources == 0) _closeGroup.close(); |
| + } else { |
| _onReleaseCallbacks.add( |
| Zone.current.bindCallback(onRelease, runGuarded: false)); |
| - return; |
| } |
| - |
| - var pending = _requestedResources.removeFirst(); |
| - pending.complete(_runOnRelease(onRelease)); |
| } |
| /// Runs [onRelease] and returns a Future that completes to a resource once an |
| @@ -202,4 +252,3 @@ class PoolResource { |
| _pool._onResourceReleaseAllowed(onRelease); |
| } |
| } |
| - |