OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library pool; |
| 6 |
| 7 import 'dart:async'; |
| 8 import 'dart:collection'; |
| 9 |
| 10 import 'package:stack_trace/stack_trace.dart'; |
| 11 |
| 12 /// Manages an abstract pool of resources with a limit on how many may be in use |
| 13 /// at once. |
| 14 /// |
| 15 /// When a resource is needed, the user should call [request]. When the returned |
| 16 /// future completes with a [PoolResource], the resource may be allocated. Once |
| 17 /// the resource has been released, the user should call [PoolResource.release]. |
| 18 /// The pool will ensure that only a certain number of [PoolResource]s may be |
| 19 /// allocated at once. |
| 20 class Pool { |
| 21 /// Completers for requests beyond the first [_maxAllocatedResources]. |
| 22 /// |
| 23 /// When an item is released, the next element of [_requestedResources] will |
| 24 /// be completed. |
| 25 final _requestedResources = new Queue<Completer<PoolResource>>(); |
| 26 |
| 27 /// Callbacks that must be called before additional resources can be |
| 28 /// allocated. |
| 29 /// |
| 30 /// See [PoolResource.allowRelease]. |
| 31 final _onReleaseCallbacks = new Queue<Function>(); |
| 32 |
| 33 /// Completers that will be completed once `onRelease` callbacks are done |
| 34 /// running. |
| 35 /// |
| 36 /// These are kept in a queue to ensure that the earliest request completes |
| 37 /// first regardless of what order the `onRelease` callbacks complete in. |
| 38 final _onReleaseCompleters = new Queue<Completer<PoolResource>>(); |
| 39 |
| 40 /// The maximum number of resources that may be allocated at once. |
| 41 final int _maxAllocatedResources; |
| 42 |
| 43 /// The number of resources that are currently allocated. |
| 44 int _allocatedResources = 0; |
| 45 |
| 46 /// The timeout timer. |
| 47 /// |
| 48 /// If [_timeout] isn't null, this timer is set as soon as the resource limit |
| 49 /// is reached and is reset every time an resource is released or a new |
| 50 /// resource is requested. If it fires, that indicates that the caller became |
| 51 /// deadlocked, likely due to files waiting for additional files to be read |
| 52 /// before they could be closed. |
| 53 Timer _timer; |
| 54 |
| 55 /// The amount of time to wait before timing out the pending resources. |
| 56 final Duration _timeout; |
| 57 |
| 58 /// Creates a new pool with the given limit on how many resources may be |
| 59 /// allocated at once. |
| 60 /// |
| 61 /// If [timeout] is passed, then if that much time passes without any activity |
| 62 /// all pending [request] futures will throw a [TimeoutException]. This is |
| 63 /// intended to avoid deadlocks. |
| 64 Pool(this._maxAllocatedResources, {Duration timeout}) |
| 65 : _timeout = timeout; |
| 66 |
| 67 /// Request a [PoolResource]. |
| 68 /// |
| 69 /// If the maximum number of resources is already allocated, this will delay |
| 70 /// until one of them is released. |
| 71 Future<PoolResource> request() { |
| 72 if (_allocatedResources < _maxAllocatedResources) { |
| 73 _allocatedResources++; |
| 74 return new Future.value(new PoolResource._(this)); |
| 75 } else if (_onReleaseCallbacks.isNotEmpty) { |
| 76 return _runOnRelease(_onReleaseCallbacks.removeFirst()); |
| 77 } else { |
| 78 var completer = new Completer<PoolResource>(); |
| 79 _requestedResources.add(completer); |
| 80 _resetTimer(); |
| 81 return completer.future; |
| 82 } |
| 83 } |
| 84 |
| 85 /// Requests a resource for the duration of [callback], which may return a |
| 86 /// Future. |
| 87 /// |
| 88 /// The return value of [callback] is piped to the returned Future. |
| 89 Future withResource(callback()) { |
| 90 return request().then((resource) => |
| 91 Chain.track(new Future.sync(callback)).whenComplete(resource.release)); |
| 92 } |
| 93 |
| 94 /// If there are any pending requests, this will fire the oldest one. |
| 95 void _onResourceReleased() { |
| 96 _resetTimer(); |
| 97 |
| 98 if (_requestedResources.isEmpty) { |
| 99 _allocatedResources--; |
| 100 return; |
| 101 } |
| 102 |
| 103 var pending = _requestedResources.removeFirst(); |
| 104 pending.complete(new PoolResource._(this)); |
| 105 } |
| 106 |
| 107 /// If there are any pending requests, this will fire the oldest one after |
| 108 /// running [onRelease]. |
| 109 void _onResourceReleaseAllowed(onRelease()) { |
| 110 _resetTimer(); |
| 111 |
| 112 if (_requestedResources.isEmpty) { |
| 113 _onReleaseCallbacks.add( |
| 114 Zone.current.bindCallback(onRelease, runGuarded: false)); |
| 115 return; |
| 116 } |
| 117 |
| 118 var pending = _requestedResources.removeFirst(); |
| 119 pending.complete(_runOnRelease(onRelease)); |
| 120 } |
| 121 |
| 122 /// Runs [onRelease] and returns a Future that completes to a resource once an |
| 123 /// [onRelease] callback completes. |
| 124 /// |
| 125 /// Futures returned by [_runOnRelease] always complete in the order they were |
| 126 /// created, even if earlier [onRelease] callbacks take longer to run. |
| 127 Future<PoolResource> _runOnRelease(onRelease()) { |
| 128 new Future.sync(onRelease).then((value) { |
| 129 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); |
| 130 }).catchError((error, stackTrace) { |
| 131 _onReleaseCompleters.removeFirst().completeError(error, stackTrace); |
| 132 }); |
| 133 |
| 134 var completer = new Completer.sync(); |
| 135 _onReleaseCompleters.add(completer); |
| 136 return completer.future; |
| 137 } |
| 138 |
| 139 /// A resource has been requested, allocated, or released. |
| 140 void _resetTimer() { |
| 141 if (_timer != null) _timer.cancel(); |
| 142 if (_timeout == null || _requestedResources.isEmpty) { |
| 143 _timer = null; |
| 144 } else { |
| 145 _timer = new Timer(_timeout, _onTimeout); |
| 146 } |
| 147 } |
| 148 |
| 149 /// Handles [_timer] timing out by causing all pending resource completers to |
| 150 /// emit exceptions. |
| 151 void _onTimeout() { |
| 152 for (var completer in _requestedResources) { |
| 153 completer.completeError( |
| 154 new TimeoutException("Pool deadlock: all resources have been " |
| 155 "allocated for too long.", |
| 156 _timeout), |
| 157 new Chain.current()); |
| 158 } |
| 159 _requestedResources.clear(); |
| 160 _timer = null; |
| 161 } |
| 162 } |
| 163 |
| 164 /// A member of a [Pool]. |
| 165 /// |
| 166 /// A [PoolResource] is a token that indicates that a resource is allocated. |
| 167 /// When the associated resource is released, the user should call [release]. |
| 168 class PoolResource { |
| 169 final Pool _pool; |
| 170 |
| 171 /// Whether [this] has been released yet. |
| 172 bool _released = false; |
| 173 |
| 174 PoolResource._(this._pool); |
| 175 |
| 176 /// Tells the parent [Pool] that the resource associated with this resource is |
| 177 /// no longer allocated, and that a new [PoolResource] may be allocated. |
| 178 void release() { |
| 179 if (_released) { |
| 180 throw new StateError("A PoolResource may only be released once."); |
| 181 } |
| 182 _released = true; |
| 183 _pool._onResourceReleased(); |
| 184 } |
| 185 |
| 186 /// Tells the parent [Pool] that the resource associated with this resource is |
| 187 /// no longer necessary, but should remain allocated until more resources are |
| 188 /// needed. |
| 189 /// |
| 190 /// When [Pool.request] is called and there are no remaining available |
| 191 /// resources, the [onRelease] callback is called. It should free the |
| 192 /// resource, and it may return a Future or `null`. Once that completes, the |
| 193 /// [Pool.request] call will complete to a new [PoolResource]. |
| 194 /// |
| 195 /// This is useful when a resource's main function is complete, but it may |
| 196 /// produce additional information later on. For example, an isolate's task |
| 197 /// may be complete, but it could still emit asynchronous errors. |
| 198 void allowRelease(onRelease()) { |
| 199 if (_released) { |
| 200 throw new StateError("A PoolResource may only be released once."); |
| 201 } |
| 202 _released = true; |
| 203 _pool._onResourceReleaseAllowed(onRelease); |
| 204 } |
| 205 } |
| 206 |
OLD | NEW |