Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(272)

Side by Side Diff: pool/lib/pool.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « polymer_interop/pubspec.yaml ('k') | pool/pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library pool;
6
7 import 'dart:async';
8 import 'dart:collection';
9
10 import 'package:stack_trace/stack_trace.dart';
11
12 /// Manages an abstract pool of resources with a limit on how many may be in use
13 /// at once.
14 ///
15 /// When a resource is needed, the user should call [request]. When the returned
16 /// future completes with a [PoolResource], the resource may be allocated. Once
17 /// the resource has been released, the user should call [PoolResource.release].
18 /// The pool will ensure that only a certain number of [PoolResource]s may be
19 /// allocated at once.
20 class Pool {
21 /// Completers for requests beyond the first [_maxAllocatedResources].
22 ///
23 /// When an item is released, the next element of [_requestedResources] will
24 /// be completed.
25 final _requestedResources = new Queue<Completer<PoolResource>>();
26
27 /// Callbacks that must be called before additional resources can be
28 /// allocated.
29 ///
30 /// See [PoolResource.allowRelease].
31 final _onReleaseCallbacks = new Queue<Function>();
32
33 /// Completers that will be completed once `onRelease` callbacks are done
34 /// running.
35 ///
36 /// These are kept in a queue to ensure that the earliest request completes
37 /// first regardless of what order the `onRelease` callbacks complete in.
38 final _onReleaseCompleters = new Queue<Completer<PoolResource>>();
39
40 /// The maximum number of resources that may be allocated at once.
41 final int _maxAllocatedResources;
42
43 /// The number of resources that are currently allocated.
44 int _allocatedResources = 0;
45
46 /// The timeout timer.
47 ///
48 /// If [_timeout] isn't null, this timer is set as soon as the resource limit
49 /// is reached and is reset every time an resource is released or a new
50 /// resource is requested. If it fires, that indicates that the caller became
51 /// deadlocked, likely due to files waiting for additional files to be read
52 /// before they could be closed.
53 Timer _timer;
54
55 /// The amount of time to wait before timing out the pending resources.
56 final Duration _timeout;
57
58 /// Creates a new pool with the given limit on how many resources may be
59 /// allocated at once.
60 ///
61 /// If [timeout] is passed, then if that much time passes without any activity
62 /// all pending [request] futures will throw a [TimeoutException]. This is
63 /// intended to avoid deadlocks.
64 Pool(this._maxAllocatedResources, {Duration timeout})
65 : _timeout = timeout;
66
67 /// Request a [PoolResource].
68 ///
69 /// If the maximum number of resources is already allocated, this will delay
70 /// until one of them is released.
71 Future<PoolResource> request() {
72 if (_allocatedResources < _maxAllocatedResources) {
73 _allocatedResources++;
74 return new Future.value(new PoolResource._(this));
75 } else if (_onReleaseCallbacks.isNotEmpty) {
76 return _runOnRelease(_onReleaseCallbacks.removeFirst());
77 } else {
78 var completer = new Completer<PoolResource>();
79 _requestedResources.add(completer);
80 _resetTimer();
81 return completer.future;
82 }
83 }
84
85 /// Requests a resource for the duration of [callback], which may return a
86 /// Future.
87 ///
88 /// The return value of [callback] is piped to the returned Future.
89 Future withResource(callback()) {
90 return request().then((resource) =>
91 Chain.track(new Future.sync(callback)).whenComplete(resource.release));
92 }
93
94 /// If there are any pending requests, this will fire the oldest one.
95 void _onResourceReleased() {
96 _resetTimer();
97
98 if (_requestedResources.isEmpty) {
99 _allocatedResources--;
100 return;
101 }
102
103 var pending = _requestedResources.removeFirst();
104 pending.complete(new PoolResource._(this));
105 }
106
107 /// If there are any pending requests, this will fire the oldest one after
108 /// running [onRelease].
109 void _onResourceReleaseAllowed(onRelease()) {
110 _resetTimer();
111
112 if (_requestedResources.isEmpty) {
113 _onReleaseCallbacks.add(
114 Zone.current.bindCallback(onRelease, runGuarded: false));
115 return;
116 }
117
118 var pending = _requestedResources.removeFirst();
119 pending.complete(_runOnRelease(onRelease));
120 }
121
122 /// Runs [onRelease] and returns a Future that completes to a resource once an
123 /// [onRelease] callback completes.
124 ///
125 /// Futures returned by [_runOnRelease] always complete in the order they were
126 /// created, even if earlier [onRelease] callbacks take longer to run.
127 Future<PoolResource> _runOnRelease(onRelease()) {
128 new Future.sync(onRelease).then((value) {
129 _onReleaseCompleters.removeFirst().complete(new PoolResource._(this));
130 }).catchError((error, stackTrace) {
131 _onReleaseCompleters.removeFirst().completeError(error, stackTrace);
132 });
133
134 var completer = new Completer.sync();
135 _onReleaseCompleters.add(completer);
136 return completer.future;
137 }
138
139 /// A resource has been requested, allocated, or released.
140 void _resetTimer() {
141 if (_timer != null) _timer.cancel();
142 if (_timeout == null || _requestedResources.isEmpty) {
143 _timer = null;
144 } else {
145 _timer = new Timer(_timeout, _onTimeout);
146 }
147 }
148
149 /// Handles [_timer] timing out by causing all pending resource completers to
150 /// emit exceptions.
151 void _onTimeout() {
152 for (var completer in _requestedResources) {
153 completer.completeError(
154 new TimeoutException("Pool deadlock: all resources have been "
155 "allocated for too long.",
156 _timeout),
157 new Chain.current());
158 }
159 _requestedResources.clear();
160 _timer = null;
161 }
162 }
163
164 /// A member of a [Pool].
165 ///
166 /// A [PoolResource] is a token that indicates that a resource is allocated.
167 /// When the associated resource is released, the user should call [release].
168 class PoolResource {
169 final Pool _pool;
170
171 /// Whether [this] has been released yet.
172 bool _released = false;
173
174 PoolResource._(this._pool);
175
176 /// Tells the parent [Pool] that the resource associated with this resource is
177 /// no longer allocated, and that a new [PoolResource] may be allocated.
178 void release() {
179 if (_released) {
180 throw new StateError("A PoolResource may only be released once.");
181 }
182 _released = true;
183 _pool._onResourceReleased();
184 }
185
186 /// Tells the parent [Pool] that the resource associated with this resource is
187 /// no longer necessary, but should remain allocated until more resources are
188 /// needed.
189 ///
190 /// When [Pool.request] is called and there are no remaining available
191 /// resources, the [onRelease] callback is called. It should free the
192 /// resource, and it may return a Future or `null`. Once that completes, the
193 /// [Pool.request] call will complete to a new [PoolResource].
194 ///
195 /// This is useful when a resource's main function is complete, but it may
196 /// produce additional information later on. For example, an isolate's task
197 /// may be complete, but it could still emit asynchronous errors.
198 void allowRelease(onRelease()) {
199 if (_released) {
200 throw new StateError("A PoolResource may only be released once.");
201 }
202 _released = true;
203 _pool._onResourceReleaseAllowed(onRelease);
204 }
205 }
206
OLDNEW
« no previous file with comments | « polymer_interop/pubspec.yaml ('k') | pool/pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698