Index: packages/pool/lib/pool.dart |
diff --git a/pool/lib/pool.dart b/packages/pool/lib/pool.dart |
similarity index 75% |
rename from pool/lib/pool.dart |
rename to packages/pool/lib/pool.dart |
index 6941229b3e084d18f2d2096ed0630569b688f9bb..59b949e0eef0da58ef4d49844e205a46c67dc325 100644 |
--- a/pool/lib/pool.dart |
+++ b/packages/pool/lib/pool.dart |
@@ -7,6 +7,7 @@ library pool; |
import 'dart:async'; |
import 'dart:collection'; |
+import 'package:async/async.dart'; |
import 'package:stack_trace/stack_trace.dart'; |
/// Manages an abstract pool of resources with a limit on how many may be in use |
@@ -55,6 +56,15 @@ class Pool { |
/// The amount of time to wait before timing out the pending resources. |
final Duration _timeout; |
+ /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources |
+ /// that have been marked releasable. |
+ /// |
+ /// This is `null` until [close] is called. |
+ FutureGroup _closeGroup; |
+ |
+ /// Whether [close] has been called. |
+ bool get isClosed => _closeGroup != null; |
+ |
/// Creates a new pool with the given limit on how many resources may be |
/// allocated at once. |
/// |
@@ -69,6 +79,10 @@ class Pool { |
/// If the maximum number of resources is already allocated, this will delay |
/// until one of them is released. |
Future<PoolResource> request() { |
+ if (isClosed) { |
+ throw new StateError("request() may not be called on a closed Pool."); |
+ } |
+ |
if (_allocatedResources < _maxAllocatedResources) { |
_allocatedResources++; |
return new Future.value(new PoolResource._(this)); |
@@ -87,21 +101,56 @@ class Pool { |
/// |
/// The return value of [callback] is piped to the returned Future. |
Future withResource(callback()) { |
- return request().then((resource) => |
- Chain.track(new Future.sync(callback)).whenComplete(resource.release)); |
+ if (isClosed) { |
+ throw new StateError( |
+ "withResource() may not be called on a closed Pool."); |
+ } |
+ |
+ // TODO(nweiz): Use async/await when sdk#23497 is fixed. |
+ return request().then((resource) { |
+ return new Future.sync(callback).whenComplete(resource.release); |
+ }); |
+ } |
+ |
+ /// Closes the pool so that no more resources are requested. |
+ /// |
+ /// Existing resource requests remain unchanged. |
+ /// |
+ /// Any resources that are marked as releasable using |
+ /// [PoolResource.allowRelease] are released immediately. Once all resources |
+ /// have been released and any `onRelease` callbacks have completed, the |
+ /// returned future completes successfully. If any `onRelease` callback throws |
+ /// an error, the returned future completes with that error. |
+ /// |
+ /// This may be called more than once; it returns the same [Future] each time. |
+ Future close() { |
+ if (_closeGroup != null) return _closeGroup.future; |
+ |
+ _resetTimer(); |
+ |
+ _closeGroup = new FutureGroup(); |
+ for (var callback in _onReleaseCallbacks) { |
+ _closeGroup.add(new Future.sync(callback)); |
+ } |
+ |
+ _allocatedResources -= _onReleaseCallbacks.length; |
+ _onReleaseCallbacks.clear(); |
+ |
+ if (_allocatedResources == 0) _closeGroup.close(); |
+ return _closeGroup.future; |
} |
/// If there are any pending requests, this will fire the oldest one. |
void _onResourceReleased() { |
_resetTimer(); |
- if (_requestedResources.isEmpty) { |
+ if (_requestedResources.isNotEmpty) { |
+ var pending = _requestedResources.removeFirst(); |
+ pending.complete(new PoolResource._(this)); |
+ } else { |
_allocatedResources--; |
- return; |
+ if (isClosed && _allocatedResources == 0) _closeGroup.close(); |
} |
- |
- var pending = _requestedResources.removeFirst(); |
- pending.complete(new PoolResource._(this)); |
} |
/// If there are any pending requests, this will fire the oldest one after |
@@ -109,14 +158,17 @@ class Pool { |
void _onResourceReleaseAllowed(onRelease()) { |
_resetTimer(); |
- if (_requestedResources.isEmpty) { |
+ if (_requestedResources.isNotEmpty) { |
+ var pending = _requestedResources.removeFirst(); |
+ pending.complete(_runOnRelease(onRelease)); |
+ } else if (isClosed) { |
+ _closeGroup.add(new Future.sync(onRelease)); |
+ _allocatedResources--; |
+ if (_allocatedResources == 0) _closeGroup.close(); |
+ } else { |
_onReleaseCallbacks.add( |
Zone.current.bindCallback(onRelease, runGuarded: false)); |
- return; |
} |
- |
- var pending = _requestedResources.removeFirst(); |
- pending.complete(_runOnRelease(onRelease)); |
} |
/// Runs [onRelease] and returns a Future that completes to a resource once an |
@@ -203,4 +255,3 @@ class PoolResource { |
_pool._onResourceReleaseAllowed(onRelease); |
} |
} |
- |