Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(243)

Side by Side Diff: lib/pool.dart

Issue 1393193004: Add Pool.close(). (Closed) Base URL: git@github.com:dart-lang/pool@master
Patch Set: Code review changes Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library pool; 5 library pool;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 9
10 import 'package:async/async.dart';
10 import 'package:stack_trace/stack_trace.dart'; 11 import 'package:stack_trace/stack_trace.dart';
11 12
12 /// Manages an abstract pool of resources with a limit on how many may be in use 13 /// Manages an abstract pool of resources with a limit on how many may be in use
13 /// at once. 14 /// at once.
14 /// 15 ///
15 /// When a resource is needed, the user should call [request]. When the returned 16 /// When a resource is needed, the user should call [request]. When the returned
16 /// future completes with a [PoolResource], the resource may be allocated. Once 17 /// future completes with a [PoolResource], the resource may be allocated. Once
17 /// the resource has been released, the user should call [PoolResource.release]. 18 /// the resource has been released, the user should call [PoolResource.release].
18 /// The pool will ensure that only a certain number of [PoolResource]s may be 19 /// The pool will ensure that only a certain number of [PoolResource]s may be
19 /// allocated at once. 20 /// allocated at once.
(...skipping 28 matching lines...) Expand all
48 /// If [_timeout] isn't null, this timer is set as soon as the resource limit 49 /// If [_timeout] isn't null, this timer is set as soon as the resource limit
49 /// is reached and is reset every time an resource is released or a new 50 /// is reached and is reset every time an resource is released or a new
50 /// resource is requested. If it fires, that indicates that the caller became 51 /// resource is requested. If it fires, that indicates that the caller became
51 /// deadlocked, likely due to files waiting for additional files to be read 52 /// deadlocked, likely due to files waiting for additional files to be read
52 /// before they could be closed. 53 /// before they could be closed.
53 Timer _timer; 54 Timer _timer;
54 55
55 /// The amount of time to wait before timing out the pending resources. 56 /// The amount of time to wait before timing out the pending resources.
56 final Duration _timeout; 57 final Duration _timeout;
57 58
59 /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources
60 /// that have been marked releasable.
61 ///
62 /// This is `null` until [close] is called.
63 FutureGroup _closeGroup;
64
65 /// Whether [close] has been called.
66 bool get isClosed => _closeGroup != null;
67
58 /// Creates a new pool with the given limit on how many resources may be 68 /// Creates a new pool with the given limit on how many resources may be
59 /// allocated at once. 69 /// allocated at once.
60 /// 70 ///
61 /// If [timeout] is passed, then if that much time passes without any activity 71 /// If [timeout] is passed, then if that much time passes without any activity
62 /// all pending [request] futures will throw a [TimeoutException]. This is 72 /// all pending [request] futures will throw a [TimeoutException]. This is
63 /// intended to avoid deadlocks. 73 /// intended to avoid deadlocks.
64 Pool(this._maxAllocatedResources, {Duration timeout}) 74 Pool(this._maxAllocatedResources, {Duration timeout})
65 : _timeout = timeout; 75 : _timeout = timeout;
66 76
67 /// Request a [PoolResource]. 77 /// Request a [PoolResource].
68 /// 78 ///
69 /// If the maximum number of resources is already allocated, this will delay 79 /// If the maximum number of resources is already allocated, this will delay
70 /// until one of them is released. 80 /// until one of them is released.
71 Future<PoolResource> request() { 81 Future<PoolResource> request() {
82 if (isClosed) {
83 throw new StateError("request() may not be called on a closed Pool.");
84 }
85
72 if (_allocatedResources < _maxAllocatedResources) { 86 if (_allocatedResources < _maxAllocatedResources) {
73 _allocatedResources++; 87 _allocatedResources++;
74 return new Future.value(new PoolResource._(this)); 88 return new Future.value(new PoolResource._(this));
75 } else if (_onReleaseCallbacks.isNotEmpty) { 89 } else if (_onReleaseCallbacks.isNotEmpty) {
76 return _runOnRelease(_onReleaseCallbacks.removeFirst()); 90 return _runOnRelease(_onReleaseCallbacks.removeFirst());
77 } else { 91 } else {
78 var completer = new Completer<PoolResource>(); 92 var completer = new Completer<PoolResource>();
79 _requestedResources.add(completer); 93 _requestedResources.add(completer);
80 _resetTimer(); 94 _resetTimer();
81 return completer.future; 95 return completer.future;
82 } 96 }
83 } 97 }
84 98
85 /// Requests a resource for the duration of [callback], which may return a 99 /// Requests a resource for the duration of [callback], which may return a
86 /// Future. 100 /// Future.
87 /// 101 ///
88 /// The return value of [callback] is piped to the returned Future. 102 /// The return value of [callback] is piped to the returned Future.
89 Future withResource(callback()) { 103 Future withResource(callback()) {
90 return request().then((resource) => new Future.sync(callback).whenComplete(r esource.release)); 104 if (isClosed) {
105 throw new StateError(
106 "withResource() may not be called on a closed Pool.");
107 }
108
109 // TODO(nweiz): Use async/await when sdk#23497 is fixed.
110 return request().then((resource) {
111 return new Future.sync(callback).whenComplete(resource.release);
112 });
113 }
114
115 /// Closes the pool so that no more resources are requested.
116 ///
117 /// Existing resource requests remain unchanged.
118 ///
119 /// Any resources that are marked as releasable using
120 /// [PoolResource.allowRelease] are released immediately. Once all resources
121 /// have been released and any `onRelease` callbacks have completed, the
122 /// returned future completes successfully. If any `onRelease` callback throws
123 /// an error, the returned future completes with that error.
124 ///
125 /// This may be called more than once; it returns the same [Future] each time.
126 Future close() {
127 if (_closeGroup != null) return _closeGroup.future;
128
129 _resetTimer();
130
131 _closeGroup = new FutureGroup();
132 for (var callback in _onReleaseCallbacks) {
133 _closeGroup.add(new Future.sync(callback));
134 }
135
136 _allocatedResources -= _onReleaseCallbacks.length;
137 _onReleaseCallbacks.clear();
138
139 if (_allocatedResources == 0) _closeGroup.close();
140 return _closeGroup.future;
91 } 141 }
92 142
93 /// If there are any pending requests, this will fire the oldest one. 143 /// If there are any pending requests, this will fire the oldest one.
94 void _onResourceReleased() { 144 void _onResourceReleased() {
95 _resetTimer(); 145 _resetTimer();
96 146
97 if (_requestedResources.isEmpty) { 147 if (_requestedResources.isNotEmpty) {
148 var pending = _requestedResources.removeFirst();
149 pending.complete(new PoolResource._(this));
150 } else {
98 _allocatedResources--; 151 _allocatedResources--;
99 return; 152 if (isClosed && _allocatedResources == 0) _closeGroup.close();
100 } 153 }
101
102 var pending = _requestedResources.removeFirst();
103 pending.complete(new PoolResource._(this));
104 } 154 }
105 155
106 /// If there are any pending requests, this will fire the oldest one after 156 /// If there are any pending requests, this will fire the oldest one after
107 /// running [onRelease]. 157 /// running [onRelease].
108 void _onResourceReleaseAllowed(onRelease()) { 158 void _onResourceReleaseAllowed(onRelease()) {
109 _resetTimer(); 159 _resetTimer();
110 160
111 if (_requestedResources.isEmpty) { 161 if (_requestedResources.isNotEmpty) {
162 var pending = _requestedResources.removeFirst();
163 pending.complete(_runOnRelease(onRelease));
164 } else if (isClosed) {
165 _closeGroup.add(new Future.sync(onRelease));
166 _allocatedResources--;
167 if (_allocatedResources == 0) _closeGroup.close();
168 } else {
112 _onReleaseCallbacks.add( 169 _onReleaseCallbacks.add(
113 Zone.current.bindCallback(onRelease, runGuarded: false)); 170 Zone.current.bindCallback(onRelease, runGuarded: false));
114 return;
115 } 171 }
116
117 var pending = _requestedResources.removeFirst();
118 pending.complete(_runOnRelease(onRelease));
119 } 172 }
120 173
121 /// Runs [onRelease] and returns a Future that completes to a resource once an 174 /// Runs [onRelease] and returns a Future that completes to a resource once an
122 /// [onRelease] callback completes. 175 /// [onRelease] callback completes.
123 /// 176 ///
124 /// Futures returned by [_runOnRelease] always complete in the order they were 177 /// Futures returned by [_runOnRelease] always complete in the order they were
125 /// created, even if earlier [onRelease] callbacks take longer to run. 178 /// created, even if earlier [onRelease] callbacks take longer to run.
126 Future<PoolResource> _runOnRelease(onRelease()) { 179 Future<PoolResource> _runOnRelease(onRelease()) {
127 new Future.sync(onRelease).then((value) { 180 new Future.sync(onRelease).then((value) {
128 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); 181 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this));
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
195 /// produce additional information later on. For example, an isolate's task 248 /// produce additional information later on. For example, an isolate's task
196 /// may be complete, but it could still emit asynchronous errors. 249 /// may be complete, but it could still emit asynchronous errors.
197 void allowRelease(onRelease()) { 250 void allowRelease(onRelease()) {
198 if (_released) { 251 if (_released) {
199 throw new StateError("A PoolResource may only be released once."); 252 throw new StateError("A PoolResource may only be released once.");
200 } 253 }
201 _released = true; 254 _released = true;
202 _pool._onResourceReleaseAllowed(onRelease); 255 _pool._onResourceReleaseAllowed(onRelease);
203 } 256 }
204 } 257 }
205
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698