Index: mojo/public/dart/third_party/pool/lib/pool.dart |
diff --git a/mojo/public/dart/third_party/pool/lib/pool.dart b/mojo/public/dart/third_party/pool/lib/pool.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..6941229b3e084d18f2d2096ed0630569b688f9bb |
--- /dev/null |
+++ b/mojo/public/dart/third_party/pool/lib/pool.dart |
@@ -0,0 +1,206 @@ |
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library pool; |
+ |
+import 'dart:async'; |
+import 'dart:collection'; |
+ |
+import 'package:stack_trace/stack_trace.dart'; |
+ |
+/// Manages an abstract pool of resources with a limit on how many may be in use |
+/// at once. |
+/// |
+/// When a resource is needed, the user should call [request]. When the returned |
+/// future completes with a [PoolResource], the resource may be allocated. Once |
+/// the resource has been released, the user should call [PoolResource.release]. |
+/// The pool will ensure that only a certain number of [PoolResource]s may be |
+/// allocated at once. |
+class Pool { |
+ /// Completers for requests beyond the first [_maxAllocatedResources]. |
+ /// |
+ /// When an item is released, the next element of [_requestedResources] will |
+ /// be completed. |
+ final _requestedResources = new Queue<Completer<PoolResource>>(); |
+ |
+ /// Callbacks that must be called before additional resources can be |
+ /// allocated. |
+ /// |
+ /// See [PoolResource.allowRelease]. |
+ final _onReleaseCallbacks = new Queue<Function>(); |
+ |
+ /// Completers that will be completed once `onRelease` callbacks are done |
+ /// running. |
+ /// |
+ /// These are kept in a queue to ensure that the earliest request completes |
+ /// first regardless of what order the `onRelease` callbacks complete in. |
+ final _onReleaseCompleters = new Queue<Completer<PoolResource>>(); |
+ |
+ /// The maximum number of resources that may be allocated at once. |
+ final int _maxAllocatedResources; |
+ |
+ /// The number of resources that are currently allocated. |
+ int _allocatedResources = 0; |
+ |
+ /// The timeout timer. |
+ /// |
+ /// If [_timeout] isn't null, this timer is set as soon as the resource limit |
+ /// is reached and is reset every time an resource is released or a new |
+ /// resource is requested. If it fires, that indicates that the caller became |
+ /// deadlocked, likely due to files waiting for additional files to be read |
+ /// before they could be closed. |
+ Timer _timer; |
+ |
+ /// The amount of time to wait before timing out the pending resources. |
+ final Duration _timeout; |
+ |
+ /// Creates a new pool with the given limit on how many resources may be |
+ /// allocated at once. |
+ /// |
+ /// If [timeout] is passed, then if that much time passes without any activity |
+ /// all pending [request] futures will throw a [TimeoutException]. This is |
+ /// intended to avoid deadlocks. |
+ Pool(this._maxAllocatedResources, {Duration timeout}) |
+ : _timeout = timeout; |
+ |
+ /// Request a [PoolResource]. |
+ /// |
+ /// If the maximum number of resources is already allocated, this will delay |
+ /// until one of them is released. |
+ Future<PoolResource> request() { |
+ if (_allocatedResources < _maxAllocatedResources) { |
+ _allocatedResources++; |
+ return new Future.value(new PoolResource._(this)); |
+ } else if (_onReleaseCallbacks.isNotEmpty) { |
+ return _runOnRelease(_onReleaseCallbacks.removeFirst()); |
+ } else { |
+ var completer = new Completer<PoolResource>(); |
+ _requestedResources.add(completer); |
+ _resetTimer(); |
+ return completer.future; |
+ } |
+ } |
+ |
+ /// Requests a resource for the duration of [callback], which may return a |
+ /// Future. |
+ /// |
+ /// The return value of [callback] is piped to the returned Future. |
+ Future withResource(callback()) { |
+ return request().then((resource) => |
+ Chain.track(new Future.sync(callback)).whenComplete(resource.release)); |
+ } |
+ |
+ /// If there are any pending requests, this will fire the oldest one. |
+ void _onResourceReleased() { |
+ _resetTimer(); |
+ |
+ if (_requestedResources.isEmpty) { |
+ _allocatedResources--; |
+ return; |
+ } |
+ |
+ var pending = _requestedResources.removeFirst(); |
+ pending.complete(new PoolResource._(this)); |
+ } |
+ |
+ /// If there are any pending requests, this will fire the oldest one after |
+ /// running [onRelease]. |
+ void _onResourceReleaseAllowed(onRelease()) { |
+ _resetTimer(); |
+ |
+ if (_requestedResources.isEmpty) { |
+ _onReleaseCallbacks.add( |
+ Zone.current.bindCallback(onRelease, runGuarded: false)); |
+ return; |
+ } |
+ |
+ var pending = _requestedResources.removeFirst(); |
+ pending.complete(_runOnRelease(onRelease)); |
+ } |
+ |
+ /// Runs [onRelease] and returns a Future that completes to a resource once an |
+ /// [onRelease] callback completes. |
+ /// |
+ /// Futures returned by [_runOnRelease] always complete in the order they were |
+ /// created, even if earlier [onRelease] callbacks take longer to run. |
+ Future<PoolResource> _runOnRelease(onRelease()) { |
+ new Future.sync(onRelease).then((value) { |
+ _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); |
+ }).catchError((error, stackTrace) { |
+ _onReleaseCompleters.removeFirst().completeError(error, stackTrace); |
+ }); |
+ |
+ var completer = new Completer.sync(); |
+ _onReleaseCompleters.add(completer); |
+ return completer.future; |
+ } |
+ |
+ /// A resource has been requested, allocated, or released. |
+ void _resetTimer() { |
+ if (_timer != null) _timer.cancel(); |
+ if (_timeout == null || _requestedResources.isEmpty) { |
+ _timer = null; |
+ } else { |
+ _timer = new Timer(_timeout, _onTimeout); |
+ } |
+ } |
+ |
+ /// Handles [_timer] timing out by causing all pending resource completers to |
+ /// emit exceptions. |
+ void _onTimeout() { |
+ for (var completer in _requestedResources) { |
+ completer.completeError( |
+ new TimeoutException("Pool deadlock: all resources have been " |
+ "allocated for too long.", |
+ _timeout), |
+ new Chain.current()); |
+ } |
+ _requestedResources.clear(); |
+ _timer = null; |
+ } |
+} |
+ |
+/// A member of a [Pool]. |
+/// |
+/// A [PoolResource] is a token that indicates that a resource is allocated. |
+/// When the associated resource is released, the user should call [release]. |
+class PoolResource { |
+ final Pool _pool; |
+ |
+ /// Whether [this] has been released yet. |
+ bool _released = false; |
+ |
+ PoolResource._(this._pool); |
+ |
+ /// Tells the parent [Pool] that the resource associated with this resource is |
+ /// no longer allocated, and that a new [PoolResource] may be allocated. |
+ void release() { |
+ if (_released) { |
+ throw new StateError("A PoolResource may only be released once."); |
+ } |
+ _released = true; |
+ _pool._onResourceReleased(); |
+ } |
+ |
+ /// Tells the parent [Pool] that the resource associated with this resource is |
+ /// no longer necessary, but should remain allocated until more resources are |
+ /// needed. |
+ /// |
+ /// When [Pool.request] is called and there are no remaining available |
+ /// resources, the [onRelease] callback is called. It should free the |
+ /// resource, and it may return a Future or `null`. Once that completes, the |
+ /// [Pool.request] call will complete to a new [PoolResource]. |
+ /// |
+ /// This is useful when a resource's main function is complete, but it may |
+ /// produce additional information later on. For example, an isolate's task |
+ /// may be complete, but it could still emit asynchronous errors. |
+ void allowRelease(onRelease()) { |
+ if (_released) { |
+ throw new StateError("A PoolResource may only be released once."); |
+ } |
+ _released = true; |
+ _pool._onResourceReleaseAllowed(onRelease); |
+ } |
+} |
+ |