OLD | NEW |
| (Empty) |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library pool; | |
6 | |
7 import 'dart:async'; | |
8 import 'dart:collection'; | |
9 | |
10 import 'package:stack_trace/stack_trace.dart'; | |
11 | |
12 /// Manages an abstract pool of resources with a limit on how many may be in use | |
13 /// at once. | |
14 /// | |
15 /// When a resource is needed, the user should call [request]. When the returned | |
16 /// future completes with a [PoolResource], the resource may be allocated. Once | |
17 /// the resource has been released, the user should call [PoolResource.release]. | |
18 /// The pool will ensure that only a certain number of [PoolResource]s may be | |
19 /// allocated at once. | |
20 class Pool { | |
21 /// Completers for requests beyond the first [_maxAllocatedResources]. | |
22 /// | |
23 /// When an item is released, the next element of [_requestedResources] will | |
24 /// be completed. | |
25 final _requestedResources = new Queue<Completer<PoolResource>>(); | |
26 | |
27 /// Callbacks that must be called before additional resources can be | |
28 /// allocated. | |
29 /// | |
30 /// See [PoolResource.allowRelease]. | |
31 final _onReleaseCallbacks = new Queue<Function>(); | |
32 | |
33 /// Completers that will be completed once `onRelease` callbacks are done | |
34 /// running. | |
35 /// | |
36 /// These are kept in a queue to ensure that the earliest request completes | |
37 /// first regardless of what order the `onRelease` callbacks complete in. | |
38 final _onReleaseCompleters = new Queue<Completer<PoolResource>>(); | |
39 | |
40 /// The maximum number of resources that may be allocated at once. | |
41 final int _maxAllocatedResources; | |
42 | |
43 /// The number of resources that are currently allocated. | |
44 int _allocatedResources = 0; | |
45 | |
46 /// The timeout timer. | |
47 /// | |
48 /// If [_timeout] isn't null, this timer is set as soon as the resource limit | |
49 /// is reached and is reset every time an resource is released or a new | |
50 /// resource is requested. If it fires, that indicates that the caller became | |
51 /// deadlocked, likely due to files waiting for additional files to be read | |
52 /// before they could be closed. | |
53 Timer _timer; | |
54 | |
55 /// The amount of time to wait before timing out the pending resources. | |
56 final Duration _timeout; | |
57 | |
58 /// Creates a new pool with the given limit on how many resources may be | |
59 /// allocated at once. | |
60 /// | |
61 /// If [timeout] is passed, then if that much time passes without any activity | |
62 /// all pending [request] futures will throw a [TimeoutException]. This is | |
63 /// intended to avoid deadlocks. | |
64 Pool(this._maxAllocatedResources, {Duration timeout}) | |
65 : _timeout = timeout; | |
66 | |
67 /// Request a [PoolResource]. | |
68 /// | |
69 /// If the maximum number of resources is already allocated, this will delay | |
70 /// until one of them is released. | |
71 Future<PoolResource> request() { | |
72 if (_allocatedResources < _maxAllocatedResources) { | |
73 _allocatedResources++; | |
74 return new Future.value(new PoolResource._(this)); | |
75 } else if (_onReleaseCallbacks.isNotEmpty) { | |
76 return _runOnRelease(_onReleaseCallbacks.removeFirst()); | |
77 } else { | |
78 var completer = new Completer<PoolResource>(); | |
79 _requestedResources.add(completer); | |
80 _resetTimer(); | |
81 return completer.future; | |
82 } | |
83 } | |
84 | |
85 /// Requests a resource for the duration of [callback], which may return a | |
86 /// Future. | |
87 /// | |
88 /// The return value of [callback] is piped to the returned Future. | |
89 Future withResource(callback()) { | |
90 return request().then((resource) => | |
91 Chain.track(new Future.sync(callback)).whenComplete(resource.release)); | |
92 } | |
93 | |
94 /// If there are any pending requests, this will fire the oldest one. | |
95 void _onResourceReleased() { | |
96 _resetTimer(); | |
97 | |
98 if (_requestedResources.isEmpty) { | |
99 _allocatedResources--; | |
100 return; | |
101 } | |
102 | |
103 var pending = _requestedResources.removeFirst(); | |
104 pending.complete(new PoolResource._(this)); | |
105 } | |
106 | |
107 /// If there are any pending requests, this will fire the oldest one after | |
108 /// running [onRelease]. | |
109 void _onResourceReleaseAllowed(onRelease()) { | |
110 _resetTimer(); | |
111 | |
112 if (_requestedResources.isEmpty) { | |
113 _onReleaseCallbacks.add( | |
114 Zone.current.bindCallback(onRelease, runGuarded: false)); | |
115 return; | |
116 } | |
117 | |
118 var pending = _requestedResources.removeFirst(); | |
119 pending.complete(_runOnRelease(onRelease)); | |
120 } | |
121 | |
122 /// Runs [onRelease] and returns a Future that completes to a resource once an | |
123 /// [onRelease] callback completes. | |
124 /// | |
125 /// Futures returned by [_runOnRelease] always complete in the order they were | |
126 /// created, even if earlier [onRelease] callbacks take longer to run. | |
127 Future<PoolResource> _runOnRelease(onRelease()) { | |
128 new Future.sync(onRelease).then((value) { | |
129 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); | |
130 }).catchError((error, stackTrace) { | |
131 _onReleaseCompleters.removeFirst().completeError(error, stackTrace); | |
132 }); | |
133 | |
134 var completer = new Completer.sync(); | |
135 _onReleaseCompleters.add(completer); | |
136 return completer.future; | |
137 } | |
138 | |
139 /// A resource has been requested, allocated, or released. | |
140 void _resetTimer() { | |
141 if (_timer != null) _timer.cancel(); | |
142 if (_timeout == null || _requestedResources.isEmpty) { | |
143 _timer = null; | |
144 } else { | |
145 _timer = new Timer(_timeout, _onTimeout); | |
146 } | |
147 } | |
148 | |
149 /// Handles [_timer] timing out by causing all pending resource completers to | |
150 /// emit exceptions. | |
151 void _onTimeout() { | |
152 for (var completer in _requestedResources) { | |
153 completer.completeError( | |
154 new TimeoutException("Pool deadlock: all resources have been " | |
155 "allocated for too long.", | |
156 _timeout), | |
157 new Chain.current()); | |
158 } | |
159 _requestedResources.clear(); | |
160 _timer = null; | |
161 } | |
162 } | |
163 | |
164 /// A member of a [Pool]. | |
165 /// | |
166 /// A [PoolResource] is a token that indicates that a resource is allocated. | |
167 /// When the associated resource is released, the user should call [release]. | |
168 class PoolResource { | |
169 final Pool _pool; | |
170 | |
171 /// Whether [this] has been released yet. | |
172 bool _released = false; | |
173 | |
174 PoolResource._(this._pool); | |
175 | |
176 /// Tells the parent [Pool] that the resource associated with this resource is | |
177 /// no longer allocated, and that a new [PoolResource] may be allocated. | |
178 void release() { | |
179 if (_released) { | |
180 throw new StateError("A PoolResource may only be released once."); | |
181 } | |
182 _released = true; | |
183 _pool._onResourceReleased(); | |
184 } | |
185 | |
186 /// Tells the parent [Pool] that the resource associated with this resource is | |
187 /// no longer necessary, but should remain allocated until more resources are | |
188 /// needed. | |
189 /// | |
190 /// When [Pool.request] is called and there are no remaining available | |
191 /// resources, the [onRelease] callback is called. It should free the | |
192 /// resource, and it may return a Future or `null`. Once that completes, the | |
193 /// [Pool.request] call will complete to a new [PoolResource]. | |
194 /// | |
195 /// This is useful when a resource's main function is complete, but it may | |
196 /// produce additional information later on. For example, an isolate's task | |
197 /// may be complete, but it could still emit asynchronous errors. | |
198 void allowRelease(onRelease()) { | |
199 if (_released) { | |
200 throw new StateError("A PoolResource may only be released once."); | |
201 } | |
202 _released = true; | |
203 _pool._onResourceReleaseAllowed(onRelease); | |
204 } | |
205 } | |
206 | |
OLD | NEW |