| OLD | NEW | 
|---|
| 1 // Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file | 
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a | 
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. | 
| 4 | 4 | 
| 5 library pool; | 5 library pool; | 
| 6 | 6 | 
| 7 import 'dart:async'; | 7 import 'dart:async'; | 
| 8 import 'dart:collection'; | 8 import 'dart:collection'; | 
| 9 | 9 | 
|  | 10 import 'package:async/async.dart'; | 
| 10 import 'package:stack_trace/stack_trace.dart'; | 11 import 'package:stack_trace/stack_trace.dart'; | 
| 11 | 12 | 
| 12 /// Manages an abstract pool of resources with a limit on how many may be in use | 13 /// Manages an abstract pool of resources with a limit on how many may be in use | 
| 13 /// at once. | 14 /// at once. | 
| 14 /// | 15 /// | 
| 15 /// When a resource is needed, the user should call [request]. When the returned | 16 /// When a resource is needed, the user should call [request]. When the returned | 
| 16 /// future completes with a [PoolResource], the resource may be allocated. Once | 17 /// future completes with a [PoolResource], the resource may be allocated. Once | 
| 17 /// the resource has been released, the user should call [PoolResource.release]. | 18 /// the resource has been released, the user should call [PoolResource.release]. | 
| 18 /// The pool will ensure that only a certain number of [PoolResource]s may be | 19 /// The pool will ensure that only a certain number of [PoolResource]s may be | 
| 19 /// allocated at once. | 20 /// allocated at once. | 
| (...skipping 28 matching lines...) Expand all  Loading... | 
| 48   /// If [_timeout] isn't null, this timer is set as soon as the resource limit | 49   /// If [_timeout] isn't null, this timer is set as soon as the resource limit | 
| 49   /// is reached and is reset every time an resource is released or a new | 50   /// is reached and is reset every time an resource is released or a new | 
| 50   /// resource is requested. If it fires, that indicates that the caller became | 51   /// resource is requested. If it fires, that indicates that the caller became | 
| 51   /// deadlocked, likely due to files waiting for additional files to be read | 52   /// deadlocked, likely due to files waiting for additional files to be read | 
| 52   /// before they could be closed. | 53   /// before they could be closed. | 
| 53   Timer _timer; | 54   Timer _timer; | 
| 54 | 55 | 
| 55   /// The amount of time to wait before timing out the pending resources. | 56   /// The amount of time to wait before timing out the pending resources. | 
| 56   final Duration _timeout; | 57   final Duration _timeout; | 
| 57 | 58 | 
|  | 59   /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources | 
|  | 60   /// that have been marked releasable. | 
|  | 61   /// | 
|  | 62   /// This is `null` until [close] is called. | 
|  | 63   FutureGroup _closeGroup; | 
|  | 64 | 
|  | 65   /// Whether [close] has been called. | 
|  | 66   bool get isClosed => _closeGroup != null; | 
|  | 67 | 
| 58   /// Creates a new pool with the given limit on how many resources may be | 68   /// Creates a new pool with the given limit on how many resources may be | 
| 59   /// allocated at once. | 69   /// allocated at once. | 
| 60   /// | 70   /// | 
| 61   /// If [timeout] is passed, then if that much time passes without any activity | 71   /// If [timeout] is passed, then if that much time passes without any activity | 
| 62   /// all pending [request] futures will throw a [TimeoutException]. This is | 72   /// all pending [request] futures will throw a [TimeoutException]. This is | 
| 63   /// intended to avoid deadlocks. | 73   /// intended to avoid deadlocks. | 
| 64   Pool(this._maxAllocatedResources, {Duration timeout}) | 74   Pool(this._maxAllocatedResources, {Duration timeout}) | 
| 65       : _timeout = timeout; | 75       : _timeout = timeout; | 
| 66 | 76 | 
| 67   /// Request a [PoolResource]. | 77   /// Request a [PoolResource]. | 
| 68   /// | 78   /// | 
| 69   /// If the maximum number of resources is already allocated, this will delay | 79   /// If the maximum number of resources is already allocated, this will delay | 
| 70   /// until one of them is released. | 80   /// until one of them is released. | 
| 71   Future<PoolResource> request() { | 81   Future<PoolResource> request() { | 
|  | 82     if (isClosed) { | 
|  | 83       throw new StateError("request() may not be called on a closed Pool."); | 
|  | 84     } | 
|  | 85 | 
| 72     if (_allocatedResources < _maxAllocatedResources) { | 86     if (_allocatedResources < _maxAllocatedResources) { | 
| 73       _allocatedResources++; | 87       _allocatedResources++; | 
| 74       return new Future.value(new PoolResource._(this)); | 88       return new Future.value(new PoolResource._(this)); | 
| 75     } else if (_onReleaseCallbacks.isNotEmpty) { | 89     } else if (_onReleaseCallbacks.isNotEmpty) { | 
| 76       return _runOnRelease(_onReleaseCallbacks.removeFirst()); | 90       return _runOnRelease(_onReleaseCallbacks.removeFirst()); | 
| 77     } else { | 91     } else { | 
| 78       var completer = new Completer<PoolResource>(); | 92       var completer = new Completer<PoolResource>(); | 
| 79       _requestedResources.add(completer); | 93       _requestedResources.add(completer); | 
| 80       _resetTimer(); | 94       _resetTimer(); | 
| 81       return completer.future; | 95       return completer.future; | 
| 82     } | 96     } | 
| 83   } | 97   } | 
| 84 | 98 | 
| 85   /// Requests a resource for the duration of [callback], which may return a | 99   /// Requests a resource for the duration of [callback], which may return a | 
| 86   /// Future. | 100   /// Future. | 
| 87   /// | 101   /// | 
| 88   /// The return value of [callback] is piped to the returned Future. | 102   /// The return value of [callback] is piped to the returned Future. | 
| 89   Future withResource(callback()) { | 103   Future withResource(callback()) { | 
| 90     return request().then((resource) => new Future.sync(callback).whenComplete(r
     esource.release)); | 104     if (isClosed) { | 
|  | 105       throw new StateError( | 
|  | 106           "withResource() may not be called on a closed Pool."); | 
|  | 107     } | 
|  | 108 | 
|  | 109     // TODO(nweiz): Use async/await when sdk#23497 is fixed. | 
|  | 110     return request().then((resource) { | 
|  | 111       return new Future.sync(callback).whenComplete(resource.release); | 
|  | 112     }); | 
|  | 113   } | 
|  | 114 | 
|  | 115   /// Closes the pool so that no more resources are requested. | 
|  | 116   /// | 
|  | 117   /// Existing resource requests remain unchanged. | 
|  | 118   /// | 
|  | 119   /// Any resources that are marked as releasable using | 
|  | 120   /// [PoolResource.allowRelease] are released immediately. Once all resources | 
|  | 121   /// have been released and any `onRelease` callbacks have completed, the | 
|  | 122   /// returned future completes successfully. If any `onRelease` callback throws | 
|  | 123   /// an error, the returned future completes with that error. | 
|  | 124   /// | 
|  | 125   /// This may be called more than once; it returns the same [Future] each time. | 
|  | 126   Future close() { | 
|  | 127     if (_closeGroup != null) return _closeGroup.future; | 
|  | 128 | 
|  | 129     _resetTimer(); | 
|  | 130 | 
|  | 131     _closeGroup = new FutureGroup(); | 
|  | 132     for (var callback in _onReleaseCallbacks) { | 
|  | 133       _closeGroup.add(new Future.sync(callback)); | 
|  | 134     } | 
|  | 135 | 
|  | 136     _allocatedResources -= _onReleaseCallbacks.length; | 
|  | 137     _onReleaseCallbacks.clear(); | 
|  | 138 | 
|  | 139     if (_allocatedResources == 0) _closeGroup.close(); | 
|  | 140     return _closeGroup.future; | 
| 91   } | 141   } | 
| 92 | 142 | 
| 93   /// If there are any pending requests, this will fire the oldest one. | 143   /// If there are any pending requests, this will fire the oldest one. | 
| 94   void _onResourceReleased() { | 144   void _onResourceReleased() { | 
| 95     _resetTimer(); | 145     _resetTimer(); | 
| 96 | 146 | 
| 97     if (_requestedResources.isEmpty) { | 147     if (_requestedResources.isNotEmpty) { | 
|  | 148       var pending = _requestedResources.removeFirst(); | 
|  | 149       pending.complete(new PoolResource._(this)); | 
|  | 150     } else { | 
| 98       _allocatedResources--; | 151       _allocatedResources--; | 
| 99       return; | 152       if (isClosed && _allocatedResources == 0) _closeGroup.close(); | 
| 100     } | 153     } | 
| 101 |  | 
| 102     var pending = _requestedResources.removeFirst(); |  | 
| 103     pending.complete(new PoolResource._(this)); |  | 
| 104   } | 154   } | 
| 105 | 155 | 
| 106   /// If there are any pending requests, this will fire the oldest one after | 156   /// If there are any pending requests, this will fire the oldest one after | 
| 107   /// running [onRelease]. | 157   /// running [onRelease]. | 
| 108   void _onResourceReleaseAllowed(onRelease()) { | 158   void _onResourceReleaseAllowed(onRelease()) { | 
| 109     _resetTimer(); | 159     _resetTimer(); | 
| 110 | 160 | 
| 111     if (_requestedResources.isEmpty) { | 161     if (_requestedResources.isNotEmpty) { | 
|  | 162       var pending = _requestedResources.removeFirst(); | 
|  | 163       pending.complete(_runOnRelease(onRelease)); | 
|  | 164     } else if (isClosed) { | 
|  | 165       _closeGroup.add(new Future.sync(onRelease)); | 
|  | 166       _allocatedResources--; | 
|  | 167       if (_allocatedResources == 0) _closeGroup.close(); | 
|  | 168     } else { | 
| 112       _onReleaseCallbacks.add( | 169       _onReleaseCallbacks.add( | 
| 113           Zone.current.bindCallback(onRelease, runGuarded: false)); | 170           Zone.current.bindCallback(onRelease, runGuarded: false)); | 
| 114       return; |  | 
| 115     } | 171     } | 
| 116 |  | 
| 117     var pending = _requestedResources.removeFirst(); |  | 
| 118     pending.complete(_runOnRelease(onRelease)); |  | 
| 119   } | 172   } | 
| 120 | 173 | 
| 121   /// Runs [onRelease] and returns a Future that completes to a resource once an | 174   /// Runs [onRelease] and returns a Future that completes to a resource once an | 
| 122   /// [onRelease] callback completes. | 175   /// [onRelease] callback completes. | 
| 123   /// | 176   /// | 
| 124   /// Futures returned by [_runOnRelease] always complete in the order they were | 177   /// Futures returned by [_runOnRelease] always complete in the order they were | 
| 125   /// created, even if earlier [onRelease] callbacks take longer to run. | 178   /// created, even if earlier [onRelease] callbacks take longer to run. | 
| 126   Future<PoolResource> _runOnRelease(onRelease()) { | 179   Future<PoolResource> _runOnRelease(onRelease()) { | 
| 127     new Future.sync(onRelease).then((value) { | 180     new Future.sync(onRelease).then((value) { | 
| 128       _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); | 181       _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); | 
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 195   /// produce additional information later on. For example, an isolate's task | 248   /// produce additional information later on. For example, an isolate's task | 
| 196   /// may be complete, but it could still emit asynchronous errors. | 249   /// may be complete, but it could still emit asynchronous errors. | 
| 197   void allowRelease(onRelease()) { | 250   void allowRelease(onRelease()) { | 
| 198     if (_released) { | 251     if (_released) { | 
| 199       throw new StateError("A PoolResource may only be released once."); | 252       throw new StateError("A PoolResource may only be released once."); | 
| 200     } | 253     } | 
| 201     _released = true; | 254     _released = true; | 
| 202     _pool._onResourceReleaseAllowed(onRelease); | 255     _pool._onResourceReleaseAllowed(onRelease); | 
| 203   } | 256   } | 
| 204 } | 257 } | 
| 205 |  | 
| OLD | NEW | 
|---|