| Index: lib/loadbalancer.dart
|
| diff --git a/lib/loadbalancer.dart b/lib/loadbalancer.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..fdb5d0bcf70bd75b011ebc8d27eeda21b3682e5f
|
| --- /dev/null
|
| +++ b/lib/loadbalancer.dart
|
| @@ -0,0 +1,305 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +/**
|
| + * A load-balancing runner pool.
|
| + */
|
| +library dart.pkg.isolate.loadbalancer;
|
| +
|
| +import "runner.dart";
|
| +import "src/errors.dart";
|
| +import "src/lists.dart";
|
| +import "dart:async" show Future;
|
| +
|
| +/**
|
| + * A pool of runners, ordered by load.
|
| + *
|
| + * Keeps a pool of runners,
|
| + * and allows running a function through the runner with the lowest current load.
|
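| + *
|
| + * For example, a minimal sketch of typical use, assuming an `async`
|
| + * context and a hypothetical one-argument top-level function `crunch`:
|
| + *
|
| + * var balancer = await LoadBalancer.create(4, IsolateRunner.spawn);
|
| + * var result = await balancer.run(crunch, 42);
|
| + * await balancer.close();
|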
| + */
|
| +class LoadBalancer implements Runner {
|
| + // A heap-based priority queue of entries, prioritized by `load`.
|
| + // Each entry knows its own index in the queue, for faster updates.
|
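| + // The heap uses the standard array layout: the entry at index `i` has its
|
| + // children at indices `2 * i + 1` and `2 * i + 2`, and its parent at
|
| + // index `(i - 1) ~/ 2`.
|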
| + List<_LoadBalancerEntry> _queue;
|
| +
|
| + // The number of entries currently in the queue.
|
| + int _length;
|
| +
|
| + // The future returned by [close], or `null` if [close] hasn't been called.
|
| + Future _stopFuture = null;
|
| +
|
| + /**
|
| + * Create a load balancer backed by the given [runners].
|
| + */
|
| + LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
|
| +
|
| + LoadBalancer._(List<_LoadBalancerEntry> entries)
|
| + : _queue = entries,
|
| + _length = entries.length {
|
| + for (int i = 0; i < _length; i++) {
|
| + _queue[i].queueIndex = i;
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * The number of runners currently in the pool.
|
| + */
|
| + int get length => _length;
|
| +
|
| + /**
|
| + * Asynchronously create [size] runners and build a `LoadBalancer` from them.
|
| + *
|
| + * This is a helper function that makes it easy to create a `LoadBalancer`
|
| + * with asynchronously created runners, for example:
|
| + *
|
| + * var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
|
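| + *
|
| + * The returned future completes once all runners have been created, so in
|
| + * an `async` context the pool can be awaited directly:
|
| + *
|
| + * var isolatePool = await LoadBalancer.create(10, IsolateRunner.spawn);
|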
| + */
|
| + static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
|
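| + // If any runner fails to be created, the `cleanUp` callback closes the
|
| + // runners that were created successfully, so they are not leaked.
|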
| + return Future.wait(new Iterable.generate(size, (_) => createRunner()),
|
| + cleanUp: (Runner runner) { runner.close(); })
|
| + .then((runners) => new LoadBalancer(runners));
|
| + }
|
| +
|
| + static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
|
| + var entries = runners.map((runner) => new _LoadBalancerEntry(runner));
|
| + return new List<_LoadBalancerEntry>.from(entries, growable: false);
|
| + }
|
| +
|
| + /**
|
| + * Execute the command in the currently least loaded isolate.
|
| + *
|
| + * The optional [load] parameter represents the load that the command
|
| + * is causing on the isolate where it runs.
|
| + * The number has no fixed meaning, but should be seen as relative to
|
| + * other commands run in the same load balancer.
|
| + * The `load` must not be negative.
|
| + *
|
| + * If [timeout] and [onTimeout] are provided, they are forwarded to
|
| + * the runner running the function, which will handle a timeout
|
| + * as normal.
|
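| + *
|
| + * For example, a sketch assuming an existing pool `balancer`, a
|
| + * hypothetical one-argument top-level function `parsePage`, and an
|
| + * input `url`:
|
| + *
|
| + * var page = await balancer.run(parsePage, url, load: 250);
|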
| + */
|
| + Future run(function(argument), argument, {Duration timeout,
|
| + onTimeout(),
|
| + int load: 100}) {
|
| + RangeError.checkNotNegative(load, "load");
|
| + _LoadBalancerEntry entry = _first;
|
| + _increaseLoad(entry, load);
|
| + return entry.run(this, load, function, argument, timeout, onTimeout);
|
| + }
|
| +
|
| + /**
|
| + * Execute the same function in the least loaded [count] isolates.
|
| + *
|
| + * This guarantees that the function isn't run twice in the same isolate,
|
| + * so `count` is not allowed to exceed [length].
|
| + *
|
| + * The optional [load] parameter represents the load that the command
|
| + * is causing on the isolate where it runs.
|
| + * The number has no fixed meaning, but should be seen as relative to
|
| + * other commands run in the same load balancer.
|
| + * The `load` must not be negative.
|
| + *
|
| + * If [timeout] and [onTimeout] are provided, they are forwarded to
|
| + * the runners running the function, which will handle any timeouts
|
| + * as normal.
|
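| + *
|
| + * For example, a sketch assuming an existing pool `balancer` with at
|
| + * least three runners, a hypothetical one-argument top-level function
|
| + * `warmCache`, and an argument `cacheDir`:
|
| + *
|
| + * var futures = balancer.runMultiple(3, warmCache, cacheDir);
|
| + * await Future.wait(futures);
|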
| + */
|
| + List<Future> runMultiple(int count, function(argument), argument,
|
| + {Duration timeout,
|
| + onTimeout(),
|
| + int load: 100}) {
|
| + RangeError.checkValueInInterval(count, 1, _length, "count");
|
| + RangeError.checkNotNegative(load, "load");
|
| + if (count == 1) {
|
| + return list1(run(function, argument, load: load,
|
| + timeout: timeout, onTimeout: onTimeout));
|
| + }
|
| + List<Future> result = new List<Future>(count);
|
| + if (count == _length) {
|
| + // No need to change the order of entries in the queue.
|
| + for (int i = 0; i < count; i++) {
|
| + _LoadBalancerEntry entry = _queue[i];
|
| + entry.load += load;
|
| + result[i] = entry.run(this, load, function, argument,
|
| + timeout, onTimeout);
|
| + }
|
| + } else {
|
| + // Remove the [count] least loaded services and run the
|
| + // command on each, then add them back to the queue.
|
| + // This avoids running the same command twice in the same
|
| + // isolate.
|
| + List<_LoadBalancerEntry> entries = new List<_LoadBalancerEntry>(count);
|
| + for (int i = 0; i < count; i++) {
|
| + entries[i] = _removeFirst();
|
| + }
|
| + for (int i = 0; i < count; i++) {
|
| + _LoadBalancerEntry entry = entries[i];
|
| + entry.load += load;
|
| + _add(entry);
|
| + result[i] = entry.run(this, load, function, argument,
|
| + timeout, onTimeout);
|
| + }
|
| + }
|
| + return result;
|
| + }
|
| +
|
| + Future close() {
|
| + if (_stopFuture != null) return _stopFuture;
|
| + _stopFuture = MultiError.waitUnordered(_queue.take(_length)
|
| + .map((e) => e.close()));
|
| + // Remove all entries.
|
| + for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
|
| + _queue = null;
|
| + _length = 0;
|
| + return _stopFuture;
|
| + }
|
| +
|
| + /**
|
| + * Place [element] in heap at [index] or above.
|
| + *
|
| + * Put element into the empty cell at `index`.
|
| + * While the `element` has higher priority than the
|
| + * parent, swap it with the parent.
|
| + */
|
| + void _bubbleUp(_LoadBalancerEntry element, int index) {
|
| + while (index > 0) {
|
| + int parentIndex = (index - 1) ~/ 2;
|
| + _LoadBalancerEntry parent = _queue[parentIndex];
|
| + if (element.compareTo(parent) > 0) break;
|
| + _queue[index] = parent;
|
| + parent.queueIndex = index;
|
| + index = parentIndex;
|
| + }
|
| + _queue[index] = element;
|
| + element.queueIndex = index;
|
| + }
|
| +
|
| + /**
|
| + * Place [element] in heap at [index] or below.
|
| + *
|
| + * Put element into the empty cell at `index`.
|
| + * While the `element` has lower priority than either child,
|
| + * swap it with the highest priority child.
|
| + */
|
| + void _bubbleDown(_LoadBalancerEntry element, int index) {
|
| + while (true) {
|
| + int childIndex = index * 2 + 1; // Left child index.
|
| + if (childIndex >= _length) break;
|
| + _LoadBalancerEntry child = _queue[childIndex];
|
| + int rightChildIndex = childIndex + 1;
|
| + if (rightChildIndex < _length) {
|
| + _LoadBalancerEntry rightChild = _queue[rightChildIndex];
|
| + if (rightChild.compareTo(child) < 0) {
|
| + childIndex = rightChildIndex;
|
| + child = rightChild;
|
| + }
|
| + }
|
| + if (element.compareTo(child) <= 0) break;
|
| + _queue[index] = child;
|
| + child.queueIndex = index;
|
| + index = childIndex;
|
| + }
|
| + _queue[index] = element;
|
| + element.queueIndex = index;
|
| + }
|
| +
|
| + /**
|
| + * Removes the entry from the queue, but doesn't stop its service.
|
| + *
|
| + * The entry is expected to either be added back to the queue
|
| + * immediately or have its `close` method called.
|
| + */
|
| + void _remove(_LoadBalancerEntry entry) {
|
| + int index = entry.queueIndex;
|
| + if (index < 0) return;
|
| + entry.queueIndex = -1;
|
| + _length--;
|
| + _LoadBalancerEntry replacement = _queue[_length];
|
| + _queue[_length] = null;
|
| + if (index < _length) {
|
| + if (entry.compareTo(replacement) < 0) {
|
| + _bubbleDown(replacement, index);
|
| + } else {
|
| + _bubbleUp(replacement, index);
|
| + }
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Adds entry to the queue.
|
| + */
|
| + void _add(_LoadBalancerEntry entry) {
|
| + if (_stopFuture != null) throw new StateError("LoadBalancer is stopped");
|
| + assert(entry.queueIndex < 0);
|
| + if (_queue.length == _length) {
|
| + _grow();
|
| + }
|
| + int index = _length;
|
| + _length = index + 1;
|
| + _bubbleUp(entry, index);
|
| + }
|
| +
|
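| + // Increases the load of [entry] and restores the heap order by moving the
|
| + // entry towards the leaves if it is still in the queue.
|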
| + void _increaseLoad(_LoadBalancerEntry entry, int load) {
|
| + assert(load >= 0);
|
| + entry.load += load;
|
| + if (entry.inQueue) {
|
| + _bubbleDown(entry, entry.queueIndex);
|
| + }
|
| + }
|
| +
|
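| + // Decreases the load of [entry] and restores the heap order by moving the
|
| + // entry towards the root if it is still in the queue.
|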
| + void _decreaseLoad(_LoadBalancerEntry entry, int load) {
|
| + assert(load >= 0);
|
| + entry.load -= load;
|
| + if (entry.inQueue) {
|
| + _bubbleUp(entry, entry.queueIndex);
|
| + }
|
| + }
|
| +
|
| + void _grow() {
|
| + List<_LoadBalancerEntry> newQueue = new List<_LoadBalancerEntry>(_length * 2);
|
| + newQueue.setRange(0, _length, _queue);
|
| + _queue = newQueue;
|
| + }
|
| +
|
| + _LoadBalancerEntry get _first {
|
| + assert(_length > 0);
|
| + return _queue[0];
|
| + }
|
| +
|
| + _LoadBalancerEntry _removeFirst() {
|
| + _LoadBalancerEntry result = _first;
|
| + _remove(result);
|
| + return result;
|
| + }
|
| +}
|
| +
|
| +class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
|
| + // The current load on the isolate.
|
| + int load = 0;
|
| + // The current index in the heap-queue.
|
| + // Negative when the entry is not part of the queue.
|
| + int queueIndex = -1;
|
| +
|
| + // The runner used to execute functions, typically in another isolate.
|
| + Runner runner;
|
| +
|
| + _LoadBalancerEntry(this.runner);
|
| +
|
| + /** Whether the entry is still in the queue. */
|
| + bool get inQueue => queueIndex >= 0;
|
| +
|
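| + // Runs [function] on this entry's runner, and decreases the load again
|
| + // when the run completes, whether it succeeds or fails.
|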
| + Future run(LoadBalancer balancer, int load, function(argument), argument,
|
| + Duration timeout, onTimeout()) {
|
| + return runner.run(function, argument,
|
| + timeout: timeout, onTimeout: onTimeout).whenComplete(() {
|
| + balancer._decreaseLoad(this, load);
|
| + });
|
| + }
|
| +
|
| + Future close() => runner.close();
|
| +
|
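| + // Entries are ordered by load, so the least loaded entry has the highest
|
| + // priority and sits at the root of the heap.
|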
| + int compareTo(_LoadBalancerEntry other) => load - other.load;
|
| +}
|
| +
|