Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(186)

Side by Side Diff: lib/loadbalancer.dart

Issue 955053002: adding codereview file, formatting, adding gitignore (Closed) Base URL: https://github.com/dart-lang/isolate.git@master
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 /** 5 /**
6 * A load-balancing runner pool. 6 * A load-balancing runner pool.
7 */ 7 */
8 library dart.pkg.isolate.loadbalancer; 8 library dart.pkg.isolate.loadbalancer;
9 9
10 import "runner.dart"; 10 import "runner.dart";
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
49 49
50 /** 50 /**
51 * Asynchronously create [size] runners and create a `LoadBalancer` of those. 51 * Asynchronously create [size] runners and create a `LoadBalancer` of those.
52 * 52 *
53 * This is a helper function that makes it easy to create a `LoadBalancer` 53 * This is a helper function that makes it easy to create a `LoadBalancer`
54 * with asynchronously created runners, for example: 54 * with asynchronously created runners, for example:
55 * 55 *
56 * var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn); 56 * var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
57 */ 57 */
58 static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) { 58 static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
59 return Future.wait(new Iterable.generate(size, (_) => createRunner()), 59 return Future
60 cleanUp: (Runner runner) { runner.close(); }) 60 .wait(new Iterable.generate(size, (_) => createRunner()),
61 .then((runners) => new LoadBalancer(runners)); 61 cleanUp: (Runner runner) {
Lasse Reichstein Nielsen 2015/02/26 10:59:14 indent cleanUp to after the '('. Putting the body
62 runner.close();
63 }).then((runners) => new LoadBalancer(runners));
62 } 64 }
63 65
64 static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) { 66 static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
65 var entries = runners.map((runner) => new _LoadBalancerEntry(runner)); 67 var entries = runners.map((runner) => new _LoadBalancerEntry(runner));
66 return new List<_LoadBalancerEntry>.from(entries, growable: false); 68 return new List<_LoadBalancerEntry>.from(entries, growable: false);
67 } 69 }
68 70
69 /** 71 /**
70 * Execute the command in the currently least loaded isolate. 72 * Execute the command in the currently least loaded isolate.
71 * 73 *
72 * The optional [load] parameter represents the load that the command 74 * The optional [load] parameter represents the load that the command
73 * is causing on the isolate where it runs. 75 * is causing on the isolate where it runs.
74 * The number has no fixed meaning, but should be seen as relative to 76 * The number has no fixed meaning, but should be seen as relative to
75 * other commands run in the same load balancer. 77 * other commands run in the same load balancer.
76 * The `load` must not be negative. 78 * The `load` must not be negative.
77 * 79 *
78 * If [timeout] and [onTimeout] are provided, they are forwarded to 80 * If [timeout] and [onTimeout] are provided, they are forwarded to
79 * the runner running the function, which will handle a timeout 81 * the runner running the function, which will handle a timeout
80 * as normal. 82 * as normal.
81 */ 83 */
82 Future run(function(argument), argument, {Duration timeout, 84 Future run(function(argument), argument,
83 onTimeout(), 85 {Duration timeout, onTimeout(), int load: 100}) {
Lasse Reichstein Nielsen 2015/02/26 10:59:14 Indent "{" to after then "(". That is: Future run(
84 int load: 100}) {
85 RangeError.checkNotNegative(load, "load"); 86 RangeError.checkNotNegative(load, "load");
86 _LoadBalancerEntry entry = _first; 87 _LoadBalancerEntry entry = _first;
87 _increaseLoad(entry, load); 88 _increaseLoad(entry, load);
88 return entry.run(this, load, function, argument, timeout, onTimeout); 89 return entry.run(this, load, function, argument, timeout, onTimeout);
89 } 90 }
90 91
91 /** 92 /**
92 * Execute the same function in the least loaded [count] isolates. 93 * Execute the same function in the least loaded [count] isolates.
93 * 94 *
94 * This guarantees that the function isn't run twice in the same isolate, 95 * This guarantees that the function isn't run twice in the same isolate,
95 * so `count` is not allowed to exceed [length]. 96 * so `count` is not allowed to exceed [length].
96 * 97 *
97 * The optional [load] parameter represents the load that the command 98 * The optional [load] parameter represents the load that the command
98 * is causing on the isolate where it runs. 99 * is causing on the isolate where it runs.
99 * The number has no fixed meaning, but should be seen as relative to 100 * The number has no fixed meaning, but should be seen as relative to
100 * other commands run in the same load balancer. 101 * other commands run in the same load balancer.
101 * The `load` must not be negative. 102 * The `load` must not be negative.
102 * 103 *
103 * If [timeout] and [onTimeout] are provided, they are forwarded to 104 * If [timeout] and [onTimeout] are provided, they are forwarded to
104 * the runners running the function, which will handle any timeouts 105 * the runners running the function, which will handle any timeouts
105 * as normal. 106 * as normal.
106 */ 107 */
107 List<Future> runMultiple(int count, function(argument), argument, 108 List<Future> runMultiple(int count, function(argument), argument,
108 {Duration timeout, 109 {Duration timeout, onTimeout(), int load: 100}) {
Lasse Reichstein Nielsen 2015/02/26 10:59:14 Indent '{ ' to after '('.
109 onTimeout(),
110 int load: 100}) {
111 RangeError.checkValueInInterval(count, 1, _length, "count"); 110 RangeError.checkValueInInterval(count, 1, _length, "count");
112 RangeError.checkNotNegative(load, "load"); 111 RangeError.checkNotNegative(load, "load");
113 if (count == 1) { 112 if (count == 1) {
114 return list1(run(function, argument, load: load, 113 return list1(run(function, argument,
115 timeout: timeout, onTimeout: onTimeout)); 114 load: load, timeout: timeout, onTimeout: onTimeout));
Lasse Reichstein Nielsen 2015/02/26 10:59:14 Indent arguments to after '(' or put all arguments
116 } 115 }
117 List result = new List<Future>(count); 116 List result = new List<Future>(count);
118 if (count == _length) { 117 if (count == _length) {
119 // No need to change the order of entries in the queue. 118 // No need to change the order of entries in the queue.
120 for (int i = 0; i < count; i++) { 119 for (int i = 0; i < count; i++) {
121 _LoadBalancerEntry entry = _queue[i]; 120 _LoadBalancerEntry entry = _queue[i];
122 entry.load += load; 121 entry.load += load;
123 result[i] = entry.run(this, load, function, argument, 122 result[i] =
124 timeout, onTimeout); 123 entry.run(this, load, function, argument, timeout, onTimeout);
125 } 124 }
126 } else { 125 } else {
127 // Remove the [count] least loaded services and run the 126 // Remove the [count] least loaded services and run the
128 // command on each, then add them back to the queue. 127 // command on each, then add them back to the queue.
129 // This avoids running the same command twice in the same 128 // This avoids running the same command twice in the same
130 // isolate. 129 // isolate.
131 List entries = new List(count); 130 List entries = new List(count);
132 for (int i = 0; i < count; i++) { 131 for (int i = 0; i < count; i++) {
133 entries[i] = _removeFirst(); 132 entries[i] = _removeFirst();
134 } 133 }
135 for (int i = 0; i < count; i++) { 134 for (int i = 0; i < count; i++) {
136 _LoadBalancerEntry entry = entries[i]; 135 _LoadBalancerEntry entry = entries[i];
137 entry.load += load; 136 entry.load += load;
138 _add(entry); 137 _add(entry);
139 result[i] = entry.run(this, load, function, argument, 138 result[i] =
140 timeout, onTimeout); 139 entry.run(this, load, function, argument, timeout, onTimeout);
141 } 140 }
142 } 141 }
143 return result; 142 return result;
144 } 143 }
145 144
146 Future close() { 145 Future close() {
147 if (_stopFuture != null) return _stopFuture; 146 if (_stopFuture != null) return _stopFuture;
148 _stopFuture = MultiError.waitUnordered(_queue.take(_length) 147 _stopFuture =
149 .map((e) => e.close())); 148 MultiError.waitUnordered(_queue.take(_length).map((e) => e.close()));
150 // Remove all entries. 149 // Remove all entries.
151 for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1; 150 for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
152 _queue = null; 151 _queue = null;
153 _length = 0; 152 _length = 0;
154 return _stopFuture; 153 return _stopFuture;
155 } 154 }
156 155
157 /** 156 /**
158 * Place [element] in heap at [index] or above. 157 * Place [element] in heap at [index] or above.
159 * 158 *
(...skipping 16 matching lines...) Expand all
176 175
177 /** 176 /**
178 * Place [element] in heap at [index] or above. 177 * Place [element] in heap at [index] or above.
179 * 178 *
180 * Put element into the empty cell at `index`. 179 * Put element into the empty cell at `index`.
181 * While the `element` has lower priority than either child, 180 * While the `element` has lower priority than either child,
182 * swap it with the highest priority child. 181 * swap it with the highest priority child.
183 */ 182 */
184 void _bubbleDown(_LoadBalancerEntry element, int index) { 183 void _bubbleDown(_LoadBalancerEntry element, int index) {
185 while (true) { 184 while (true) {
186 int childIndex = index * 2 + 1; // Left child index. 185 int childIndex = index * 2 + 1; // Left child index.
Lasse Reichstein Nielsen 2015/02/26 10:59:14 There should always be two spaces before an end-of
187 if (childIndex >= _length) break; 186 if (childIndex >= _length) break;
188 _LoadBalancerEntry child = _queue[childIndex]; 187 _LoadBalancerEntry child = _queue[childIndex];
189 int rightChildIndex = childIndex + 1; 188 int rightChildIndex = childIndex + 1;
190 if (rightChildIndex < _length) { 189 if (rightChildIndex < _length) {
191 _LoadBalancerEntry rightChild = _queue[rightChildIndex]; 190 _LoadBalancerEntry rightChild = _queue[rightChildIndex];
192 if (rightChild.compareTo(child) < 0) { 191 if (rightChild.compareTo(child) < 0) {
193 childIndex = rightChildIndex; 192 childIndex = rightChildIndex;
194 child = rightChild; 193 child = rightChild;
195 } 194 }
196 } 195 }
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
276 class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> { 275 class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
277 // The current load on the isolate. 276 // The current load on the isolate.
278 int load = 0; 277 int load = 0;
279 // The current index in the heap-queue. 278 // The current index in the heap-queue.
280 // Negative when the entry is not part of the queue. 279 // Negative when the entry is not part of the queue.
281 int queueIndex = -1; 280 int queueIndex = -1;
282 281
283 // The service used to send commands to the other isolate. 282 // The service used to send commands to the other isolate.
284 Runner runner; 283 Runner runner;
285 284
286 _LoadBalancerEntry(Runner runner) 285 _LoadBalancerEntry(Runner runner) : runner = runner;
287 : runner = runner;
288 286
289 /** Whether the entry is still in the queue. */ 287 /** Whether the entry is still in the queue. */
290 bool get inQueue => queueIndex >= 0; 288 bool get inQueue => queueIndex >= 0;
291 289
292 Future run(LoadBalancer balancer, int load, function(argumen), argument, 290 Future run(LoadBalancer balancer, int load, function(argumen), argument,
293 Duration timeout, onTimeout()) { 291 Duration timeout, onTimeout()) {
294 return runner.run(function, argument, 292 return runner
295 timeout: timeout, onTimeout: onTimeout).whenComplete(() { 293 .run(function, argument, timeout: timeout, onTimeout: onTimeout)
294 .whenComplete(() {
296 balancer._decreaseLoad(this, load); 295 balancer._decreaseLoad(this, load);
Lasse Reichstein Nielsen 2015/02/26 10:59:14 Indent body by four more.
297 }); 296 });
298 } 297 }
299 298
300 Future close() => runner.close(); 299 Future close() => runner.close();
301 300
302 int compareTo(_LoadBalancerEntry other) => load - other.load; 301 int compareTo(_LoadBalancerEntry other) => load - other.load;
303 } 302 }
304
305
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698