OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library pool; | 5 library pool; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
| 10 import 'package:async/async.dart'; |
10 import 'package:stack_trace/stack_trace.dart'; | 11 import 'package:stack_trace/stack_trace.dart'; |
11 | 12 |
12 /// Manages an abstract pool of resources with a limit on how many may be in use | 13 /// Manages an abstract pool of resources with a limit on how many may be in use |
13 /// at once. | 14 /// at once. |
14 /// | 15 /// |
15 /// When a resource is needed, the user should call [request]. When the returned | 16 /// When a resource is needed, the user should call [request]. When the returned |
16 /// future completes with a [PoolResource], the resource may be allocated. Once | 17 /// future completes with a [PoolResource], the resource may be allocated. Once |
17 /// the resource has been released, the user should call [PoolResource.release]. | 18 /// the resource has been released, the user should call [PoolResource.release]. |
18 /// The pool will ensure that only a certain number of [PoolResource]s may be | 19 /// The pool will ensure that only a certain number of [PoolResource]s may be |
19 /// allocated at once. | 20 /// allocated at once. |
(...skipping 28 matching lines...) Expand all Loading... |
48 /// If [_timeout] isn't null, this timer is set as soon as the resource limit | 49 /// If [_timeout] isn't null, this timer is set as soon as the resource limit |
49 /// is reached and is reset every time an resource is released or a new | 50 /// is reached and is reset every time an resource is released or a new |
50 /// resource is requested. If it fires, that indicates that the caller became | 51 /// resource is requested. If it fires, that indicates that the caller became |
51 /// deadlocked, likely due to files waiting for additional files to be read | 52 /// deadlocked, likely due to files waiting for additional files to be read |
52 /// before they could be closed. | 53 /// before they could be closed. |
53 Timer _timer; | 54 Timer _timer; |
54 | 55 |
55 /// The amount of time to wait before timing out the pending resources. | 56 /// The amount of time to wait before timing out the pending resources. |
56 final Duration _timeout; | 57 final Duration _timeout; |
57 | 58 |
| 59 /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources |
| 60 /// that have been marked releasable. |
| 61 /// |
| 62 /// This is `null` until [close] is called. |
| 63 FutureGroup _closeGroup; |
| 64 |
| 65 /// Whether [close] has been called. |
| 66 bool get isClosed => _closeGroup != null; |
| 67 |
58 /// Creates a new pool with the given limit on how many resources may be | 68 /// Creates a new pool with the given limit on how many resources may be |
59 /// allocated at once. | 69 /// allocated at once. |
60 /// | 70 /// |
61 /// If [timeout] is passed, then if that much time passes without any activity | 71 /// If [timeout] is passed, then if that much time passes without any activity |
62 /// all pending [request] futures will throw a [TimeoutException]. This is | 72 /// all pending [request] futures will throw a [TimeoutException]. This is |
63 /// intended to avoid deadlocks. | 73 /// intended to avoid deadlocks. |
64 Pool(this._maxAllocatedResources, {Duration timeout}) | 74 Pool(this._maxAllocatedResources, {Duration timeout}) |
65 : _timeout = timeout; | 75 : _timeout = timeout; |
66 | 76 |
67 /// Request a [PoolResource]. | 77 /// Request a [PoolResource]. |
68 /// | 78 /// |
69 /// If the maximum number of resources is already allocated, this will delay | 79 /// If the maximum number of resources is already allocated, this will delay |
70 /// until one of them is released. | 80 /// until one of them is released. |
71 Future<PoolResource> request() { | 81 Future<PoolResource> request() { |
| 82 if (isClosed) { |
| 83 throw new StateError("request() may not be called on a closed Pool."); |
| 84 } |
| 85 |
72 if (_allocatedResources < _maxAllocatedResources) { | 86 if (_allocatedResources < _maxAllocatedResources) { |
73 _allocatedResources++; | 87 _allocatedResources++; |
74 return new Future.value(new PoolResource._(this)); | 88 return new Future.value(new PoolResource._(this)); |
75 } else if (_onReleaseCallbacks.isNotEmpty) { | 89 } else if (_onReleaseCallbacks.isNotEmpty) { |
76 return _runOnRelease(_onReleaseCallbacks.removeFirst()); | 90 return _runOnRelease(_onReleaseCallbacks.removeFirst()); |
77 } else { | 91 } else { |
78 var completer = new Completer<PoolResource>(); | 92 var completer = new Completer<PoolResource>(); |
79 _requestedResources.add(completer); | 93 _requestedResources.add(completer); |
80 _resetTimer(); | 94 _resetTimer(); |
81 return completer.future; | 95 return completer.future; |
82 } | 96 } |
83 } | 97 } |
84 | 98 |
85 /// Requests a resource for the duration of [callback], which may return a | 99 /// Requests a resource for the duration of [callback], which may return a |
86 /// Future. | 100 /// Future. |
87 /// | 101 /// |
88 /// The return value of [callback] is piped to the returned Future. | 102 /// The return value of [callback] is piped to the returned Future. |
89 Future withResource(callback()) { | 103 Future withResource(callback()) { |
90 return request().then((resource) => | 104 if (isClosed) { |
91 Chain.track(new Future.sync(callback)).whenComplete(resource.release)); | 105 throw new StateError( |
| 106 "withResource() may not be called on a closed Pool."); |
| 107 } |
| 108 |
| 109 // TODO(nweiz): Use async/await when sdk#23497 is fixed. |
| 110 return request().then((resource) { |
| 111 return new Future.sync(callback).whenComplete(resource.release); |
| 112 }); |
| 113 } |
| 114 |
| 115 /// Closes the pool so that no more resources are requested. |
| 116 /// |
| 117 /// Existing resource requests remain unchanged. |
| 118 /// |
| 119 /// Any resources that are marked as releasable using |
| 120 /// [PoolResource.allowRelease] are released immediately. Once all resources |
| 121 /// have been released and any `onRelease` callbacks have completed, the |
| 122 /// returned future completes successfully. If any `onRelease` callback throws |
| 123 /// an error, the returned future completes with that error. |
| 124 /// |
| 125 /// This may be called more than once; it returns the same [Future] each time. |
| 126 Future close() { |
| 127 if (_closeGroup != null) return _closeGroup.future; |
| 128 |
| 129 _resetTimer(); |
| 130 |
| 131 _closeGroup = new FutureGroup(); |
| 132 for (var callback in _onReleaseCallbacks) { |
| 133 _closeGroup.add(new Future.sync(callback)); |
| 134 } |
| 135 |
| 136 _allocatedResources -= _onReleaseCallbacks.length; |
| 137 _onReleaseCallbacks.clear(); |
| 138 |
| 139 if (_allocatedResources == 0) _closeGroup.close(); |
| 140 return _closeGroup.future; |
92 } | 141 } |
93 | 142 |
94 /// If there are any pending requests, this will fire the oldest one. | 143 /// If there are any pending requests, this will fire the oldest one. |
95 void _onResourceReleased() { | 144 void _onResourceReleased() { |
96 _resetTimer(); | 145 _resetTimer(); |
97 | 146 |
98 if (_requestedResources.isEmpty) { | 147 if (_requestedResources.isNotEmpty) { |
| 148 var pending = _requestedResources.removeFirst(); |
| 149 pending.complete(new PoolResource._(this)); |
| 150 } else { |
99 _allocatedResources--; | 151 _allocatedResources--; |
100 return; | 152 if (isClosed && _allocatedResources == 0) _closeGroup.close(); |
101 } | 153 } |
102 | |
103 var pending = _requestedResources.removeFirst(); | |
104 pending.complete(new PoolResource._(this)); | |
105 } | 154 } |
106 | 155 |
107 /// If there are any pending requests, this will fire the oldest one after | 156 /// If there are any pending requests, this will fire the oldest one after |
108 /// running [onRelease]. | 157 /// running [onRelease]. |
109 void _onResourceReleaseAllowed(onRelease()) { | 158 void _onResourceReleaseAllowed(onRelease()) { |
110 _resetTimer(); | 159 _resetTimer(); |
111 | 160 |
112 if (_requestedResources.isEmpty) { | 161 if (_requestedResources.isNotEmpty) { |
| 162 var pending = _requestedResources.removeFirst(); |
| 163 pending.complete(_runOnRelease(onRelease)); |
| 164 } else if (isClosed) { |
| 165 _closeGroup.add(new Future.sync(onRelease)); |
| 166 _allocatedResources--; |
| 167 if (_allocatedResources == 0) _closeGroup.close(); |
| 168 } else { |
113 _onReleaseCallbacks.add( | 169 _onReleaseCallbacks.add( |
114 Zone.current.bindCallback(onRelease, runGuarded: false)); | 170 Zone.current.bindCallback(onRelease, runGuarded: false)); |
115 return; | |
116 } | 171 } |
117 | |
118 var pending = _requestedResources.removeFirst(); | |
119 pending.complete(_runOnRelease(onRelease)); | |
120 } | 172 } |
121 | 173 |
122 /// Runs [onRelease] and returns a Future that completes to a resource once an | 174 /// Runs [onRelease] and returns a Future that completes to a resource once an |
123 /// [onRelease] callback completes. | 175 /// [onRelease] callback completes. |
124 /// | 176 /// |
125 /// Futures returned by [_runOnRelease] always complete in the order they were | 177 /// Futures returned by [_runOnRelease] always complete in the order they were |
126 /// created, even if earlier [onRelease] callbacks take longer to run. | 178 /// created, even if earlier [onRelease] callbacks take longer to run. |
127 Future<PoolResource> _runOnRelease(onRelease()) { | 179 Future<PoolResource> _runOnRelease(onRelease()) { |
128 new Future.sync(onRelease).then((value) { | 180 new Future.sync(onRelease).then((value) { |
129 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); | 181 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
196 /// produce additional information later on. For example, an isolate's task | 248 /// produce additional information later on. For example, an isolate's task |
197 /// may be complete, but it could still emit asynchronous errors. | 249 /// may be complete, but it could still emit asynchronous errors. |
198 void allowRelease(onRelease()) { | 250 void allowRelease(onRelease()) { |
199 if (_released) { | 251 if (_released) { |
200 throw new StateError("A PoolResource may only be released once."); | 252 throw new StateError("A PoolResource may only be released once."); |
201 } | 253 } |
202 _released = true; | 254 _released = true; |
203 _pool._onResourceReleaseAllowed(onRelease); | 255 _pool._onResourceReleaseAllowed(onRelease); |
204 } | 256 } |
205 } | 257 } |
206 | |
OLD | NEW |