OLD | NEW |
---|---|
1 import 'dart:async'; | 1 import 'dart:async'; |
2 import 'dart:collection'; | 2 import 'dart:collection'; |
3 | 3 |
4 import 'package:stack_trace/stack_trace.dart'; | 4 import 'package:stack_trace/stack_trace.dart'; |
5 | 5 |
6 /// Manages an abstract pool of resources with a limit on how many may be in use | 6 /// Manages an abstract pool of resources with a limit on how many may be in use |
7 /// at once. | 7 /// at once. |
8 /// | 8 /// |
9 /// When a resource is needed, the user should call [checkOut]. When the | 9 /// When a resource is needed, the user should call [acquire]. When the returned |
Bob Nystrom
2013/10/29 17:25:29
I find mixing "acquire" and "allocated" a tad conf
nweiz
2013/10/29 19:27:26
Done.
| |
10 /// returned future completes with a [PoolResource], the resource may be | 10 /// future completes with a [PoolResource], the resource may be allocated. Once |
11 /// allocated. Once the resource has been released, the user should call | 11 /// the resource has been released, the user should call [PoolResource.release]. |
12 /// [PoolResource.release]. The pool will ensure that only a certain number of | 12 /// The pool will ensure that only a certain number of [PoolResource]s may be |
13 /// [PoolResource]s may be checked out at once. | 13 /// allocated at once. |
14 class Pool { | 14 class Pool { |
15 /// Completers for checkouts beyond the first [_maxCheckedOutResources]. | 15 /// Completers for acquires beyond the first [_maxAllocatedResources]. |
16 /// | 16 /// |
17 /// When an item is released, the next element of [_pendingResources] will be | 17 /// When an item is released, the next element of [_pendingResources] will be |
18 /// completed. | 18 /// completed. |
19 final _pendingResources = new Queue<Completer<PoolResource>>(); | 19 final _pendingResources = new Queue<Completer<PoolResource>>(); |
Bob Nystrom
2013/10/29 17:25:29
Assuming the above comment, you could call this _r
nweiz
2013/10/29 19:27:26
Done.
| |
20 | 20 |
21 /// The maximum number of resources that may be checked out at once. | 21 /// The maximum number of resources that may be allocated at once. |
22 final int _maxCheckedOutResources; | 22 final int _maxAllocatedResources; |
23 | 23 |
24 /// The number of resources that are currently checked out. | 24 /// The number of resources that are currently allocated. |
25 int _checkedOutResources = 0; | 25 int _allocatedResources = 0; |
26 | 26 |
27 /// The timeout timer. | 27 /// The timeout timer. |
28 /// | 28 /// |
29 /// If [_timeout] isn't null, this timer is set as soon as the resource limit | 29 /// If [_timeout] isn't null, this timer is set as soon as the resource limit |
30 /// is reached and is reset every time an resource is released or a new | 30 /// is reached and is reset every time an resource is released or a new |
31 /// resource is requested. If it fires, that indicates that the caller became | 31 /// resource is requested. If it fires, that indicates that the caller became |
32 /// deadlocked, likely due to files waiting for additional files to be read | 32 /// deadlocked, likely due to files waiting for additional files to be read |
33 /// before they could be closed. | 33 /// before they could be closed. |
34 Timer _timer; | 34 Timer _timer; |
35 | 35 |
36 /// The amount of time to wait before timing out the pending resources. | 36 /// The amount of time to wait before timing out the pending resources. |
37 Duration _timeout; | 37 Duration _timeout; |
38 | 38 |
39 /// Creates a new pool with the given limit on how many resources may be | 39 /// Creates a new pool with the given limit on how many resources may be |
40 /// checked out at once. | 40 /// allocated at once. |
41 /// | 41 /// |
42 /// If [timeout] is passed, then if that much time passes without any activity | 42 /// If [timeout] is passed, then if that much time passes without any activity |
43 /// all pending [checkOut] futures will throw an exception. This is indented | 43 /// all pending [acquire] futures will throw an exception. This is indented |
44 /// to avoid deadlocks. | 44 /// to avoid deadlocks. |
45 Pool(this._maxCheckedOutResources, {Duration timeout}) | 45 Pool(this._maxAllocatedResources, {Duration timeout}) |
46 : _timeout = timeout; | 46 : _timeout = timeout; |
47 | 47 |
48 /// Check out a [PoolResource]. | 48 /// Acquire a [PoolResource]. |
49 /// | 49 /// |
50 /// If the maximum number of resources is already checked out, this will delay | 50 /// If the maximum number of resources is already allocated, this will delay |
51 /// until one of them is released. | 51 /// until one of them is released. |
52 Future<PoolResource> checkOut() { | 52 Future<PoolResource> acquire() { |
53 if (_checkedOutResources < _maxCheckedOutResources) { | 53 if (_allocatedResources < _maxAllocatedResources) { |
54 _checkedOutResources++; | 54 _allocatedResources++; |
55 return new Future.value(new PoolResource._(this)); | 55 return new Future.value(new PoolResource._(this)); |
56 } else { | 56 } else { |
57 var completer = new Completer<PoolResource>(); | 57 var completer = new Completer<PoolResource>(); |
58 _pendingResources.add(completer); | 58 _pendingResources.add(completer); |
59 _heartbeat(); | 59 _resetTimer(); |
60 return completer.future; | 60 return completer.future; |
61 } | 61 } |
62 } | 62 } |
63 | 63 |
64 /// Checks out a resource for the duration of [callback], which may return a | 64 /// Acquires a resource for the duration of [callback], which may return a |
65 /// Future. | 65 /// Future. |
66 /// | 66 /// |
67 /// The return value of [callback] is piped to the returned Future. | 67 /// The return value of [callback] is piped to the returned Future. |
68 Future withResource(callback()) { | 68 Future withResource(callback()) { |
69 return checkOut().then((resource) => | 69 return acquire().then((resource) => |
70 new Future.sync(callback).whenComplete(resource.release)); | 70 new Future.sync(callback).whenComplete(resource.release)); |
71 } | 71 } |
72 | 72 |
73 /// If there are any pending checkouts, this will fire the oldest one. | 73 /// If there are any pending acquires, this will fire the oldest one. |
74 void _onResourceReleased() { | 74 void _onResourceReleased() { |
75 if (_pendingResources.isEmpty) { | 75 if (_pendingResources.isEmpty) { |
76 _checkedOutResources--; | 76 _allocatedResources--; |
77 if (_timer != null) { | 77 if (_timer != null) { |
78 _timer.cancel(); | 78 _timer.cancel(); |
79 _timer = null; | 79 _timer = null; |
80 } | 80 } |
81 return; | 81 return; |
82 } | 82 } |
83 | 83 |
84 _heartbeat(); | 84 _resetTimer(); |
85 var pending = _pendingResources.removeFirst(); | 85 var pending = _pendingResources.removeFirst(); |
86 pending.complete(new PoolResource._(this)); | 86 pending.complete(new PoolResource._(this)); |
87 } | 87 } |
88 | 88 |
89 /// Indicates that some external action has occurred and the timer should be | 89 /// A resource has been checked out or released. |
Bob Nystrom
2013/10/29 17:25:29
"checked out" -> "allocated".
nweiz
2013/10/29 19:27:26
Done.
| |
90 /// restarted. | 90 void _resetTimer() { |
91 void _heartbeat() { | |
92 if (_timer != null) _timer.cancel(); | 91 if (_timer != null) _timer.cancel(); |
93 if (_timeout == null) { | 92 if (_timeout == null) { |
94 _timer = null; | 93 _timer = null; |
95 } else { | 94 } else { |
96 _timer = new Timer(_timeout, _onTimeout); | 95 _timer = new Timer(_timeout, _onTimeout); |
97 } | 96 } |
98 } | 97 } |
99 | 98 |
100 /// Handles [_timer] timing out by causing all pending resource completers to | 99 /// Handles [_timer] timing out by causing all pending resource completers to |
101 /// emit exceptions. | 100 /// emit exceptions. |
102 void _onTimeout() { | 101 void _onTimeout() { |
103 for (var completer in _pendingResources) { | 102 for (var completer in _pendingResources) { |
104 completer.completeException("Pool deadlock: all resources have been " | 103 completer.completeException("Pool deadlock: all resources have been " |
105 "checked out for too long.", new Trace.current().vmTrace); | 104 "allocated for too long.", new Trace.current().vmTrace); |
106 } | 105 } |
107 _pendingResources.clear(); | 106 _pendingResources.clear(); |
108 _timer = null; | 107 _timer = null; |
109 } | 108 } |
110 } | 109 } |
111 | 110 |
112 /// A member of a [Pool]. | 111 /// A member of a [Pool]. |
113 /// | 112 /// |
114 /// A [PoolResource] is a token that indicates that a resource is allocated. | 113 /// A [PoolResource] is a token that indicates that a resource is allocated. |
115 /// When the associated resource is released, the user should call [release]. | 114 /// When the associated resource is released, the user should call [release]. |
116 class PoolResource { | 115 class PoolResource { |
117 final Pool _pool; | 116 final Pool _pool; |
118 | 117 |
119 /// Whether [this] has been released yet. | 118 /// Whether [this] has been released yet. |
120 bool _released = false; | 119 bool _released = false; |
121 | 120 |
122 PoolResource._(this._pool); | 121 PoolResource._(this._pool); |
123 | 122 |
124 /// Tells the parent [Pool] that the resource associated with this resource is | 123 /// Tells the parent [Pool] that the resource associated with this resource is |
125 /// no longer allocated, and that a new [PoolResource] may be checked out. | 124 /// no longer allocated, and that a new [PoolResource] may be acquired. |
Bob Nystrom
2013/10/29 17:25:29
"acquired" -> "allocated".
nweiz
2013/10/29 19:27:26
Done.
| |
126 void release() { | 125 void release() { |
127 if (_released) { | 126 if (_released) { |
128 throw new StateError("A PoolResource may only be released once."); | 127 throw new StateError("A PoolResource may only be released once."); |
129 } | 128 } |
130 _released = true; | 129 _released = true; |
131 _pool._onResourceReleased(); | 130 _pool._onResourceReleased(); |
132 } | 131 } |
133 } | 132 } |
OLD | NEW |