Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(896)

Unified Diff: packages/isolate/lib/load_balancer.dart

Issue 2990843002: Removed fixed dependencies (Closed)
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « packages/isolate/lib/isolate_runner.dart ('k') | packages/isolate/lib/ports.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: packages/isolate/lib/load_balancer.dart
diff --git a/packages/isolate/lib/load_balancer.dart b/packages/isolate/lib/load_balancer.dart
new file mode 100644
index 0000000000000000000000000000000000000000..3f0828845831ee09ae1aa3526fcba2477b5d2820
--- /dev/null
+++ b/packages/isolate/lib/load_balancer.dart
@@ -0,0 +1,281 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/// A load-balancing runner pool.
+library isolate.load_balancer;
+
+import 'dart:async' show Future;
+
+import 'runner.dart';
+import 'src/errors.dart';
+import 'src/lists.dart';
+
+/// A pool of runners, ordered by load.
+///
+/// Keeps a pool of runners,
+/// and allows running function through the runner with the lowest current load.
+class LoadBalancer implements Runner {
+ // A heap-based priority queue of entries, prioritized by `load`.
+ // Each entry has its own entry in the queue, for faster update.
+ List<_LoadBalancerEntry> _queue;
+
+ // The number of entries currently in the queue.
+ int _length;
+
+ // Whether [stop] has been called.
+ Future _stopFuture = null;
+
+ /// Create a load balancer for [service] with [size] isolates.
+ LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
+
+ LoadBalancer._(List<_LoadBalancerEntry> entries)
+ : _queue = entries,
+ _length = entries.length {
+ for (int i = 0; i < _length; i++) {
+ _queue[i].queueIndex = i;
+ }
+ }
+
+ /// The number of runners currently in the pool.
+ int get length => _length;
+
+ /// Asynchronously create [size] runners and create a `LoadBalancer` of those.
+ ///
+ /// This is a helper function that makes it easy to create a `LoadBalancer`
+ /// with asynchronously created runners, for example:
+ ///
+ /// var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
+ static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
+ return Future.wait(new Iterable.generate(size, (_) => createRunner()),
+ cleanUp: (Runner runner) { runner.close(); })
+ .then((runners) => new LoadBalancer(runners));
+ }
+
+ static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
+ var entries = runners.map((runner) => new _LoadBalancerEntry(runner));
+ return new List<_LoadBalancerEntry>.from(entries, growable: false);
+ }
+
+ /// Execute the command in the currently least loaded isolate.
+ ///
+ /// The optional [load] parameter represents the load that the command
+ /// is causing on the isolate where it runs.
+ /// The number has no fixed meaning, but should be seen as relative to
+ /// other commands run in the same load balancer.
+ /// The `load` must not be negative.
+ ///
+ /// If [timeout] and [onTimeout] are provided, they are forwarded to
+ /// the runner running the function, which will handle a timeout
+ /// as normal.
+ Future run(function(argument), argument, {Duration timeout,
+ onTimeout(),
+ int load: 100}) {
+ RangeError.checkNotNegative(load, "load");
+ _LoadBalancerEntry entry = _first;
+ _increaseLoad(entry, load);
+ return entry.run(this, load, function, argument, timeout, onTimeout);
+ }
+
+ /// Execute the same function in the least loaded [count] isolates.
+ ///
+ /// This guarantees that the function isn't run twice in the same isolate,
+ /// so `count` is not allowed to exceed [length].
+ ///
+ /// The optional [load] parameter represents the load that the command
+ /// is causing on the isolate where it runs.
+ /// The number has no fixed meaning, but should be seen as relative to
+ /// other commands run in the same load balancer.
+ /// The `load` must not be negative.
+ ///
+ /// If [timeout] and [onTimeout] are provided, they are forwarded to
+ /// the runners running the function, which will handle any timeouts
+ /// as normal.
+ List<Future> runMultiple(int count, function(argument), argument,
+ {Duration timeout,
+ onTimeout(),
+ int load: 100}) {
+ RangeError.checkValueInInterval(count, 1, _length, "count");
+ RangeError.checkNotNegative(load, "load");
+ if (count == 1) {
+ return list1(run(function, argument, load: load,
+ timeout: timeout, onTimeout: onTimeout));
+ }
+ List result = new List<Future>(count);
+ if (count == _length) {
+ // No need to change the order of entries in the queue.
+ for (int i = 0; i < count; i++) {
+ _LoadBalancerEntry entry = _queue[i];
+ entry.load += load;
+ result[i] =
+ entry.run(this, load, function, argument, timeout, onTimeout);
+ }
+ } else {
+ // Remove the [count] least loaded services and run the
+ // command on each, then add them back to the queue.
+ // This avoids running the same command twice in the same
+ // isolate.
+ List entries = new List(count);
+ for (int i = 0; i < count; i++) {
+ entries[i] = _removeFirst();
+ }
+ for (int i = 0; i < count; i++) {
+ _LoadBalancerEntry entry = entries[i];
+ entry.load += load;
+ _add(entry);
+ result[i] =
+ entry.run(this, load, function, argument, timeout, onTimeout);
+ }
+ }
+ return result;
+ }
+
+ Future close() {
+ if (_stopFuture != null) return _stopFuture;
+ _stopFuture =
+ MultiError.waitUnordered(_queue.take(_length).map((e) => e.close()));
+ // Remove all entries.
+ for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
+ _queue = null;
+ _length = 0;
+ return _stopFuture;
+ }
+
+ /// Place [element] in heap at [index] or above.
+ ///
+ /// Put element into the empty cell at `index`.
+ /// While the `element` has higher priority than the
+ /// parent, swap it with the parent.
+ void _bubbleUp(_LoadBalancerEntry element, int index) {
+ while (index > 0) {
+ int parentIndex = (index - 1) ~/ 2;
+ _LoadBalancerEntry parent = _queue[parentIndex];
+ if (element.compareTo(parent) > 0) break;
+ _queue[index] = parent;
+ parent.queueIndex = index;
+ index = parentIndex;
+ }
+ _queue[index] = element;
+ element.queueIndex = index;
+ }
+
+ /// Place [element] in heap at [index] or above.
+ ///
+ /// Put element into the empty cell at `index`.
+ /// While the `element` has lower priority than either child,
+ /// swap it with the highest priority child.
+ void _bubbleDown(_LoadBalancerEntry element, int index) {
+ while (true) {
+ int childIndex = index * 2 + 1; // Left child index.
+ if (childIndex >= _length) break;
+ _LoadBalancerEntry child = _queue[childIndex];
+ int rightChildIndex = childIndex + 1;
+ if (rightChildIndex < _length) {
+ _LoadBalancerEntry rightChild = _queue[rightChildIndex];
+ if (rightChild.compareTo(child) < 0) {
+ childIndex = rightChildIndex;
+ child = rightChild;
+ }
+ }
+ if (element.compareTo(child) <= 0) break;
+ _queue[index] = child;
+ child.queueIndex = index;
+ index = childIndex;
+ }
+ _queue[index] = element;
+ element.queueIndex = index;
+ }
+
+ /// Removes the entry from the queue, but doesn't stop its service.
+ ///
+ /// The entry is expected to be either added back to the queue
+ /// immediately or have its stop method called.
+ void _remove(_LoadBalancerEntry entry) {
+ int index = entry.queueIndex;
+ if (index < 0) return;
+ entry.queueIndex = -1;
+ _length--;
+ _LoadBalancerEntry replacement = _queue[_length];
+ _queue[_length] = null;
+ if (index < _length) {
+ if (entry.compareTo(replacement) < 0) {
+ _bubbleDown(replacement, index);
+ } else {
+ _bubbleUp(replacement, index);
+ }
+ }
+ }
+
+ /// Adds entry to the queue.
+ void _add(_LoadBalancerEntry entry) {
+ if (_stopFuture != null) throw new StateError("LoadBalancer is stopped");
+ assert(entry.queueIndex < 0);
+ if (_queue.length == _length) {
+ _grow();
+ }
+ int index = _length;
+ _length = index + 1;
+ _bubbleUp(entry, index);
+ }
+
+ void _increaseLoad(_LoadBalancerEntry entry, int load) {
+ assert(load >= 0);
+ entry.load += load;
+ if (entry.inQueue) {
+ _bubbleDown(entry, entry.queueIndex);
+ }
+ }
+
+ void _decreaseLoad(_LoadBalancerEntry entry, int load) {
+ assert(load >= 0);
+ entry.load -= load;
+ if (entry.inQueue) {
+ _bubbleUp(entry, entry.queueIndex);
+ }
+ }
+
+ void _grow() {
+ List newQueue = new List(_length * 2);
+ newQueue.setRange(0, _length, _queue);
+ _queue = newQueue;
+ }
+
+ _LoadBalancerEntry get _first {
+ assert(_length > 0);
+ return _queue[0];
+ }
+
+ _LoadBalancerEntry _removeFirst() {
+ _LoadBalancerEntry result = _first;
+ _remove(result);
+ return result;
+ }
+}
+
+class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
+ // The current load on the isolate.
+ int load = 0;
+ // The current index in the heap-queue.
+ // Negative when the entry is not part of the queue.
+ int queueIndex = -1;
+
+ // The service used to send commands to the other isolate.
+ Runner runner;
+
+ _LoadBalancerEntry(Runner runner) : runner = runner;
+
+ /// Whether the entry is still in the queue.
+ bool get inQueue => queueIndex >= 0;
+
+ Future run(LoadBalancer balancer, int load, function(argument), argument,
+ Duration timeout, onTimeout()) {
+ return runner.run(function, argument,
+ timeout: timeout, onTimeout: onTimeout).whenComplete(() {
+ balancer._decreaseLoad(this, load);
+ });
+ }
+
+ Future close() => runner.close();
+
+ int compareTo(_LoadBalancerEntry other) => load - other.load;
+}
« no previous file with comments | « packages/isolate/lib/isolate_runner.dart ('k') | packages/isolate/lib/ports.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698