Chromium Code Reviews| OLD | NEW | 
|---|---|
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a | 
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. | 
| 4 | 4 | 
| 5 /** | 5 /** | 
| 6 * A load-balancing runner pool. | 6 * A load-balancing runner pool. | 
| 7 */ | 7 */ | 
| 8 library dart.pkg.isolate.loadbalancer; | 8 library dart.pkg.isolate.loadbalancer; | 
| 9 | 9 | 
| 10 import "runner.dart"; | 10 import "runner.dart"; | 
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 49 | 49 | 
| 50 /** | 50 /** | 
| 51 * Asynchronously create [size] runners and create a `LoadBalancer` of those. | 51 * Asynchronously create [size] runners and create a `LoadBalancer` of those. | 
| 52 * | 52 * | 
| 53 * This is a helper function that makes it easy to create a `LoadBalancer` | 53 * This is a helper function that makes it easy to create a `LoadBalancer` | 
| 54 * with asynchronously created runners, for example: | 54 * with asynchronously created runners, for example: | 
| 55 * | 55 * | 
| 56 * var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn); | 56 * var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn); | 
| 57 */ | 57 */ | 
| 58 static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) { | 58 static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) { | 
| 59 return Future.wait(new Iterable.generate(size, (_) => createRunner()), | 59 return Future | 
| 60 cleanUp: (Runner runner) { runner.close(); }) | 60 .wait(new Iterable.generate(size, (_) => createRunner()), | 
| 61 .then((runners) => new LoadBalancer(runners)); | 61 cleanUp: (Runner runner) { | 
| 
 
Lasse Reichstein Nielsen
2015/02/26 10:59:14
indent cleanUp to after the '('. Putting the body
 
 | |
| 62 runner.close(); | |
| 63 }).then((runners) => new LoadBalancer(runners)); | |
| 62 } | 64 } | 
| 63 | 65 | 
| 64 static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) { | 66 static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) { | 
| 65 var entries = runners.map((runner) => new _LoadBalancerEntry(runner)); | 67 var entries = runners.map((runner) => new _LoadBalancerEntry(runner)); | 
| 66 return new List<_LoadBalancerEntry>.from(entries, growable: false); | 68 return new List<_LoadBalancerEntry>.from(entries, growable: false); | 
| 67 } | 69 } | 
| 68 | 70 | 
| 69 /** | 71 /** | 
| 70 * Execute the command in the currently least loaded isolate. | 72 * Execute the command in the currently least loaded isolate. | 
| 71 * | 73 * | 
| 72 * The optional [load] parameter represents the load that the command | 74 * The optional [load] parameter represents the load that the command | 
| 73 * is causing on the isolate where it runs. | 75 * is causing on the isolate where it runs. | 
| 74 * The number has no fixed meaning, but should be seen as relative to | 76 * The number has no fixed meaning, but should be seen as relative to | 
| 75 * other commands run in the same load balancer. | 77 * other commands run in the same load balancer. | 
| 76 * The `load` must not be negative. | 78 * The `load` must not be negative. | 
| 77 * | 79 * | 
| 78 * If [timeout] and [onTimeout] are provided, they are forwarded to | 80 * If [timeout] and [onTimeout] are provided, they are forwarded to | 
| 79 * the runner running the function, which will handle a timeout | 81 * the runner running the function, which will handle a timeout | 
| 80 * as normal. | 82 * as normal. | 
| 81 */ | 83 */ | 
| 82 Future run(function(argument), argument, {Duration timeout, | 84 Future run(function(argument), argument, | 
| 83 onTimeout(), | 85 {Duration timeout, onTimeout(), int load: 100}) { | 
| 
 
Lasse Reichstein Nielsen
2015/02/26 10:59:14
Indent "{" to after then "(". That is:
Future run(
 
 | |
| 84 int load: 100}) { | |
| 85 RangeError.checkNotNegative(load, "load"); | 86 RangeError.checkNotNegative(load, "load"); | 
| 86 _LoadBalancerEntry entry = _first; | 87 _LoadBalancerEntry entry = _first; | 
| 87 _increaseLoad(entry, load); | 88 _increaseLoad(entry, load); | 
| 88 return entry.run(this, load, function, argument, timeout, onTimeout); | 89 return entry.run(this, load, function, argument, timeout, onTimeout); | 
| 89 } | 90 } | 
| 90 | 91 | 
| 91 /** | 92 /** | 
| 92 * Execute the same function in the least loaded [count] isolates. | 93 * Execute the same function in the least loaded [count] isolates. | 
| 93 * | 94 * | 
| 94 * This guarantees that the function isn't run twice in the same isolate, | 95 * This guarantees that the function isn't run twice in the same isolate, | 
| 95 * so `count` is not allowed to exceed [length]. | 96 * so `count` is not allowed to exceed [length]. | 
| 96 * | 97 * | 
| 97 * The optional [load] parameter represents the load that the command | 98 * The optional [load] parameter represents the load that the command | 
| 98 * is causing on the isolate where it runs. | 99 * is causing on the isolate where it runs. | 
| 99 * The number has no fixed meaning, but should be seen as relative to | 100 * The number has no fixed meaning, but should be seen as relative to | 
| 100 * other commands run in the same load balancer. | 101 * other commands run in the same load balancer. | 
| 101 * The `load` must not be negative. | 102 * The `load` must not be negative. | 
| 102 * | 103 * | 
| 103 * If [timeout] and [onTimeout] are provided, they are forwarded to | 104 * If [timeout] and [onTimeout] are provided, they are forwarded to | 
| 104 * the runners running the function, which will handle any timeouts | 105 * the runners running the function, which will handle any timeouts | 
| 105 * as normal. | 106 * as normal. | 
| 106 */ | 107 */ | 
| 107 List<Future> runMultiple(int count, function(argument), argument, | 108 List<Future> runMultiple(int count, function(argument), argument, | 
| 108 {Duration timeout, | 109 {Duration timeout, onTimeout(), int load: 100}) { | 
| 
 
Lasse Reichstein Nielsen
2015/02/26 10:59:14
Indent '{ ' to after '('.
 
 | |
| 109 onTimeout(), | |
| 110 int load: 100}) { | |
| 111 RangeError.checkValueInInterval(count, 1, _length, "count"); | 110 RangeError.checkValueInInterval(count, 1, _length, "count"); | 
| 112 RangeError.checkNotNegative(load, "load"); | 111 RangeError.checkNotNegative(load, "load"); | 
| 113 if (count == 1) { | 112 if (count == 1) { | 
| 114 return list1(run(function, argument, load: load, | 113 return list1(run(function, argument, | 
| 115 timeout: timeout, onTimeout: onTimeout)); | 114 load: load, timeout: timeout, onTimeout: onTimeout)); | 
| 
 
Lasse Reichstein Nielsen
2015/02/26 10:59:14
Indent arguments to after '(' or put all arguments
 
 | |
| 116 } | 115 } | 
| 117 List result = new List<Future>(count); | 116 List result = new List<Future>(count); | 
| 118 if (count == _length) { | 117 if (count == _length) { | 
| 119 // No need to change the order of entries in the queue. | 118 // No need to change the order of entries in the queue. | 
| 120 for (int i = 0; i < count; i++) { | 119 for (int i = 0; i < count; i++) { | 
| 121 _LoadBalancerEntry entry = _queue[i]; | 120 _LoadBalancerEntry entry = _queue[i]; | 
| 122 entry.load += load; | 121 entry.load += load; | 
| 123 result[i] = entry.run(this, load, function, argument, | 122 result[i] = | 
| 124 timeout, onTimeout); | 123 entry.run(this, load, function, argument, timeout, onTimeout); | 
| 125 } | 124 } | 
| 126 } else { | 125 } else { | 
| 127 // Remove the [count] least loaded services and run the | 126 // Remove the [count] least loaded services and run the | 
| 128 // command on each, then add them back to the queue. | 127 // command on each, then add them back to the queue. | 
| 129 // This avoids running the same command twice in the same | 128 // This avoids running the same command twice in the same | 
| 130 // isolate. | 129 // isolate. | 
| 131 List entries = new List(count); | 130 List entries = new List(count); | 
| 132 for (int i = 0; i < count; i++) { | 131 for (int i = 0; i < count; i++) { | 
| 133 entries[i] = _removeFirst(); | 132 entries[i] = _removeFirst(); | 
| 134 } | 133 } | 
| 135 for (int i = 0; i < count; i++) { | 134 for (int i = 0; i < count; i++) { | 
| 136 _LoadBalancerEntry entry = entries[i]; | 135 _LoadBalancerEntry entry = entries[i]; | 
| 137 entry.load += load; | 136 entry.load += load; | 
| 138 _add(entry); | 137 _add(entry); | 
| 139 result[i] = entry.run(this, load, function, argument, | 138 result[i] = | 
| 140 timeout, onTimeout); | 139 entry.run(this, load, function, argument, timeout, onTimeout); | 
| 141 } | 140 } | 
| 142 } | 141 } | 
| 143 return result; | 142 return result; | 
| 144 } | 143 } | 
| 145 | 144 | 
| 146 Future close() { | 145 Future close() { | 
| 147 if (_stopFuture != null) return _stopFuture; | 146 if (_stopFuture != null) return _stopFuture; | 
| 148 _stopFuture = MultiError.waitUnordered(_queue.take(_length) | 147 _stopFuture = | 
| 149 .map((e) => e.close())); | 148 MultiError.waitUnordered(_queue.take(_length).map((e) => e.close())); | 
| 150 // Remove all entries. | 149 // Remove all entries. | 
| 151 for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1; | 150 for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1; | 
| 152 _queue = null; | 151 _queue = null; | 
| 153 _length = 0; | 152 _length = 0; | 
| 154 return _stopFuture; | 153 return _stopFuture; | 
| 155 } | 154 } | 
| 156 | 155 | 
| 157 /** | 156 /** | 
| 158 * Place [element] in heap at [index] or above. | 157 * Place [element] in heap at [index] or above. | 
| 159 * | 158 * | 
| (...skipping 16 matching lines...) Expand all Loading... | |
| 176 | 175 | 
| 177 /** | 176 /** | 
| 178 * Place [element] in heap at [index] or above. | 177 * Place [element] in heap at [index] or above. | 
| 179 * | 178 * | 
| 180 * Put element into the empty cell at `index`. | 179 * Put element into the empty cell at `index`. | 
| 181 * While the `element` has lower priority than either child, | 180 * While the `element` has lower priority than either child, | 
| 182 * swap it with the highest priority child. | 181 * swap it with the highest priority child. | 
| 183 */ | 182 */ | 
| 184 void _bubbleDown(_LoadBalancerEntry element, int index) { | 183 void _bubbleDown(_LoadBalancerEntry element, int index) { | 
| 185 while (true) { | 184 while (true) { | 
| 186 int childIndex = index * 2 + 1; // Left child index. | 185 int childIndex = index * 2 + 1; // Left child index. | 
| 
 
Lasse Reichstein Nielsen
2015/02/26 10:59:14
There should always be two spaces before an end-of
 
 | |
| 187 if (childIndex >= _length) break; | 186 if (childIndex >= _length) break; | 
| 188 _LoadBalancerEntry child = _queue[childIndex]; | 187 _LoadBalancerEntry child = _queue[childIndex]; | 
| 189 int rightChildIndex = childIndex + 1; | 188 int rightChildIndex = childIndex + 1; | 
| 190 if (rightChildIndex < _length) { | 189 if (rightChildIndex < _length) { | 
| 191 _LoadBalancerEntry rightChild = _queue[rightChildIndex]; | 190 _LoadBalancerEntry rightChild = _queue[rightChildIndex]; | 
| 192 if (rightChild.compareTo(child) < 0) { | 191 if (rightChild.compareTo(child) < 0) { | 
| 193 childIndex = rightChildIndex; | 192 childIndex = rightChildIndex; | 
| 194 child = rightChild; | 193 child = rightChild; | 
| 195 } | 194 } | 
| 196 } | 195 } | 
| (...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 276 class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> { | 275 class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> { | 
| 277 // The current load on the isolate. | 276 // The current load on the isolate. | 
| 278 int load = 0; | 277 int load = 0; | 
| 279 // The current index in the heap-queue. | 278 // The current index in the heap-queue. | 
| 280 // Negative when the entry is not part of the queue. | 279 // Negative when the entry is not part of the queue. | 
| 281 int queueIndex = -1; | 280 int queueIndex = -1; | 
| 282 | 281 | 
| 283 // The service used to send commands to the other isolate. | 282 // The service used to send commands to the other isolate. | 
| 284 Runner runner; | 283 Runner runner; | 
| 285 | 284 | 
| 286 _LoadBalancerEntry(Runner runner) | 285 _LoadBalancerEntry(Runner runner) : runner = runner; | 
| 287 : runner = runner; | |
| 288 | 286 | 
| 289 /** Whether the entry is still in the queue. */ | 287 /** Whether the entry is still in the queue. */ | 
| 290 bool get inQueue => queueIndex >= 0; | 288 bool get inQueue => queueIndex >= 0; | 
| 291 | 289 | 
| 292 Future run(LoadBalancer balancer, int load, function(argumen), argument, | 290 Future run(LoadBalancer balancer, int load, function(argumen), argument, | 
| 293 Duration timeout, onTimeout()) { | 291 Duration timeout, onTimeout()) { | 
| 294 return runner.run(function, argument, | 292 return runner | 
| 295 timeout: timeout, onTimeout: onTimeout).whenComplete(() { | 293 .run(function, argument, timeout: timeout, onTimeout: onTimeout) | 
| 294 .whenComplete(() { | |
| 296 balancer._decreaseLoad(this, load); | 295 balancer._decreaseLoad(this, load); | 
| 
 
Lasse Reichstein Nielsen
2015/02/26 10:59:14
Indent body by four more.
 
 | |
| 297 }); | 296 }); | 
| 298 } | 297 } | 
| 299 | 298 | 
| 300 Future close() => runner.close(); | 299 Future close() => runner.close(); | 
| 301 | 300 | 
| 302 int compareTo(_LoadBalancerEntry other) => load - other.load; | 301 int compareTo(_LoadBalancerEntry other) => load - other.load; | 
| 303 } | 302 } | 
| 304 | |
| 305 | |
| OLD | NEW |