| OLD | NEW |
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library pool; | |
| 6 | |
| 7 import 'dart:async'; | 5 import 'dart:async'; |
| 8 import 'dart:collection'; | 6 import 'dart:collection'; |
| 9 | 7 |
| 10 import 'package:async/async.dart'; | 8 import 'package:async/async.dart'; |
| 11 import 'package:stack_trace/stack_trace.dart'; | 9 import 'package:stack_trace/stack_trace.dart'; |
| 12 | 10 |
| 13 /// Manages an abstract pool of resources with a limit on how many may be in use | 11 /// Manages an abstract pool of resources with a limit on how many may be in use |
| 14 /// at once. | 12 /// at once. |
| 15 /// | 13 /// |
| 16 /// When a resource is needed, the user should call [request]. When the returned | 14 /// When a resource is needed, the user should call [request]. When the returned |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 58 /// The amount of time to wait before timing out the pending resources. | 56 /// The amount of time to wait before timing out the pending resources. |
| 59 final Duration _timeout; | 57 final Duration _timeout; |
| 60 | 58 |
| 61 /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources | 59 /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources |
| 62 /// that have been marked releasable. | 60 /// that have been marked releasable. |
| 63 /// | 61 /// |
| 64 /// This is `null` until [close] is called. | 62 /// This is `null` until [close] is called. |
| 65 FutureGroup _closeGroup; | 63 FutureGroup _closeGroup; |
| 66 | 64 |
| 67 /// Whether [close] has been called. | 65 /// Whether [close] has been called. |
| 68 bool get isClosed => _closeGroup != null; | 66 bool get isClosed => _closeMemo.hasRun; |
| 67 |
| 68 /// A future that completes once the pool is closed and all its outstanding |
| 69 /// resources have been released. |
| 70 /// |
| 71 /// If any [PoolResource.allowRelease] callback throws an exception after the |
| 72 /// pool is closed, this completes with that exception. |
| 73 Future get done => _closeMemo.future; |
| 69 | 74 |
| 70 /// Creates a new pool with the given limit on how many resources may be | 75 /// Creates a new pool with the given limit on how many resources may be |
| 71 /// allocated at once. | 76 /// allocated at once. |
| 72 /// | 77 /// |
| 73 /// If [timeout] is passed, then if that much time passes without any activity | 78 /// If [timeout] is passed, then if that much time passes without any activity |
| 74 /// all pending [request] futures will throw a [TimeoutException]. This is | 79 /// all pending [request] futures will throw a [TimeoutException]. This is |
| 75 /// intended to avoid deadlocks. | 80 /// intended to avoid deadlocks. |
| 76 Pool(this._maxAllocatedResources, {Duration timeout}) | 81 Pool(this._maxAllocatedResources, {Duration timeout}) |
| 77 : _timeout = timeout { | 82 : _timeout = timeout { |
| 78 if (timeout != null) { | 83 if (timeout != null) { |
| (...skipping 22 matching lines...) Expand all Loading... |
| 101 _requestedResources.add(completer); | 106 _requestedResources.add(completer); |
| 102 _resetTimer(); | 107 _resetTimer(); |
| 103 return completer.future; | 108 return completer.future; |
| 104 } | 109 } |
| 105 } | 110 } |
| 106 | 111 |
| 107 /// Requests a resource for the duration of [callback], which may return a | 112 /// Requests a resource for the duration of [callback], which may return a |
| 108 /// Future. | 113 /// Future. |
| 109 /// | 114 /// |
| 110 /// The return value of [callback] is piped to the returned Future. | 115 /// The return value of [callback] is piped to the returned Future. |
| 111 Future withResource(callback()) { | 116 Future<T> withResource<T>(FutureOr<T> callback()) { |
| 112 if (isClosed) { | 117 if (isClosed) { |
| 113 throw new StateError( | 118 throw new StateError( |
| 114 "withResource() may not be called on a closed Pool."); | 119 "withResource() may not be called on a closed Pool."); |
| 115 } | 120 } |
| 116 | 121 |
| 117 // TODO(nweiz): Use async/await when sdk#23497 is fixed. | 122 // We can't use async/await here because we need to start the request |
| 118 return request().then((resource) { | 123 // synchronously in case the pool is closed immediately afterwards. Async |
| 119 return new Future.sync(callback).whenComplete(resource.release); | 124 // functions have an asynchronous gap between calling and running the body, |
| 125 // and [close] could be called during that gap. See #3. |
| 126 return request().then<Future<T>>((resource) { |
| 127 return new Future<T>.sync(callback).whenComplete(resource.release); |
| 120 }); | 128 }); |
| 121 } | 129 } |
| 122 | 130 |
| 123 /// Closes the pool so that no more resources are requested. | 131 /// Closes the pool so that no more resources are requested. |
| 124 /// | 132 /// |
| 125 /// Existing resource requests remain unchanged. | 133 /// Existing resource requests remain unchanged. |
| 126 /// | 134 /// |
| 127 /// Any resources that are marked as releasable using | 135 /// Any resources that are marked as releasable using |
| 128 /// [PoolResource.allowRelease] are released immediately. Once all resources | 136 /// [PoolResource.allowRelease] are released immediately. Once all resources |
| 129 /// have been released and any `onRelease` callbacks have completed, the | 137 /// have been released and any `onRelease` callbacks have completed, the |
| 130 /// returned future completes successfully. If any `onRelease` callback throws | 138 /// returned future completes successfully. If any `onRelease` callback throws |
| 131 /// an error, the returned future completes with that error. | 139 /// an error, the returned future completes with that error. |
| 132 /// | 140 /// |
| 133 /// This may be called more than once; it returns the same [Future] each time. | 141 /// This may be called more than once; it returns the same [Future] each time. |
| 134 Future close() { | 142 Future close() => _closeMemo.runOnce(() { |
| 135 if (_closeGroup != null) return _closeGroup.future; | 143 if (_closeGroup != null) return _closeGroup.future; |
| 136 | 144 |
| 137 _resetTimer(); | 145 _resetTimer(); |
| 138 | 146 |
| 139 _closeGroup = new FutureGroup(); | 147 _closeGroup = new FutureGroup(); |
| 140 for (var callback in _onReleaseCallbacks) { | 148 for (var callback in _onReleaseCallbacks) { |
| 141 _closeGroup.add(new Future.sync(callback)); | 149 _closeGroup.add(new Future.sync(callback)); |
| 142 } | 150 } |
| 143 | 151 |
| 144 _allocatedResources -= _onReleaseCallbacks.length; | 152 _allocatedResources -= _onReleaseCallbacks.length; |
| 145 _onReleaseCallbacks.clear(); | 153 _onReleaseCallbacks.clear(); |
| 146 | 154 |
| 147 if (_allocatedResources == 0) _closeGroup.close(); | 155 if (_allocatedResources == 0) _closeGroup.close(); |
| 148 return _closeGroup.future; | 156 return _closeGroup.future; |
| 149 } | 157 }); |
| 158 final _closeMemo = new AsyncMemoizer(); |
| 150 | 159 |
| 151 /// If there are any pending requests, this will fire the oldest one. | 160 /// If there are any pending requests, this will fire the oldest one. |
| 152 void _onResourceReleased() { | 161 void _onResourceReleased() { |
| 153 _resetTimer(); | 162 _resetTimer(); |
| 154 | 163 |
| 155 if (_requestedResources.isNotEmpty) { | 164 if (_requestedResources.isNotEmpty) { |
| 156 var pending = _requestedResources.removeFirst(); | 165 var pending = _requestedResources.removeFirst(); |
| 157 pending.complete(new PoolResource._(this)); | 166 pending.complete(new PoolResource._(this)); |
| 158 } else { | 167 } else { |
| 159 _allocatedResources--; | 168 _allocatedResources--; |
| (...skipping 24 matching lines...) Expand all Loading... |
| 184 /// | 193 /// |
| 185 /// Futures returned by [_runOnRelease] always complete in the order they were | 194 /// Futures returned by [_runOnRelease] always complete in the order they were |
| 186 /// created, even if earlier [onRelease] callbacks take longer to run. | 195 /// created, even if earlier [onRelease] callbacks take longer to run. |
| 187 Future<PoolResource> _runOnRelease(onRelease()) { | 196 Future<PoolResource> _runOnRelease(onRelease()) { |
| 188 new Future.sync(onRelease).then((value) { | 197 new Future.sync(onRelease).then((value) { |
| 189 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); | 198 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); |
| 190 }).catchError((error, stackTrace) { | 199 }).catchError((error, stackTrace) { |
| 191 _onReleaseCompleters.removeFirst().completeError(error, stackTrace); | 200 _onReleaseCompleters.removeFirst().completeError(error, stackTrace); |
| 192 }); | 201 }); |
| 193 | 202 |
| 194 var completer = new Completer.sync(); | 203 var completer = new Completer<PoolResource>.sync(); |
| 195 _onReleaseCompleters.add(completer); | 204 _onReleaseCompleters.add(completer); |
| 196 return completer.future; | 205 return completer.future; |
| 197 } | 206 } |
| 198 | 207 |
| 199 /// A resource has been requested, allocated, or released. | 208 /// A resource has been requested, allocated, or released. |
| 200 void _resetTimer() { | 209 void _resetTimer() { |
| 201 if (_timer == null) return; | 210 if (_timer == null) return; |
| 202 | 211 |
| 203 if (_requestedResources.isEmpty) { | 212 if (_requestedResources.isEmpty) { |
| 204 _timer.cancel(); | 213 _timer.cancel(); |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 257 /// produce additional information later on. For example, an isolate's task | 266 /// produce additional information later on. For example, an isolate's task |
| 258 /// may be complete, but it could still emit asynchronous errors. | 267 /// may be complete, but it could still emit asynchronous errors. |
| 259 void allowRelease(onRelease()) { | 268 void allowRelease(onRelease()) { |
| 260 if (_released) { | 269 if (_released) { |
| 261 throw new StateError("A PoolResource may only be released once."); | 270 throw new StateError("A PoolResource may only be released once."); |
| 262 } | 271 } |
| 263 _released = true; | 272 _released = true; |
| 264 _pool._onResourceReleaseAllowed(onRelease); | 273 _pool._onResourceReleaseAllowed(onRelease); |
| 265 } | 274 } |
| 266 } | 275 } |
| OLD | NEW |