| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 /// A load-balancing runner pool. | |
| 6 library dart.pkg.isolate.loadbalancer; | |
| 7 | |
| 8 import "runner.dart"; | |
| 9 import "src/errors.dart"; | |
| 10 import "src/lists.dart"; | |
| 11 import "dart:async" show Future; | |
| 12 | |
| 13 /// A pool of runners, ordered by load. | |
| 14 /// | |
| 15 /// Keeps a pool of runners, | |
| 16 /// and allows running function through the runner with the lowest current load. | |
| 17 class LoadBalancer implements Runner { | |
| 18 // A heap-based priority queue of entries, prioritized by `load`. | |
| 19 // Each entry has its own entry in the queue, for faster update. | |
| 20 List<_LoadBalancerEntry> _queue; | |
| 21 | |
| 22 // The number of entries currently in the queue. | |
| 23 int _length; | |
| 24 | |
| 25 // Whether [stop] has been called. | |
| 26 Future _stopFuture = null; | |
| 27 | |
| 28 /// Create a load balancer for [service] with [size] isolates. | |
| 29 LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners)); | |
| 30 | |
| 31 LoadBalancer._(List<_LoadBalancerEntry> entries) | |
| 32 : _queue = entries, | |
| 33 _length = entries.length { | |
| 34 for (int i = 0; i < _length; i++) { | |
| 35 _queue[i].queueIndex = i; | |
| 36 } | |
| 37 } | |
| 38 | |
| 39 /// The number of runners currently in the pool. | |
| 40 int get length => _length; | |
| 41 | |
| 42 /// Asynchronously create [size] runners and create a `LoadBalancer` of those. | |
| 43 /// | |
| 44 /// This is a helper function that makes it easy to create a `LoadBalancer` | |
| 45 /// with asynchronously created runners, for example: | |
| 46 /// | |
| 47 /// var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn); | |
| 48 static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) { | |
| 49 return Future.wait(new Iterable.generate(size, (_) => createRunner()), | |
| 50 cleanUp: (Runner runner) { runner.close(); }) | |
| 51 .then((runners) => new LoadBalancer(runners)); | |
| 52 } | |
| 53 | |
| 54 static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) { | |
| 55 var entries = runners.map((runner) => new _LoadBalancerEntry(runner)); | |
| 56 return new List<_LoadBalancerEntry>.from(entries, growable: false); | |
| 57 } | |
| 58 | |
| 59 /// Execute the command in the currently least loaded isolate. | |
| 60 /// | |
| 61 /// The optional [load] parameter represents the load that the command | |
| 62 /// is causing on the isolate where it runs. | |
| 63 /// The number has no fixed meaning, but should be seen as relative to | |
| 64 /// other commands run in the same load balancer. | |
| 65 /// The `load` must not be negative. | |
| 66 /// | |
| 67 /// If [timeout] and [onTimeout] are provided, they are forwarded to | |
| 68 /// the runner running the function, which will handle a timeout | |
| 69 /// as normal. | |
| 70 Future run(function(argument), argument, {Duration timeout, | |
| 71 onTimeout(), | |
| 72 int load: 100}) { | |
| 73 RangeError.checkNotNegative(load, "load"); | |
| 74 _LoadBalancerEntry entry = _first; | |
| 75 _increaseLoad(entry, load); | |
| 76 return entry.run(this, load, function, argument, timeout, onTimeout); | |
| 77 } | |
| 78 | |
| 79 /// Execute the same function in the least loaded [count] isolates. | |
| 80 /// | |
| 81 /// This guarantees that the function isn't run twice in the same isolate, | |
| 82 /// so `count` is not allowed to exceed [length]. | |
| 83 /// | |
| 84 /// The optional [load] parameter represents the load that the command | |
| 85 /// is causing on the isolate where it runs. | |
| 86 /// The number has no fixed meaning, but should be seen as relative to | |
| 87 /// other commands run in the same load balancer. | |
| 88 /// The `load` must not be negative. | |
| 89 /// | |
| 90 /// If [timeout] and [onTimeout] are provided, they are forwarded to | |
| 91 /// the runners running the function, which will handle any timeouts | |
| 92 /// as normal. | |
| 93 List<Future> runMultiple(int count, function(argument), argument, | |
| 94 {Duration timeout, | |
| 95 onTimeout(), | |
| 96 int load: 100}) { | |
| 97 RangeError.checkValueInInterval(count, 1, _length, "count"); | |
| 98 RangeError.checkNotNegative(load, "load"); | |
| 99 if (count == 1) { | |
| 100 return list1(run(function, argument, load: load, | |
| 101 timeout: timeout, onTimeout: onTimeout)); | |
| 102 } | |
| 103 List result = new List<Future>(count); | |
| 104 if (count == _length) { | |
| 105 // No need to change the order of entries in the queue. | |
| 106 for (int i = 0; i < count; i++) { | |
| 107 _LoadBalancerEntry entry = _queue[i]; | |
| 108 entry.load += load; | |
| 109 result[i] = | |
| 110 entry.run(this, load, function, argument, timeout, onTimeout); | |
| 111 } | |
| 112 } else { | |
| 113 // Remove the [count] least loaded services and run the | |
| 114 // command on each, then add them back to the queue. | |
| 115 // This avoids running the same command twice in the same | |
| 116 // isolate. | |
| 117 List entries = new List(count); | |
| 118 for (int i = 0; i < count; i++) { | |
| 119 entries[i] = _removeFirst(); | |
| 120 } | |
| 121 for (int i = 0; i < count; i++) { | |
| 122 _LoadBalancerEntry entry = entries[i]; | |
| 123 entry.load += load; | |
| 124 _add(entry); | |
| 125 result[i] = | |
| 126 entry.run(this, load, function, argument, timeout, onTimeout); | |
| 127 } | |
| 128 } | |
| 129 return result; | |
| 130 } | |
| 131 | |
| 132 Future close() { | |
| 133 if (_stopFuture != null) return _stopFuture; | |
| 134 _stopFuture = | |
| 135 MultiError.waitUnordered(_queue.take(_length).map((e) => e.close())); | |
| 136 // Remove all entries. | |
| 137 for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1; | |
| 138 _queue = null; | |
| 139 _length = 0; | |
| 140 return _stopFuture; | |
| 141 } | |
| 142 | |
| 143 /// Place [element] in heap at [index] or above. | |
| 144 /// | |
| 145 /// Put element into the empty cell at `index`. | |
| 146 /// While the `element` has higher priority than the | |
| 147 /// parent, swap it with the parent. | |
| 148 void _bubbleUp(_LoadBalancerEntry element, int index) { | |
| 149 while (index > 0) { | |
| 150 int parentIndex = (index - 1) ~/ 2; | |
| 151 _LoadBalancerEntry parent = _queue[parentIndex]; | |
| 152 if (element.compareTo(parent) > 0) break; | |
| 153 _queue[index] = parent; | |
| 154 parent.queueIndex = index; | |
| 155 index = parentIndex; | |
| 156 } | |
| 157 _queue[index] = element; | |
| 158 element.queueIndex = index; | |
| 159 } | |
| 160 | |
| 161 /// Place [element] in heap at [index] or above. | |
| 162 /// | |
| 163 /// Put element into the empty cell at `index`. | |
| 164 /// While the `element` has lower priority than either child, | |
| 165 /// swap it with the highest priority child. | |
| 166 void _bubbleDown(_LoadBalancerEntry element, int index) { | |
| 167 while (true) { | |
| 168 int childIndex = index * 2 + 1; // Left child index. | |
| 169 if (childIndex >= _length) break; | |
| 170 _LoadBalancerEntry child = _queue[childIndex]; | |
| 171 int rightChildIndex = childIndex + 1; | |
| 172 if (rightChildIndex < _length) { | |
| 173 _LoadBalancerEntry rightChild = _queue[rightChildIndex]; | |
| 174 if (rightChild.compareTo(child) < 0) { | |
| 175 childIndex = rightChildIndex; | |
| 176 child = rightChild; | |
| 177 } | |
| 178 } | |
| 179 if (element.compareTo(child) <= 0) break; | |
| 180 _queue[index] = child; | |
| 181 child.queueIndex = index; | |
| 182 index = childIndex; | |
| 183 } | |
| 184 _queue[index] = element; | |
| 185 element.queueIndex = index; | |
| 186 } | |
| 187 | |
| 188 /// Removes the entry from the queue, but doesn't stop its service. | |
| 189 /// | |
| 190 /// The entry is expected to be either added back to the queue | |
| 191 /// immediately or have its stop method called. | |
| 192 void _remove(_LoadBalancerEntry entry) { | |
| 193 int index = entry.queueIndex; | |
| 194 if (index < 0) return; | |
| 195 entry.queueIndex = -1; | |
| 196 _length--; | |
| 197 _LoadBalancerEntry replacement = _queue[_length]; | |
| 198 _queue[_length] = null; | |
| 199 if (index < _length) { | |
| 200 if (entry.compareTo(replacement) < 0) { | |
| 201 _bubbleDown(replacement, index); | |
| 202 } else { | |
| 203 _bubbleUp(replacement, index); | |
| 204 } | |
| 205 } | |
| 206 } | |
| 207 | |
| 208 /// Adds entry to the queue. | |
| 209 void _add(_LoadBalancerEntry entry) { | |
| 210 if (_stopFuture != null) throw new StateError("LoadBalancer is stopped"); | |
| 211 assert(entry.queueIndex < 0); | |
| 212 if (_queue.length == _length) { | |
| 213 _grow(); | |
| 214 } | |
| 215 int index = _length; | |
| 216 _length = index + 1; | |
| 217 _bubbleUp(entry, index); | |
| 218 } | |
| 219 | |
| 220 void _increaseLoad(_LoadBalancerEntry entry, int load) { | |
| 221 assert(load >= 0); | |
| 222 entry.load += load; | |
| 223 if (entry.inQueue) { | |
| 224 _bubbleDown(entry, entry.queueIndex); | |
| 225 } | |
| 226 } | |
| 227 | |
| 228 void _decreaseLoad(_LoadBalancerEntry entry, int load) { | |
| 229 assert(load >= 0); | |
| 230 entry.load -= load; | |
| 231 if (entry.inQueue) { | |
| 232 _bubbleUp(entry, entry.queueIndex); | |
| 233 } | |
| 234 } | |
| 235 | |
| 236 void _grow() { | |
| 237 List newQueue = new List(_length * 2); | |
| 238 newQueue.setRange(0, _length, _queue); | |
| 239 _queue = newQueue; | |
| 240 } | |
| 241 | |
| 242 _LoadBalancerEntry get _first { | |
| 243 assert(_length > 0); | |
| 244 return _queue[0]; | |
| 245 } | |
| 246 | |
| 247 _LoadBalancerEntry _removeFirst() { | |
| 248 _LoadBalancerEntry result = _first; | |
| 249 _remove(result); | |
| 250 return result; | |
| 251 } | |
| 252 } | |
| 253 | |
| 254 class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> { | |
| 255 // The current load on the isolate. | |
| 256 int load = 0; | |
| 257 // The current index in the heap-queue. | |
| 258 // Negative when the entry is not part of the queue. | |
| 259 int queueIndex = -1; | |
| 260 | |
| 261 // The service used to send commands to the other isolate. | |
| 262 Runner runner; | |
| 263 | |
| 264 _LoadBalancerEntry(Runner runner) : runner = runner; | |
| 265 | |
| 266 /// Whether the entry is still in the queue. | |
| 267 bool get inQueue => queueIndex >= 0; | |
| 268 | |
| 269 Future run(LoadBalancer balancer, int load, function(argumen), argument, | |
| 270 Duration timeout, onTimeout()) { | |
| 271 return runner.run(function, argument, | |
| 272 timeout: timeout, onTimeout: onTimeout).whenComplete(() { | |
| 273 balancer._decreaseLoad(this, load); | |
| 274 }); | |
| 275 } | |
| 276 | |
| 277 Future close() => runner.close(); | |
| 278 | |
| 279 int compareTo(_LoadBalancerEntry other) => load - other.load; | |
| 280 } | |
| OLD | NEW |