OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library pool; | |
6 | |
7 import 'dart:async'; | 5 import 'dart:async'; |
8 import 'dart:collection'; | 6 import 'dart:collection'; |
9 | 7 |
10 import 'package:async/async.dart'; | 8 import 'package:async/async.dart'; |
11 import 'package:stack_trace/stack_trace.dart'; | 9 import 'package:stack_trace/stack_trace.dart'; |
12 | 10 |
13 /// Manages an abstract pool of resources with a limit on how many may be in use | 11 /// Manages an abstract pool of resources with a limit on how many may be in use |
14 /// at once. | 12 /// at once. |
15 /// | 13 /// |
16 /// When a resource is needed, the user should call [request]. When the returned | 14 /// When a resource is needed, the user should call [request]. When the returned |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
58 /// The amount of time to wait before timing out the pending resources. | 56 /// The amount of time to wait before timing out the pending resources. |
59 final Duration _timeout; | 57 final Duration _timeout; |
60 | 58 |
61 /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources | 59 /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources |
62 /// that have been marked releasable. | 60 /// that have been marked releasable. |
63 /// | 61 /// |
64 /// This is `null` until [close] is called. | 62 /// This is `null` until [close] is called. |
65 FutureGroup _closeGroup; | 63 FutureGroup _closeGroup; |
66 | 64 |
67 /// Whether [close] has been called. | 65 /// Whether [close] has been called. |
68 bool get isClosed => _closeGroup != null; | 66 bool get isClosed => _closeMemo.hasRun; |
| 67 |
| 68 /// A future that completes once the pool is closed and all its outstanding |
| 69 /// resources have been released. |
| 70 /// |
| 71 /// If any [PoolResource.allowRelease] callback throws an exception after the |
| 72 /// pool is closed, this completes with that exception. |
| 73 Future get done => _closeMemo.future; |
69 | 74 |
70 /// Creates a new pool with the given limit on how many resources may be | 75 /// Creates a new pool with the given limit on how many resources may be |
71 /// allocated at once. | 76 /// allocated at once. |
72 /// | 77 /// |
73 /// If [timeout] is passed, then if that much time passes without any activity | 78 /// If [timeout] is passed, then if that much time passes without any activity |
74 /// all pending [request] futures will throw a [TimeoutException]. This is | 79 /// all pending [request] futures will throw a [TimeoutException]. This is |
75 /// intended to avoid deadlocks. | 80 /// intended to avoid deadlocks. |
76 Pool(this._maxAllocatedResources, {Duration timeout}) | 81 Pool(this._maxAllocatedResources, {Duration timeout}) |
77 : _timeout = timeout { | 82 : _timeout = timeout { |
78 if (timeout != null) { | 83 if (timeout != null) { |
(...skipping 22 matching lines...) Expand all Loading... |
101 _requestedResources.add(completer); | 106 _requestedResources.add(completer); |
102 _resetTimer(); | 107 _resetTimer(); |
103 return completer.future; | 108 return completer.future; |
104 } | 109 } |
105 } | 110 } |
106 | 111 |
107 /// Requests a resource for the duration of [callback], which may return a | 112 /// Requests a resource for the duration of [callback], which may return a |
108 /// Future. | 113 /// Future. |
109 /// | 114 /// |
110 /// The return value of [callback] is piped to the returned Future. | 115 /// The return value of [callback] is piped to the returned Future. |
111 Future withResource(callback()) { | 116 Future<T> withResource<T>(FutureOr<T> callback()) { |
112 if (isClosed) { | 117 if (isClosed) { |
113 throw new StateError( | 118 throw new StateError( |
114 "withResource() may not be called on a closed Pool."); | 119 "withResource() may not be called on a closed Pool."); |
115 } | 120 } |
116 | 121 |
117 // TODO(nweiz): Use async/await when sdk#23497 is fixed. | 122 // We can't use async/await here because we need to start the request |
118 return request().then((resource) { | 123 // synchronously in case the pool is closed immediately afterwards. Async |
119 return new Future.sync(callback).whenComplete(resource.release); | 124 // functions have an asynchronous gap between calling and running the body, |
| 125 // and [close] could be called during that gap. See #3. |
| 126 return request().then<Future<T>>((resource) { |
| 127 return new Future<T>.sync(callback).whenComplete(resource.release); |
120 }); | 128 }); |
121 } | 129 } |
122 | 130 |
123 /// Closes the pool so that no more resources are requested. | 131 /// Closes the pool so that no more resources are requested. |
124 /// | 132 /// |
125 /// Existing resource requests remain unchanged. | 133 /// Existing resource requests remain unchanged. |
126 /// | 134 /// |
127 /// Any resources that are marked as releasable using | 135 /// Any resources that are marked as releasable using |
128 /// [PoolResource.allowRelease] are released immediately. Once all resources | 136 /// [PoolResource.allowRelease] are released immediately. Once all resources |
129 /// have been released and any `onRelease` callbacks have completed, the | 137 /// have been released and any `onRelease` callbacks have completed, the |
130 /// returned future completes successfully. If any `onRelease` callback throws | 138 /// returned future completes successfully. If any `onRelease` callback throws |
131 /// an error, the returned future completes with that error. | 139 /// an error, the returned future completes with that error. |
132 /// | 140 /// |
133 /// This may be called more than once; it returns the same [Future] each time. | 141 /// This may be called more than once; it returns the same [Future] each time. |
134 Future close() { | 142 Future close() => _closeMemo.runOnce(() { |
135 if (_closeGroup != null) return _closeGroup.future; | 143 if (_closeGroup != null) return _closeGroup.future; |
136 | 144 |
137 _resetTimer(); | 145 _resetTimer(); |
138 | 146 |
139 _closeGroup = new FutureGroup(); | 147 _closeGroup = new FutureGroup(); |
140 for (var callback in _onReleaseCallbacks) { | 148 for (var callback in _onReleaseCallbacks) { |
141 _closeGroup.add(new Future.sync(callback)); | 149 _closeGroup.add(new Future.sync(callback)); |
142 } | 150 } |
143 | 151 |
144 _allocatedResources -= _onReleaseCallbacks.length; | 152 _allocatedResources -= _onReleaseCallbacks.length; |
145 _onReleaseCallbacks.clear(); | 153 _onReleaseCallbacks.clear(); |
146 | 154 |
147 if (_allocatedResources == 0) _closeGroup.close(); | 155 if (_allocatedResources == 0) _closeGroup.close(); |
148 return _closeGroup.future; | 156 return _closeGroup.future; |
149 } | 157 }); |
| 158 final _closeMemo = new AsyncMemoizer(); |
150 | 159 |
151 /// If there are any pending requests, this will fire the oldest one. | 160 /// If there are any pending requests, this will fire the oldest one. |
152 void _onResourceReleased() { | 161 void _onResourceReleased() { |
153 _resetTimer(); | 162 _resetTimer(); |
154 | 163 |
155 if (_requestedResources.isNotEmpty) { | 164 if (_requestedResources.isNotEmpty) { |
156 var pending = _requestedResources.removeFirst(); | 165 var pending = _requestedResources.removeFirst(); |
157 pending.complete(new PoolResource._(this)); | 166 pending.complete(new PoolResource._(this)); |
158 } else { | 167 } else { |
159 _allocatedResources--; | 168 _allocatedResources--; |
(...skipping 24 matching lines...) Expand all Loading... |
184 /// | 193 /// |
185 /// Futures returned by [_runOnRelease] always complete in the order they were | 194 /// Futures returned by [_runOnRelease] always complete in the order they were |
186 /// created, even if earlier [onRelease] callbacks take longer to run. | 195 /// created, even if earlier [onRelease] callbacks take longer to run. |
187 Future<PoolResource> _runOnRelease(onRelease()) { | 196 Future<PoolResource> _runOnRelease(onRelease()) { |
188 new Future.sync(onRelease).then((value) { | 197 new Future.sync(onRelease).then((value) { |
189 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); | 198 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); |
190 }).catchError((error, stackTrace) { | 199 }).catchError((error, stackTrace) { |
191 _onReleaseCompleters.removeFirst().completeError(error, stackTrace); | 200 _onReleaseCompleters.removeFirst().completeError(error, stackTrace); |
192 }); | 201 }); |
193 | 202 |
194 var completer = new Completer.sync(); | 203 var completer = new Completer<PoolResource>.sync(); |
195 _onReleaseCompleters.add(completer); | 204 _onReleaseCompleters.add(completer); |
196 return completer.future; | 205 return completer.future; |
197 } | 206 } |
198 | 207 |
199 /// A resource has been requested, allocated, or released. | 208 /// A resource has been requested, allocated, or released. |
200 void _resetTimer() { | 209 void _resetTimer() { |
201 if (_timer == null) return; | 210 if (_timer == null) return; |
202 | 211 |
203 if (_requestedResources.isEmpty) { | 212 if (_requestedResources.isEmpty) { |
204 _timer.cancel(); | 213 _timer.cancel(); |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
257 /// produce additional information later on. For example, an isolate's task | 266 /// produce additional information later on. For example, an isolate's task |
258 /// may be complete, but it could still emit asynchronous errors. | 267 /// may be complete, but it could still emit asynchronous errors. |
259 void allowRelease(onRelease()) { | 268 void allowRelease(onRelease()) { |
260 if (_released) { | 269 if (_released) { |
261 throw new StateError("A PoolResource may only be released once."); | 270 throw new StateError("A PoolResource may only be released once."); |
262 } | 271 } |
263 _released = true; | 272 _released = true; |
264 _pool._onResourceReleaseAllowed(onRelease); | 273 _pool._onResourceReleaseAllowed(onRelease); |
265 } | 274 } |
266 } | 275 } |
OLD | NEW |