Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(511)

Side by Side Diff: packages/isolate/lib/load_balancer.dart

Issue 2990843002: Removed fixed dependencies (Closed)
Patch Set: Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « packages/isolate/lib/isolate_runner.dart ('k') | packages/isolate/lib/ports.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 /// A load-balancing runner pool.
6 library isolate.load_balancer;
7
8 import 'dart:async' show Future;
9
10 import 'runner.dart';
11 import 'src/errors.dart';
12 import 'src/lists.dart';
13
14 /// A pool of runners, ordered by load.
15 ///
16 /// Keeps a pool of runners,
17 /// and allows running function through the runner with the lowest current load.
18 class LoadBalancer implements Runner {
19 // A heap-based priority queue of entries, prioritized by `load`.
20 // Each entry has its own entry in the queue, for faster update.
21 List<_LoadBalancerEntry> _queue;
22
23 // The number of entries currently in the queue.
24 int _length;
25
26 // Whether [stop] has been called.
27 Future _stopFuture = null;
28
29 /// Create a load balancer for [service] with [size] isolates.
30 LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
31
32 LoadBalancer._(List<_LoadBalancerEntry> entries)
33 : _queue = entries,
34 _length = entries.length {
35 for (int i = 0; i < _length; i++) {
36 _queue[i].queueIndex = i;
37 }
38 }
39
40 /// The number of runners currently in the pool.
41 int get length => _length;
42
43 /// Asynchronously create [size] runners and create a `LoadBalancer` of those.
44 ///
45 /// This is a helper function that makes it easy to create a `LoadBalancer`
46 /// with asynchronously created runners, for example:
47 ///
48 /// var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
49 static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
50 return Future.wait(new Iterable.generate(size, (_) => createRunner()),
51 cleanUp: (Runner runner) { runner.close(); })
52 .then((runners) => new LoadBalancer(runners));
53 }
54
55 static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
56 var entries = runners.map((runner) => new _LoadBalancerEntry(runner));
57 return new List<_LoadBalancerEntry>.from(entries, growable: false);
58 }
59
60 /// Execute the command in the currently least loaded isolate.
61 ///
62 /// The optional [load] parameter represents the load that the command
63 /// is causing on the isolate where it runs.
64 /// The number has no fixed meaning, but should be seen as relative to
65 /// other commands run in the same load balancer.
66 /// The `load` must not be negative.
67 ///
68 /// If [timeout] and [onTimeout] are provided, they are forwarded to
69 /// the runner running the function, which will handle a timeout
70 /// as normal.
71 Future run(function(argument), argument, {Duration timeout,
72 onTimeout(),
73 int load: 100}) {
74 RangeError.checkNotNegative(load, "load");
75 _LoadBalancerEntry entry = _first;
76 _increaseLoad(entry, load);
77 return entry.run(this, load, function, argument, timeout, onTimeout);
78 }
79
80 /// Execute the same function in the least loaded [count] isolates.
81 ///
82 /// This guarantees that the function isn't run twice in the same isolate,
83 /// so `count` is not allowed to exceed [length].
84 ///
85 /// The optional [load] parameter represents the load that the command
86 /// is causing on the isolate where it runs.
87 /// The number has no fixed meaning, but should be seen as relative to
88 /// other commands run in the same load balancer.
89 /// The `load` must not be negative.
90 ///
91 /// If [timeout] and [onTimeout] are provided, they are forwarded to
92 /// the runners running the function, which will handle any timeouts
93 /// as normal.
94 List<Future> runMultiple(int count, function(argument), argument,
95 {Duration timeout,
96 onTimeout(),
97 int load: 100}) {
98 RangeError.checkValueInInterval(count, 1, _length, "count");
99 RangeError.checkNotNegative(load, "load");
100 if (count == 1) {
101 return list1(run(function, argument, load: load,
102 timeout: timeout, onTimeout: onTimeout));
103 }
104 List result = new List<Future>(count);
105 if (count == _length) {
106 // No need to change the order of entries in the queue.
107 for (int i = 0; i < count; i++) {
108 _LoadBalancerEntry entry = _queue[i];
109 entry.load += load;
110 result[i] =
111 entry.run(this, load, function, argument, timeout, onTimeout);
112 }
113 } else {
114 // Remove the [count] least loaded services and run the
115 // command on each, then add them back to the queue.
116 // This avoids running the same command twice in the same
117 // isolate.
118 List entries = new List(count);
119 for (int i = 0; i < count; i++) {
120 entries[i] = _removeFirst();
121 }
122 for (int i = 0; i < count; i++) {
123 _LoadBalancerEntry entry = entries[i];
124 entry.load += load;
125 _add(entry);
126 result[i] =
127 entry.run(this, load, function, argument, timeout, onTimeout);
128 }
129 }
130 return result;
131 }
132
133 Future close() {
134 if (_stopFuture != null) return _stopFuture;
135 _stopFuture =
136 MultiError.waitUnordered(_queue.take(_length).map((e) => e.close()));
137 // Remove all entries.
138 for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
139 _queue = null;
140 _length = 0;
141 return _stopFuture;
142 }
143
144 /// Place [element] in heap at [index] or above.
145 ///
146 /// Put element into the empty cell at `index`.
147 /// While the `element` has higher priority than the
148 /// parent, swap it with the parent.
149 void _bubbleUp(_LoadBalancerEntry element, int index) {
150 while (index > 0) {
151 int parentIndex = (index - 1) ~/ 2;
152 _LoadBalancerEntry parent = _queue[parentIndex];
153 if (element.compareTo(parent) > 0) break;
154 _queue[index] = parent;
155 parent.queueIndex = index;
156 index = parentIndex;
157 }
158 _queue[index] = element;
159 element.queueIndex = index;
160 }
161
162 /// Place [element] in heap at [index] or above.
163 ///
164 /// Put element into the empty cell at `index`.
165 /// While the `element` has lower priority than either child,
166 /// swap it with the highest priority child.
167 void _bubbleDown(_LoadBalancerEntry element, int index) {
168 while (true) {
169 int childIndex = index * 2 + 1; // Left child index.
170 if (childIndex >= _length) break;
171 _LoadBalancerEntry child = _queue[childIndex];
172 int rightChildIndex = childIndex + 1;
173 if (rightChildIndex < _length) {
174 _LoadBalancerEntry rightChild = _queue[rightChildIndex];
175 if (rightChild.compareTo(child) < 0) {
176 childIndex = rightChildIndex;
177 child = rightChild;
178 }
179 }
180 if (element.compareTo(child) <= 0) break;
181 _queue[index] = child;
182 child.queueIndex = index;
183 index = childIndex;
184 }
185 _queue[index] = element;
186 element.queueIndex = index;
187 }
188
189 /// Removes the entry from the queue, but doesn't stop its service.
190 ///
191 /// The entry is expected to be either added back to the queue
192 /// immediately or have its stop method called.
193 void _remove(_LoadBalancerEntry entry) {
194 int index = entry.queueIndex;
195 if (index < 0) return;
196 entry.queueIndex = -1;
197 _length--;
198 _LoadBalancerEntry replacement = _queue[_length];
199 _queue[_length] = null;
200 if (index < _length) {
201 if (entry.compareTo(replacement) < 0) {
202 _bubbleDown(replacement, index);
203 } else {
204 _bubbleUp(replacement, index);
205 }
206 }
207 }
208
209 /// Adds entry to the queue.
210 void _add(_LoadBalancerEntry entry) {
211 if (_stopFuture != null) throw new StateError("LoadBalancer is stopped");
212 assert(entry.queueIndex < 0);
213 if (_queue.length == _length) {
214 _grow();
215 }
216 int index = _length;
217 _length = index + 1;
218 _bubbleUp(entry, index);
219 }
220
221 void _increaseLoad(_LoadBalancerEntry entry, int load) {
222 assert(load >= 0);
223 entry.load += load;
224 if (entry.inQueue) {
225 _bubbleDown(entry, entry.queueIndex);
226 }
227 }
228
229 void _decreaseLoad(_LoadBalancerEntry entry, int load) {
230 assert(load >= 0);
231 entry.load -= load;
232 if (entry.inQueue) {
233 _bubbleUp(entry, entry.queueIndex);
234 }
235 }
236
237 void _grow() {
238 List newQueue = new List(_length * 2);
239 newQueue.setRange(0, _length, _queue);
240 _queue = newQueue;
241 }
242
243 _LoadBalancerEntry get _first {
244 assert(_length > 0);
245 return _queue[0];
246 }
247
248 _LoadBalancerEntry _removeFirst() {
249 _LoadBalancerEntry result = _first;
250 _remove(result);
251 return result;
252 }
253 }
254
255 class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
256 // The current load on the isolate.
257 int load = 0;
258 // The current index in the heap-queue.
259 // Negative when the entry is not part of the queue.
260 int queueIndex = -1;
261
262 // The service used to send commands to the other isolate.
263 Runner runner;
264
265 _LoadBalancerEntry(Runner runner) : runner = runner;
266
267 /// Whether the entry is still in the queue.
268 bool get inQueue => queueIndex >= 0;
269
270 Future run(LoadBalancer balancer, int load, function(argument), argument,
271 Duration timeout, onTimeout()) {
272 return runner.run(function, argument,
273 timeout: timeout, onTimeout: onTimeout).whenComplete(() {
274 balancer._decreaseLoad(this, load);
275 });
276 }
277
278 Future close() => runner.close();
279
280 int compareTo(_LoadBalancerEntry other) => load - other.load;
281 }
OLDNEW
« no previous file with comments | « packages/isolate/lib/isolate_runner.dart ('k') | packages/isolate/lib/ports.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698