Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(240)

Side by Side Diff: lib/loadbalancer.dart

Issue 928663003: Add IsolateRunner as a helper around Isolate. (Closed) Base URL: https://github.com/dart-lang/isolate.git@master
Patch Set: Add .status. Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/isolaterunner.dart ('k') | lib/ports.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 /**
6 * A load-balancing runner pool.
7 */
8 library dart.pkg.isolate.loadbalancer;
9
10 import "runner.dart";
11 import "src/errors.dart";
12 import "src/lists.dart";
13 import "dart:async" show Future;
14
15 /**
16 * A pool of runners, ordered by load.
17 *
18 * Keeps a pool of runners,
19 * and allows running function through the runner with the lowest current load.
20 */
21 class LoadBalancer implements Runner {
22 // A heap-based priority queue of entries, prioritized by `load`.
23 // Each entry has its own entry in the queue, for faster update.
24 List<_LoadBalancerEntry> _queue;
25
26 // The number of entries currently in the queue.
27 int _length;
28
29 // Whether [stop] has been called.
30 Future _stopFuture = null;
31
32 /**
33 * Create a load balancer for [service] with [size] isolates.
34 */
35 LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
36
37 LoadBalancer._(List<_LoadBalancerEntry> entries)
38 : _queue = entries,
39 _length = entries.length {
40 for (int i = 0; i < _length; i++) {
41 _queue[i].queueIndex = i;
42 }
43 }
44
45 /**
46 * The number of runners currently in the pool.
47 */
48 int get length => _length;
49
50 /**
51 * Asynchronously create [size] runners and create a `LoadBalancer` of those.
52 *
53 * This is a helper function that makes it easy to create a `LoadBalancer`
54 * with asynchronously created runners, for example:
55 *
56 * var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
57 */
58 static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
59 return Future.wait(new Iterable.generate(size, (_) => createRunner()),
60 cleanUp: (Runner runner) { runner.close(); })
61 .then((runners) => new LoadBalancer(runners));
62 }
63
64 static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
65 var entries = runners.map((runner) => new _LoadBalancerEntry(runner));
66 return new List<_LoadBalancerEntry>.from(entries, growable: false);
67 }
68
69 /**
70 * Execute the command in the currently least loaded isolate.
71 *
72 * The optional [load] parameter represents the load that the command
73 * is causing on the isolate where it runs.
74 * The number has no fixed meaning, but should be seen as relative to
75 * other commands run in the same load balancer.
76 * The `load` must not be negative.
77 *
78 * If [timeout] and [onTimeout] are provided, they are forwarded to
79 * the runner running the function, which will handle a timeout
80 * as normal.
81 */
82 Future run(function(argument), argument, {Duration timeout,
83 onTimeout(),
84 int load: 100}) {
85 RangeError.checkNotNegative(load, "load");
86 _LoadBalancerEntry entry = _first;
87 _increaseLoad(entry, load);
88 return entry.run(this, load, function, argument, timeout, onTimeout);
89 }
90
91 /**
92 * Execute the same function in the least loaded [count] isolates.
93 *
94 * This guarantees that the function isn't run twice in the same isolate,
95 * so `count` is not allowed to exceed [length].
96 *
97 * The optional [load] parameter represents the load that the command
98 * is causing on the isolate where it runs.
99 * The number has no fixed meaning, but should be seen as relative to
100 * other commands run in the same load balancer.
101 * The `load` must not be negative.
102 *
103 * If [timeout] and [onTimeout] are provided, they are forwarded to
104 * the runners running the function, which will handle any timeouts
105 * as normal.
106 */
107 List<Future> runMultiple(int count, function(argument), argument,
108 {Duration timeout,
109 onTimeout(),
110 int load: 100}) {
111 RangeError.checkValueInInterval(count, 1, _length, "count");
112 RangeError.checkNotNegative(load, "load");
113 if (count == 1) {
114 return list1(run(function, argument, load: load,
115 timeout: timeout, onTimeout: onTimeout));
116 }
117 List result = new List<Future>(count);
118 if (count == _length) {
119 // No need to change the order of entries in the queue.
120 for (int i = 0; i < count; i++) {
121 _LoadBalancerEntry entry = _queue[i];
122 entry.load += load;
123 result[i] = entry.run(this, load, function, argument,
124 timeout, onTimeout);
125 }
126 } else {
127 // Remove the [count] least loaded services and run the
128 // command on each, then add them back to the queue.
129 // This avoids running the same command twice in the same
130 // isolate.
131 List entries = new List(count);
132 for (int i = 0; i < count; i++) {
133 entries[i] = _removeFirst();
134 }
135 for (int i = 0; i < count; i++) {
136 _LoadBalancerEntry entry = entries[i];
137 entry.load += load;
138 _add(entry);
139 result[i] = entry.run(this, load, function, argument,
140 timeout, onTimeout);
141 }
142 }
143 return result;
144 }
145
146 Future close() {
147 if (_stopFuture != null) return _stopFuture;
148 _stopFuture = MultiError.waitUnordered(_queue.take(_length)
149 .map((e) => e.close()));
150 // Remove all entries.
151 for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
152 _queue = null;
153 _length = 0;
154 return _stopFuture;
155 }
156
157 /**
158 * Place [element] in heap at [index] or above.
159 *
160 * Put element into the empty cell at `index`.
161 * While the `element` has higher priority than the
162 * parent, swap it with the parent.
163 */
164 void _bubbleUp(_LoadBalancerEntry element, int index) {
165 while (index > 0) {
166 int parentIndex = (index - 1) ~/ 2;
167 _LoadBalancerEntry parent = _queue[parentIndex];
168 if (element.compareTo(parent) > 0) break;
169 _queue[index] = parent;
170 parent.queueIndex = index;
171 index = parentIndex;
172 }
173 _queue[index] = element;
174 element.queueIndex = index;
175 }
176
177 /**
178 * Place [element] in heap at [index] or above.
179 *
180 * Put element into the empty cell at `index`.
181 * While the `element` has lower priority than either child,
182 * swap it with the highest priority child.
183 */
184 void _bubbleDown(_LoadBalancerEntry element, int index) {
185 while (true) {
186 int childIndex = index * 2 + 1; // Left child index.
187 if (childIndex >= _length) break;
188 _LoadBalancerEntry child = _queue[childIndex];
189 int rightChildIndex = childIndex + 1;
190 if (rightChildIndex < _length) {
191 _LoadBalancerEntry rightChild = _queue[rightChildIndex];
192 if (rightChild.compareTo(child) < 0) {
193 childIndex = rightChildIndex;
194 child = rightChild;
195 }
196 }
197 if (element.compareTo(child) <= 0) break;
198 _queue[index] = child;
199 child.queueIndex = index;
200 index = childIndex;
201 }
202 _queue[index] = element;
203 element.queueIndex = index;
204 }
205
206 /**
207 * Removes the entry from the queue, but doesn't stop its service.
208 *
209 * The entry is expected to be either added back to the queue
210 * immediately or have its stop method called.
211 */
212 void _remove(_LoadBalancerEntry entry) {
213 int index = entry.queueIndex;
214 if (index < 0) return;
215 entry.queueIndex = -1;
216 _length--;
217 _LoadBalancerEntry replacement = _queue[_length];
218 _queue[_length] = null;
219 if (index < _length) {
220 if (entry.compareTo(replacement) < 0) {
221 _bubbleDown(replacement, index);
222 } else {
223 _bubbleUp(replacement, index);
224 }
225 }
226 }
227
228 /**
229 * Adds entry to the queue.
230 */
231 void _add(_LoadBalancerEntry entry) {
232 if (_stopFuture != null) throw new StateError("LoadBalancer is stopped");
233 assert(entry.queueIndex < 0);
234 if (_queue.length == _length) {
235 _grow();
236 }
237 int index = _length;
238 _length = index + 1;
239 _bubbleUp(entry, index);
240 }
241
242 void _increaseLoad(_LoadBalancerEntry entry, int load) {
243 assert(load >= 0);
244 entry.load += load;
245 if (entry.inQueue) {
246 _bubbleDown(entry, entry.queueIndex);
247 }
248 }
249
250 void _decreaseLoad(_LoadBalancerEntry entry, int load) {
251 assert(load >= 0);
252 entry.load -= load;
253 if (entry.inQueue) {
254 _bubbleUp(entry, entry.queueIndex);
255 }
256 }
257
258 void _grow() {
259 List newQueue = new List(_length * 2);
260 newQueue.setRange(0, _length, _queue);
261 _queue = newQueue;
262 }
263
264 _LoadBalancerEntry get _first {
265 assert(_length > 0);
266 return _queue[0];
267 }
268
269 _LoadBalancerEntry _removeFirst() {
270 _LoadBalancerEntry result = _first;
271 _remove(result);
272 return result;
273 }
274 }
275
276 class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
277 // The current load on the isolate.
278 int load = 0;
279 // The current index in the heap-queue.
280 // Negative when the entry is not part of the queue.
281 int queueIndex = -1;
282
283 // The service used to send commands to the other isolate.
284 Runner runner;
285
286 _LoadBalancerEntry(Runner runner)
287 : runner = runner;
288
289 /** Whether the entry is still in the queue. */
290 bool get inQueue => queueIndex >= 0;
291
292 Future run(LoadBalancer balancer, int load, function(argumen), argument,
293 Duration timeout, onTimeout()) {
294 return runner.run(function, argument,
295 timeout: timeout, onTimeout: onTimeout).whenComplete(() {
296 balancer._decreaseLoad(this, load);
297 });
298 }
299
300 Future close() => runner.close();
301
302 int compareTo(_LoadBalancerEntry other) => load - other.load;
303 }
304
305
OLDNEW
« no previous file with comments | « lib/isolaterunner.dart ('k') | lib/ports.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698