OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 /** |
| 6 * A load-balancing runner pool. |
| 7 */ |
| 8 library dart.pkg.isolate.loadbalancer; |
| 9 |
| 10 import "runner.dart"; |
| 11 import "src/errors.dart"; |
| 12 import "src/lists.dart"; |
| 13 import "dart:async" show Future; |
| 14 |
| 15 /** |
| 16 * A pool of runners, ordered by load. |
| 17 * |
| 18 * Keeps a pool of runners, |
| 19 * and allows running function through the runner with the lowest current load. |
| 20 */ |
class LoadBalancer implements Runner {
  // A min-heap of entries keyed by `load`: the root (index 0) is always the
  // least-loaded runner. Each entry caches its own heap position in
  // `queueIndex`, so a load change can re-heapify from that position in
  // O(log n) without a linear search for the entry.
  List<_LoadBalancerEntry> _queue;

  // The number of entries currently in the queue.
  // May be less than `_queue.length`, which is only the heap's capacity.
  int _length;

  // Whether [close] has been called. Non-null means the balancer is stopped
  // and `_queue` has been torn down; the same future is returned by every
  // later [close] call.
  Future _stopFuture = null;

  /**
   * Create a load balancer backed by the given [runners].
   */
  LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));

  LoadBalancer._(List<_LoadBalancerEntry> entries)
      : _queue = entries,
        _length = entries.length {
    // Record each entry's heap position. No explicit heapify is needed:
    // every entry starts with load 0, so any order is a valid heap.
    for (int i = 0; i < _length; i++) {
      _queue[i].queueIndex = i;
    }
  }

  /**
   * The number of runners currently in the pool.
   */
  int get length => _length;

  /**
   * Asynchronously create [size] runners and create a `LoadBalancer` of those.
   *
   * This is a helper function that makes it easy to create a `LoadBalancer`
   * with asynchronously created runners, for example:
   *
   *     var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
   */
  static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
    // `cleanUp` closes already-created runners if any single creation fails,
    // so no runner leaks when the returned future completes with an error.
    return Future.wait(new Iterable.generate(size, (_) => createRunner()),
                       cleanUp: (Runner runner) { runner.close(); })
                 .then((runners) => new LoadBalancer(runners));
  }

  // Wraps each runner in a fresh entry; fixed-length list since the pool
  // size is set at construction (growth happens only via `_add`/`_grow`).
  static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
    var entries = runners.map((runner) => new _LoadBalancerEntry(runner));
    return new List<_LoadBalancerEntry>.from(entries, growable: false);
  }

  /**
   * Execute the command in the currently least loaded isolate.
   *
   * The optional [load] parameter represents the load that the command
   * is causing on the isolate where it runs.
   * The number has no fixed meaning, but should be seen as relative to
   * other commands run in the same load balancer.
   * The `load` must not be negative.
   *
   * If [timeout] and [onTimeout] are provided, they are forwarded to
   * the runner running the function, which will handle a timeout
   * as normal.
   */
  Future run(function(argument), argument, {Duration timeout,
                                            onTimeout(),
                                            int load: 100}) {
    RangeError.checkNotNegative(load, "load");
    // Pick the heap root (least loaded), charge it with the extra load
    // (which sifts it down to its new position), and run. The entry itself
    // removes the load again when the run completes.
    _LoadBalancerEntry entry = _first;
    _increaseLoad(entry, load);
    return entry.run(this, load, function, argument, timeout, onTimeout);
  }

  /**
   * Execute the same function in the least loaded [count] isolates.
   *
   * This guarantees that the function isn't run twice in the same isolate,
   * so `count` is not allowed to exceed [length].
   *
   * The optional [load] parameter represents the load that the command
   * is causing on the isolate where it runs.
   * The number has no fixed meaning, but should be seen as relative to
   * other commands run in the same load balancer.
   * The `load` must not be negative.
   *
   * If [timeout] and [onTimeout] are provided, they are forwarded to
   * the runners running the function, which will handle any timeouts
   * as normal.
   */
  List<Future> runMultiple(int count, function(argument), argument,
                           {Duration timeout,
                            onTimeout(),
                            int load: 100}) {
    RangeError.checkValueInInterval(count, 1, _length, "count");
    RangeError.checkNotNegative(load, "load");
    if (count == 1) {
      // Single-target case degenerates to a plain [run].
      return list1(run(function, argument, load: load,
                       timeout: timeout, onTimeout: onTimeout));
    }
    List result = new List<Future>(count);
    if (count == _length) {
      // Every entry is used, and all gain the same extra load, so relative
      // heap order is unchanged — no re-heapify needed.
      for (int i = 0; i < count; i++) {
        _LoadBalancerEntry entry = _queue[i];
        entry.load += load;
        result[i] = entry.run(this, load, function, argument,
                              timeout, onTimeout);
      }
    } else {
      // Remove the [count] least loaded services and run the
      // command on each, then add them back to the queue.
      // This avoids running the same command twice in the same
      // isolate.
      List entries = new List(count);
      for (int i = 0; i < count; i++) {
        entries[i] = _removeFirst();
      }
      for (int i = 0; i < count; i++) {
        _LoadBalancerEntry entry = entries[i];
        // Charge the load before re-adding so the entry lands at its
        // correct (post-load) heap position.
        entry.load += load;
        _add(entry);
        result[i] = entry.run(this, load, function, argument,
                              timeout, onTimeout);
      }
    }
    return result;
  }

  Future close() {
    // Idempotent: subsequent calls return the original stop future.
    if (_stopFuture != null) return _stopFuture;
    _stopFuture = MultiError.waitUnordered(_queue.take(_length)
                                                 .map((e) => e.close()));
    // Remove all entries.
    for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
    _queue = null;
    _length = 0;
    return _stopFuture;
  }

  /**
   * Place [element] in heap at [index] or above.
   *
   * Put element into the empty cell at `index`.
   * While the `element` has higher priority than the
   * parent, swap it with the parent.
   */
  void _bubbleUp(_LoadBalancerEntry element, int index) {
    while (index > 0) {
      int parentIndex = (index - 1) ~/ 2;
      _LoadBalancerEntry parent = _queue[parentIndex];
      // `> 0` (not `>=`) keeps an equal-load element below its parent,
      // stopping the climb as early as possible.
      if (element.compareTo(parent) > 0) break;
      _queue[index] = parent;
      parent.queueIndex = index;
      index = parentIndex;
    }
    _queue[index] = element;
    element.queueIndex = index;
  }

  /**
   * Place [element] in heap at [index] or below.
   *
   * Put element into the empty cell at `index`.
   * While the `element` has lower priority than either child,
   * swap it with the highest priority child.
   */
  void _bubbleDown(_LoadBalancerEntry element, int index) {
    while (true) {
      int childIndex = index * 2 + 1;  // Left child index.
      if (childIndex >= _length) break;
      _LoadBalancerEntry child = _queue[childIndex];
      int rightChildIndex = childIndex + 1;
      if (rightChildIndex < _length) {
        // Compare against the smaller (higher-priority) of the two children.
        _LoadBalancerEntry rightChild = _queue[rightChildIndex];
        if (rightChild.compareTo(child) < 0) {
          childIndex = rightChildIndex;
          child = rightChild;
        }
      }
      if (element.compareTo(child) <= 0) break;
      _queue[index] = child;
      child.queueIndex = index;
      index = childIndex;
    }
    _queue[index] = element;
    element.queueIndex = index;
  }

  /**
   * Removes the entry from the queue, but doesn't stop its service.
   *
   * The entry is expected to be either added back to the queue
   * immediately or have its stop method called.
   */
  void _remove(_LoadBalancerEntry entry) {
    int index = entry.queueIndex;
    if (index < 0) return;
    entry.queueIndex = -1;
    _length--;
    // Standard heap removal: move the last entry into the vacated slot,
    // then sift it up or down depending on how it compares to the removed
    // entry.
    _LoadBalancerEntry replacement = _queue[_length];
    _queue[_length] = null;
    if (index < _length) {
      if (entry.compareTo(replacement) < 0) {
        _bubbleDown(replacement, index);
      } else {
        _bubbleUp(replacement, index);
      }
    }
  }

  /**
   * Adds entry to the queue.
   */
  void _add(_LoadBalancerEntry entry) {
    if (_stopFuture != null) throw new StateError("LoadBalancer is stopped");
    assert(entry.queueIndex < 0);
    if (_queue.length == _length) {
      _grow();
    }
    // Append at the end of the heap, then sift up to restore the invariant.
    int index = _length;
    _length = index + 1;
    _bubbleUp(entry, index);
  }

  // Adds [load] to [entry] and restores the heap invariant. A larger load
  // can only move an entry away from the root, hence bubble down.
  void _increaseLoad(_LoadBalancerEntry entry, int load) {
    assert(load >= 0);
    entry.load += load;
    if (entry.inQueue) {
      _bubbleDown(entry, entry.queueIndex);
    }
  }

  // Subtracts [load] from [entry] and restores the heap invariant. A smaller
  // load can only move an entry toward the root, hence bubble up.
  // `inQueue` may be false if the balancer was closed while a run was in
  // flight; then there is no heap to fix.
  void _decreaseLoad(_LoadBalancerEntry entry, int load) {
    assert(load >= 0);
    entry.load -= load;
    if (entry.inQueue) {
      _bubbleUp(entry, entry.queueIndex);
    }
  }

  // Doubles the heap's capacity, preserving the existing entries in order.
  void _grow() {
    List newQueue = new List(_length * 2);
    newQueue.setRange(0, _length, _queue);
    _queue = newQueue;
  }

  // The least-loaded entry (heap root). Must not be called on an empty or
  // closed balancer.
  _LoadBalancerEntry get _first {
    assert(_length > 0);
    return _queue[0];
  }

  // Pops and returns the least-loaded entry, re-heapifying the remainder.
  _LoadBalancerEntry _removeFirst() {
    _LoadBalancerEntry result = _first;
    _remove(result);
    return result;
  }
}
| 275 |
/**
 * A single runner in a [LoadBalancer]'s pool, with its current load and
 * its position in the balancer's heap-queue.
 */
class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
  // The current load on the isolate.
  int load = 0;

  // The current index in the heap-queue.
  // Negative when the entry is not part of the queue.
  int queueIndex = -1;

  // The service used to send commands to the other isolate.
  Runner runner;

  _LoadBalancerEntry(Runner runner)
      : runner = runner;

  /** Whether the entry is still in the queue. */
  bool get inQueue => queueIndex >= 0;

  /**
   * Runs [function] with [argument] on this entry's runner.
   *
   * The [load] that [balancer] charged to this entry is removed again when
   * the run completes, whether it succeeds or fails.
   */
  // Fixed typo in the function-type parameter: `argumen` -> `argument`.
  Future run(LoadBalancer balancer, int load, function(argument), argument,
             Duration timeout, onTimeout()) {
    return runner.run(function, argument,
        timeout: timeout, onTimeout: onTimeout).whenComplete(() {
      balancer._decreaseLoad(this, load);
    });
  }

  /** Closes the underlying runner. */
  Future close() => runner.close();

  // Orders entries by load, ascending; negative means `this` is less loaded.
  int compareTo(_LoadBalancerEntry other) => load - other.load;
}
| 304 |
| 305 |
OLD | NEW |