Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1260)

Side by Side Diff: lib/loadbalancer.dart

Issue 1025293003: pkg/isolate: library renaming, removed unused method, fix creation of TimeoutException (Closed) Base URL: https://github.com/dart-lang/isolate.git@master
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 /// A load-balancing runner pool.
6 library dart.pkg.isolate.loadbalancer;
7
8 import "runner.dart";
9 import "src/errors.dart";
10 import "src/lists.dart";
11 import "dart:async" show Future;
12
13 /// A pool of runners, ordered by load.
14 ///
15 /// Keeps a pool of runners,
16 /// and allows running function through the runner with the lowest current load.
17 class LoadBalancer implements Runner {
18 // A heap-based priority queue of entries, prioritized by `load`.
19 // Each entry has its own entry in the queue, for faster update.
20 List<_LoadBalancerEntry> _queue;
21
22 // The number of entries currently in the queue.
23 int _length;
24
25 // Whether [stop] has been called.
26 Future _stopFuture = null;
27
28 /// Create a load balancer for [service] with [size] isolates.
29 LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
30
31 LoadBalancer._(List<_LoadBalancerEntry> entries)
32 : _queue = entries,
33 _length = entries.length {
34 for (int i = 0; i < _length; i++) {
35 _queue[i].queueIndex = i;
36 }
37 }
38
39 /// The number of runners currently in the pool.
40 int get length => _length;
41
42 /// Asynchronously create [size] runners and create a `LoadBalancer` of those.
43 ///
44 /// This is a helper function that makes it easy to create a `LoadBalancer`
45 /// with asynchronously created runners, for example:
46 ///
47 /// var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
48 static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
49 return Future.wait(new Iterable.generate(size, (_) => createRunner()),
50 cleanUp: (Runner runner) { runner.close(); })
51 .then((runners) => new LoadBalancer(runners));
52 }
53
54 static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
55 var entries = runners.map((runner) => new _LoadBalancerEntry(runner));
56 return new List<_LoadBalancerEntry>.from(entries, growable: false);
57 }
58
59 /// Execute the command in the currently least loaded isolate.
60 ///
61 /// The optional [load] parameter represents the load that the command
62 /// is causing on the isolate where it runs.
63 /// The number has no fixed meaning, but should be seen as relative to
64 /// other commands run in the same load balancer.
65 /// The `load` must not be negative.
66 ///
67 /// If [timeout] and [onTimeout] are provided, they are forwarded to
68 /// the runner running the function, which will handle a timeout
69 /// as normal.
70 Future run(function(argument), argument, {Duration timeout,
71 onTimeout(),
72 int load: 100}) {
73 RangeError.checkNotNegative(load, "load");
74 _LoadBalancerEntry entry = _first;
75 _increaseLoad(entry, load);
76 return entry.run(this, load, function, argument, timeout, onTimeout);
77 }
78
79 /// Execute the same function in the least loaded [count] isolates.
80 ///
81 /// This guarantees that the function isn't run twice in the same isolate,
82 /// so `count` is not allowed to exceed [length].
83 ///
84 /// The optional [load] parameter represents the load that the command
85 /// is causing on the isolate where it runs.
86 /// The number has no fixed meaning, but should be seen as relative to
87 /// other commands run in the same load balancer.
88 /// The `load` must not be negative.
89 ///
90 /// If [timeout] and [onTimeout] are provided, they are forwarded to
91 /// the runners running the function, which will handle any timeouts
92 /// as normal.
93 List<Future> runMultiple(int count, function(argument), argument,
94 {Duration timeout,
95 onTimeout(),
96 int load: 100}) {
97 RangeError.checkValueInInterval(count, 1, _length, "count");
98 RangeError.checkNotNegative(load, "load");
99 if (count == 1) {
100 return list1(run(function, argument, load: load,
101 timeout: timeout, onTimeout: onTimeout));
102 }
103 List result = new List<Future>(count);
104 if (count == _length) {
105 // No need to change the order of entries in the queue.
106 for (int i = 0; i < count; i++) {
107 _LoadBalancerEntry entry = _queue[i];
108 entry.load += load;
109 result[i] =
110 entry.run(this, load, function, argument, timeout, onTimeout);
111 }
112 } else {
113 // Remove the [count] least loaded services and run the
114 // command on each, then add them back to the queue.
115 // This avoids running the same command twice in the same
116 // isolate.
117 List entries = new List(count);
118 for (int i = 0; i < count; i++) {
119 entries[i] = _removeFirst();
120 }
121 for (int i = 0; i < count; i++) {
122 _LoadBalancerEntry entry = entries[i];
123 entry.load += load;
124 _add(entry);
125 result[i] =
126 entry.run(this, load, function, argument, timeout, onTimeout);
127 }
128 }
129 return result;
130 }
131
132 Future close() {
133 if (_stopFuture != null) return _stopFuture;
134 _stopFuture =
135 MultiError.waitUnordered(_queue.take(_length).map((e) => e.close()));
136 // Remove all entries.
137 for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
138 _queue = null;
139 _length = 0;
140 return _stopFuture;
141 }
142
143 /// Place [element] in heap at [index] or above.
144 ///
145 /// Put element into the empty cell at `index`.
146 /// While the `element` has higher priority than the
147 /// parent, swap it with the parent.
148 void _bubbleUp(_LoadBalancerEntry element, int index) {
149 while (index > 0) {
150 int parentIndex = (index - 1) ~/ 2;
151 _LoadBalancerEntry parent = _queue[parentIndex];
152 if (element.compareTo(parent) > 0) break;
153 _queue[index] = parent;
154 parent.queueIndex = index;
155 index = parentIndex;
156 }
157 _queue[index] = element;
158 element.queueIndex = index;
159 }
160
161 /// Place [element] in heap at [index] or above.
162 ///
163 /// Put element into the empty cell at `index`.
164 /// While the `element` has lower priority than either child,
165 /// swap it with the highest priority child.
166 void _bubbleDown(_LoadBalancerEntry element, int index) {
167 while (true) {
168 int childIndex = index * 2 + 1; // Left child index.
169 if (childIndex >= _length) break;
170 _LoadBalancerEntry child = _queue[childIndex];
171 int rightChildIndex = childIndex + 1;
172 if (rightChildIndex < _length) {
173 _LoadBalancerEntry rightChild = _queue[rightChildIndex];
174 if (rightChild.compareTo(child) < 0) {
175 childIndex = rightChildIndex;
176 child = rightChild;
177 }
178 }
179 if (element.compareTo(child) <= 0) break;
180 _queue[index] = child;
181 child.queueIndex = index;
182 index = childIndex;
183 }
184 _queue[index] = element;
185 element.queueIndex = index;
186 }
187
188 /// Removes the entry from the queue, but doesn't stop its service.
189 ///
190 /// The entry is expected to be either added back to the queue
191 /// immediately or have its stop method called.
192 void _remove(_LoadBalancerEntry entry) {
193 int index = entry.queueIndex;
194 if (index < 0) return;
195 entry.queueIndex = -1;
196 _length--;
197 _LoadBalancerEntry replacement = _queue[_length];
198 _queue[_length] = null;
199 if (index < _length) {
200 if (entry.compareTo(replacement) < 0) {
201 _bubbleDown(replacement, index);
202 } else {
203 _bubbleUp(replacement, index);
204 }
205 }
206 }
207
208 /// Adds entry to the queue.
209 void _add(_LoadBalancerEntry entry) {
210 if (_stopFuture != null) throw new StateError("LoadBalancer is stopped");
211 assert(entry.queueIndex < 0);
212 if (_queue.length == _length) {
213 _grow();
214 }
215 int index = _length;
216 _length = index + 1;
217 _bubbleUp(entry, index);
218 }
219
220 void _increaseLoad(_LoadBalancerEntry entry, int load) {
221 assert(load >= 0);
222 entry.load += load;
223 if (entry.inQueue) {
224 _bubbleDown(entry, entry.queueIndex);
225 }
226 }
227
228 void _decreaseLoad(_LoadBalancerEntry entry, int load) {
229 assert(load >= 0);
230 entry.load -= load;
231 if (entry.inQueue) {
232 _bubbleUp(entry, entry.queueIndex);
233 }
234 }
235
236 void _grow() {
237 List newQueue = new List(_length * 2);
238 newQueue.setRange(0, _length, _queue);
239 _queue = newQueue;
240 }
241
242 _LoadBalancerEntry get _first {
243 assert(_length > 0);
244 return _queue[0];
245 }
246
247 _LoadBalancerEntry _removeFirst() {
248 _LoadBalancerEntry result = _first;
249 _remove(result);
250 return result;
251 }
252 }
253
254 class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
255 // The current load on the isolate.
256 int load = 0;
257 // The current index in the heap-queue.
258 // Negative when the entry is not part of the queue.
259 int queueIndex = -1;
260
261 // The service used to send commands to the other isolate.
262 Runner runner;
263
264 _LoadBalancerEntry(Runner runner) : runner = runner;
265
266 /// Whether the entry is still in the queue.
267 bool get inQueue => queueIndex >= 0;
268
269 Future run(LoadBalancer balancer, int load, function(argumen), argument,
270 Duration timeout, onTimeout()) {
271 return runner.run(function, argument,
272 timeout: timeout, onTimeout: onTimeout).whenComplete(() {
273 balancer._decreaseLoad(this, load);
274 });
275 }
276
277 Future close() => runner.close();
278
279 int compareTo(_LoadBalancerEntry other) => load - other.load;
280 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698