| Index: lib/pool.dart
|
| diff --git a/lib/pool.dart b/lib/pool.dart
|
| index e8ee99cab190783c9f0b9e97a89720c165fa5637..59b949e0eef0da58ef4d49844e205a46c67dc325 100644
|
| --- a/lib/pool.dart
|
| +++ b/lib/pool.dart
|
| @@ -7,6 +7,7 @@ library pool;
|
| import 'dart:async';
|
| import 'dart:collection';
|
|
|
| +import 'package:async/async.dart';
|
| import 'package:stack_trace/stack_trace.dart';
|
|
|
| /// Manages an abstract pool of resources with a limit on how many may be in use
|
| @@ -55,6 +56,15 @@ class Pool {
|
| /// The amount of time to wait before timing out the pending resources.
|
| final Duration _timeout;
|
|
|
| + /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources
|
| + /// that have been marked releasable.
|
| + ///
|
| + /// This is `null` until [close] is called.
|
| + FutureGroup _closeGroup;
|
| +
|
| + /// Whether [close] has been called.
|
| + bool get isClosed => _closeGroup != null;
|
| +
|
| /// Creates a new pool with the given limit on how many resources may be
|
| /// allocated at once.
|
| ///
|
| @@ -69,6 +79,10 @@ class Pool {
|
| /// If the maximum number of resources is already allocated, this will delay
|
| /// until one of them is released.
|
| Future<PoolResource> request() {
|
| + if (isClosed) {
|
| + throw new StateError("request() may not be called on a closed Pool.");
|
| + }
|
| +
|
| if (_allocatedResources < _maxAllocatedResources) {
|
| _allocatedResources++;
|
| return new Future.value(new PoolResource._(this));
|
| @@ -87,20 +101,56 @@ class Pool {
|
| ///
|
| /// The return value of [callback] is piped to the returned Future.
|
| Future withResource(callback()) {
|
| - return request().then((resource) => new Future.sync(callback).whenComplete(resource.release));
|
| + if (isClosed) {
|
| + throw new StateError(
|
| + "withResource() may not be called on a closed Pool.");
|
| + }
|
| +
|
| + // TODO(nweiz): Use async/await when sdk#23497 is fixed.
|
| + return request().then((resource) {
|
| + return new Future.sync(callback).whenComplete(resource.release);
|
| + });
|
| + }
|
| +
|
| + /// Closes the pool so that no more resources are requested.
|
| + ///
|
| + /// Existing resource requests remain unchanged.
|
| + ///
|
| + /// Any resources that are marked as releasable using
|
| + /// [PoolResource.allowRelease] are released immediately. Once all resources
|
| + /// have been released and any `onRelease` callbacks have completed, the
|
| + /// returned future completes successfully. If any `onRelease` callback throws
|
| + /// an error, the returned future completes with that error.
|
| + ///
|
| + /// This may be called more than once; it returns the same [Future] each time.
|
| + Future close() {
|
| + if (_closeGroup != null) return _closeGroup.future;
|
| +
|
| + _resetTimer();
|
| +
|
| + _closeGroup = new FutureGroup();
|
| + for (var callback in _onReleaseCallbacks) {
|
| + _closeGroup.add(new Future.sync(callback));
|
| + }
|
| +
|
| + _allocatedResources -= _onReleaseCallbacks.length;
|
| + _onReleaseCallbacks.clear();
|
| +
|
| + if (_allocatedResources == 0) _closeGroup.close();
|
| + return _closeGroup.future;
|
| }
|
|
|
| /// If there are any pending requests, this will fire the oldest one.
|
| void _onResourceReleased() {
|
| _resetTimer();
|
|
|
| - if (_requestedResources.isEmpty) {
|
| + if (_requestedResources.isNotEmpty) {
|
| + var pending = _requestedResources.removeFirst();
|
| + pending.complete(new PoolResource._(this));
|
| + } else {
|
| _allocatedResources--;
|
| - return;
|
| + if (isClosed && _allocatedResources == 0) _closeGroup.close();
|
| }
|
| -
|
| - var pending = _requestedResources.removeFirst();
|
| - pending.complete(new PoolResource._(this));
|
| }
|
|
|
| /// If there are any pending requests, this will fire the oldest one after
|
| @@ -108,14 +158,17 @@ class Pool {
|
| void _onResourceReleaseAllowed(onRelease()) {
|
| _resetTimer();
|
|
|
| - if (_requestedResources.isEmpty) {
|
| + if (_requestedResources.isNotEmpty) {
|
| + var pending = _requestedResources.removeFirst();
|
| + pending.complete(_runOnRelease(onRelease));
|
| + } else if (isClosed) {
|
| + _closeGroup.add(new Future.sync(onRelease));
|
| + _allocatedResources--;
|
| + if (_allocatedResources == 0) _closeGroup.close();
|
| + } else {
|
| _onReleaseCallbacks.add(
|
| Zone.current.bindCallback(onRelease, runGuarded: false));
|
| - return;
|
| }
|
| -
|
| - var pending = _requestedResources.removeFirst();
|
| - pending.complete(_runOnRelease(onRelease));
|
| }
|
|
|
| /// Runs [onRelease] and returns a Future that completes to a resource once an
|
| @@ -202,4 +255,3 @@ class PoolResource {
|
| _pool._onResourceReleaseAllowed(onRelease);
|
| }
|
| }
|
| -
|
|
|