Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library pool; | 5 library pool; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | 8 import 'dart:collection'; |
| 9 | 9 |
| 10 import 'package:async/async.dart'; | |
| 10 import 'package:stack_trace/stack_trace.dart'; | 11 import 'package:stack_trace/stack_trace.dart'; |
| 11 | 12 |
| 12 /// Manages an abstract pool of resources with a limit on how many may be in use | 13 /// Manages an abstract pool of resources with a limit on how many may be in use |
| 13 /// at once. | 14 /// at once. |
| 14 /// | 15 /// |
| 15 /// When a resource is needed, the user should call [request]. When the returned | 16 /// When a resource is needed, the user should call [request]. When the returned |
| 16 /// future completes with a [PoolResource], the resource may be allocated. Once | 17 /// future completes with a [PoolResource], the resource may be allocated. Once |
| 17 /// the resource has been released, the user should call [PoolResource.release]. | 18 /// the resource has been released, the user should call [PoolResource.release]. |
| 18 /// The pool will ensure that only a certain number of [PoolResource]s may be | 19 /// The pool will ensure that only a certain number of [PoolResource]s may be |
| 19 /// allocated at once. | 20 /// allocated at once. |
| (...skipping 28 matching lines...) Expand all Loading... | |
| 48 /// If [_timeout] isn't null, this timer is set as soon as the resource limit | 49 /// If [_timeout] isn't null, this timer is set as soon as the resource limit |
| 49 /// is reached and is reset every time an resource is released or a new | 50 /// is reached and is reset every time an resource is released or a new |
| 50 /// resource is requested. If it fires, that indicates that the caller became | 51 /// resource is requested. If it fires, that indicates that the caller became |
| 51 /// deadlocked, likely due to files waiting for additional files to be read | 52 /// deadlocked, likely due to files waiting for additional files to be read |
| 52 /// before they could be closed. | 53 /// before they could be closed. |
| 53 Timer _timer; | 54 Timer _timer; |
| 54 | 55 |
| 55 /// The amount of time to wait before timing out the pending resources. | 56 /// The amount of time to wait before timing out the pending resources. |
| 56 final Duration _timeout; | 57 final Duration _timeout; |
| 57 | 58 |
| 59 /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources | |
| 60 /// that have been marked releasable. | |
| 61 /// | |
| 62 /// This is `null` until [close] is called. | |
| 63 FutureGroup _closeGroup; | |
| 64 | |
| 65 /// Whether [close] has been called. | |
| 66 bool get isClosed => _closeGroup != null; | |
| 67 | |
| 58 /// Creates a new pool with the given limit on how many resources may be | 68 /// Creates a new pool with the given limit on how many resources may be |
| 59 /// allocated at once. | 69 /// allocated at once. |
| 60 /// | 70 /// |
| 61 /// If [timeout] is passed, then if that much time passes without any activity | 71 /// If [timeout] is passed, then if that much time passes without any activity |
| 62 /// all pending [request] futures will throw a [TimeoutException]. This is | 72 /// all pending [request] futures will throw a [TimeoutException]. This is |
| 63 /// intended to avoid deadlocks. | 73 /// intended to avoid deadlocks. |
| 64 Pool(this._maxAllocatedResources, {Duration timeout}) | 74 Pool(this._maxAllocatedResources, {Duration timeout}) |
| 65 : _timeout = timeout; | 75 : _timeout = timeout; |
| 66 | 76 |
| 67 /// Request a [PoolResource]. | 77 /// Request a [PoolResource]. |
| 68 /// | 78 /// |
| 69 /// If the maximum number of resources is already allocated, this will delay | 79 /// If the maximum number of resources is already allocated, this will delay |
| 70 /// until one of them is released. | 80 /// until one of them is released. |
| 71 Future<PoolResource> request() { | 81 Future<PoolResource> request() { |
| 82 if (isClosed) { | |
| 83 throw new StateError("request() may not be called on a closed Pool."); | |
| 84 } | |
| 85 | |
| 72 if (_allocatedResources < _maxAllocatedResources) { | 86 if (_allocatedResources < _maxAllocatedResources) { |
| 73 _allocatedResources++; | 87 _allocatedResources++; |
| 74 return new Future.value(new PoolResource._(this)); | 88 return new Future.value(new PoolResource._(this)); |
| 75 } else if (_onReleaseCallbacks.isNotEmpty) { | 89 } else if (_onReleaseCallbacks.isNotEmpty) { |
| 76 return _runOnRelease(_onReleaseCallbacks.removeFirst()); | 90 return _runOnRelease(_onReleaseCallbacks.removeFirst()); |
| 77 } else { | 91 } else { |
| 78 var completer = new Completer<PoolResource>(); | 92 var completer = new Completer<PoolResource>(); |
| 79 _requestedResources.add(completer); | 93 _requestedResources.add(completer); |
| 80 _resetTimer(); | 94 _resetTimer(); |
| 81 return completer.future; | 95 return completer.future; |
| 82 } | 96 } |
| 83 } | 97 } |
| 84 | 98 |
| 85 /// Requests a resource for the duration of [callback], which may return a | 99 /// Requests a resource for the duration of [callback], which may return a |
| 86 /// Future. | 100 /// Future. |
| 87 /// | 101 /// |
| 88 /// The return value of [callback] is piped to the returned Future. | 102 /// The return value of [callback] is piped to the returned Future. |
| 89 Future withResource(callback()) { | 103 Future withResource(callback()) { |
| 90 return request().then((resource) => new Future.sync(callback).whenComplete(r esource.release)); | 104 if (isClosed) { |
| 105 throw new StateError( | |
| 106 "withResource() may not be called on a closed Pool."); | |
| 107 } | |
| 108 | |
| 109 return request().then((resource) { | |
|
Bob Nystrom
2015/10/09 00:25:25
Use async/await?
nweiz
2015/10/09 00:46:19
This is used in test, which means we have to work
| |
| 110 return new Future.sync(callback).whenComplete(resource.release); | |
| 111 }); | |
| 112 } | |
| 113 | |
| 114 /// Closes the pool so that no more resources are requested. | |
| 115 /// | |
| 116 /// Existing resource requests remain unchanged. | |
| 117 /// | |
| 118 /// Any resources that are marked as releasable using | |
| 119 /// [PoolResource.allowRelease] are released immediately. Once all resources | |
| 120 /// have been released and any `onRelease` callbacks have completed, the | |
| 121 /// returned future completes successfully. If any `onRelease` callback throws | |
| 122 /// an error, the returned future completes with that error. | |
| 123 Future close() { | |
| 124 if (_closeGroup != null) return _closeGroup.future; | |
|
Bob Nystrom
2015/10/09 00:25:25
Document that it can be called more than once.
nweiz
2015/10/09 00:46:19
Done.
| |
| 125 | |
| 126 _resetTimer(); | |
| 127 | |
| 128 _closeGroup = new FutureGroup(); | |
| 129 for (var callback in _onReleaseCallbacks) { | |
| 130 _closeGroup.add(new Future.sync(callback)); | |
| 131 } | |
| 132 | |
| 133 _allocatedResources -= _onReleaseCallbacks.length; | |
| 134 _onReleaseCallbacks.clear(); | |
| 135 | |
| 136 if (_allocatedResources == 0) _closeGroup.close(); | |
| 137 return _closeGroup.future; | |
| 91 } | 138 } |
| 92 | 139 |
| 93 /// If there are any pending requests, this will fire the oldest one. | 140 /// If there are any pending requests, this will fire the oldest one. |
| 94 void _onResourceReleased() { | 141 void _onResourceReleased() { |
| 95 _resetTimer(); | 142 _resetTimer(); |
| 96 | 143 |
| 97 if (_requestedResources.isEmpty) { | 144 if (_requestedResources.isNotEmpty) { |
| 145 var pending = _requestedResources.removeFirst(); | |
| 146 pending.complete(new PoolResource._(this)); | |
| 147 } else { | |
| 98 _allocatedResources--; | 148 _allocatedResources--; |
| 99 return; | 149 if (isClosed && _allocatedResources == 0) _closeGroup.close(); |
| 100 } | 150 } |
| 101 | |
| 102 var pending = _requestedResources.removeFirst(); | |
| 103 pending.complete(new PoolResource._(this)); | |
| 104 } | 151 } |
| 105 | 152 |
| 106 /// If there are any pending requests, this will fire the oldest one after | 153 /// If there are any pending requests, this will fire the oldest one after |
| 107 /// running [onRelease]. | 154 /// running [onRelease]. |
| 108 void _onResourceReleaseAllowed(onRelease()) { | 155 void _onResourceReleaseAllowed(onRelease()) { |
| 109 _resetTimer(); | 156 _resetTimer(); |
| 110 | 157 |
| 111 if (_requestedResources.isEmpty) { | 158 if (_requestedResources.isNotEmpty) { |
| 159 var pending = _requestedResources.removeFirst(); | |
| 160 pending.complete(_runOnRelease(onRelease)); | |
| 161 } else if (isClosed) { | |
| 162 _closeGroup.add(new Future.sync(onRelease)); | |
| 163 _allocatedResources--; | |
| 164 if (_allocatedResources == 0) _closeGroup.close(); | |
| 165 } else { | |
| 112 _onReleaseCallbacks.add( | 166 _onReleaseCallbacks.add( |
| 113 Zone.current.bindCallback(onRelease, runGuarded: false)); | 167 Zone.current.bindCallback(onRelease, runGuarded: false)); |
| 114 return; | |
| 115 } | 168 } |
| 116 | |
| 117 var pending = _requestedResources.removeFirst(); | |
| 118 pending.complete(_runOnRelease(onRelease)); | |
| 119 } | 169 } |
| 120 | 170 |
| 121 /// Runs [onRelease] and returns a Future that completes to a resource once an | 171 /// Runs [onRelease] and returns a Future that completes to a resource once an |
| 122 /// [onRelease] callback completes. | 172 /// [onRelease] callback completes. |
| 123 /// | 173 /// |
| 124 /// Futures returned by [_runOnRelease] always complete in the order they were | 174 /// Futures returned by [_runOnRelease] always complete in the order they were |
| 125 /// created, even if earlier [onRelease] callbacks take longer to run. | 175 /// created, even if earlier [onRelease] callbacks take longer to run. |
| 126 Future<PoolResource> _runOnRelease(onRelease()) { | 176 Future<PoolResource> _runOnRelease(onRelease()) { |
| 127 new Future.sync(onRelease).then((value) { | 177 new Future.sync(onRelease).then((value) { |
| 128 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); | 178 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 195 /// produce additional information later on. For example, an isolate's task | 245 /// produce additional information later on. For example, an isolate's task |
| 196 /// may be complete, but it could still emit asynchronous errors. | 246 /// may be complete, but it could still emit asynchronous errors. |
| 197 void allowRelease(onRelease()) { | 247 void allowRelease(onRelease()) { |
| 198 if (_released) { | 248 if (_released) { |
| 199 throw new StateError("A PoolResource may only be released once."); | 249 throw new StateError("A PoolResource may only be released once."); |
| 200 } | 250 } |
| 201 _released = true; | 251 _released = true; |
| 202 _pool._onResourceReleaseAllowed(onRelease); | 252 _pool._onResourceReleaseAllowed(onRelease); |
| 203 } | 253 } |
| 204 } | 254 } |
| 205 | |
| OLD | NEW |