OLD | NEW |
1 import os, select | 1 import os, select |
2 import kvm_utils, kvm_vm, kvm_subprocess | 2 import kvm_utils, kvm_vm, kvm_subprocess |
3 | 3 |
4 | 4 |
5 class scheduler: | 5 class scheduler: |
6 """ | 6 """ |
7 A scheduler that manages several parallel test execution pipelines on a | 7 A scheduler that manages several parallel test execution pipelines on a |
8 single host. | 8 single host. |
9 """ | 9 """ |
10 | 10 |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
56 while True: | 56 while True: |
57 cmd = r.readline().split() | 57 cmd = r.readline().split() |
58 if not cmd: | 58 if not cmd: |
59 continue | 59 continue |
60 | 60 |
61 # The scheduler wants this worker to run a test | 61 # The scheduler wants this worker to run a test |
62 if cmd[0] == "run": | 62 if cmd[0] == "run": |
63 test_index = int(cmd[1]) | 63 test_index = int(cmd[1]) |
64 test = self.tests[test_index].copy() | 64 test = self.tests[test_index].copy() |
65 test.update(self_dict) | 65 test.update(self_dict) |
66 test = kvm_utils.get_sub_pool(test, index, self.num_workers) | |
67 test_iterations = int(test.get("iterations", 1)) | 66 test_iterations = int(test.get("iterations", 1)) |
68 status = run_test_func("kvm", params=test, | 67 status = run_test_func("kvm", params=test, |
69 tag=test.get("shortname"), | 68 tag=test.get("shortname"), |
70 iterations=test_iterations) | 69 iterations=test_iterations) |
71 w.write("done %s %s\n" % (test_index, status)) | 70 w.write("done %s %s\n" % (test_index, status)) |
72 w.write("ready\n") | 71 w.write("ready\n") |
73 | 72 |
74 # The scheduler wants this worker to free its used resources | 73 # The scheduler wants this worker to free its used resources |
75 elif cmd[0] == "cleanup": | 74 elif cmd[0] == "cleanup": |
76 env_filename = os.path.join(self.bindir, self_dict["env"]) | 75 env_filename = os.path.join(self.bindir, self_dict["env"]) |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
122 | 121 |
123 # A worker completed a test | 122 # A worker completed a test |
124 elif msg[0] == "done": | 123 elif msg[0] == "done": |
125 test_index = int(msg[1]) | 124 test_index = int(msg[1]) |
126 test = self.tests[test_index] | 125 test = self.tests[test_index] |
127 status = int(eval(msg[2])) | 126 status = int(eval(msg[2])) |
128 test_status[test_index] = ("fail", "pass")[status] | 127 test_status[test_index] = ("fail", "pass")[status] |
129 # If the test failed, mark all dependent tests as "failed" t
oo | 128 # If the test failed, mark all dependent tests as "failed" t
oo |
130 if not status: | 129 if not status: |
131 for i, other_test in enumerate(self.tests): | 130 for i, other_test in enumerate(self.tests): |
132 for dep in other_test.get("depend", []): | 131 for dep in other_test.get("dep", []): |
133 if dep in test["name"]: | 132 if dep in test["name"]: |
134 test_status[i] = "fail" | 133 test_status[i] = "fail" |
135 | 134 |
136 # A worker is done shutting down its VMs and other processes | 135 # A worker is done shutting down its VMs and other processes |
137 elif msg[0] == "cleanup_done": | 136 elif msg[0] == "cleanup_done": |
138 used_cpus[worker_index] = 0 | 137 used_cpus[worker_index] = 0 |
139 used_mem[worker_index] = 0 | 138 used_mem[worker_index] = 0 |
140 closing_workers.remove(worker_index) | 139 closing_workers.remove(worker_index) |
141 | 140 |
142 if not someone_is_ready: | 141 if not someone_is_ready: |
143 continue | 142 continue |
144 | 143 |
145 for worker in idle_workers[:]: | 144 for worker in idle_workers[:]: |
146 # Find a test for this worker | 145 # Find a test for this worker |
147 test_found = False | 146 test_found = False |
148 for i, test in enumerate(self.tests): | 147 for i, test in enumerate(self.tests): |
149 # We only want "waiting" tests | 148 # We only want "waiting" tests |
150 if test_status[i] != "waiting": | 149 if test_status[i] != "waiting": |
151 continue | 150 continue |
152 # Make sure the test isn't assigned to another worker | 151 # Make sure the test isn't assigned to another worker |
153 if test_worker[i] is not None and test_worker[i] != worker: | 152 if test_worker[i] is not None and test_worker[i] != worker: |
154 continue | 153 continue |
155 # Make sure the test's dependencies are satisfied | 154 # Make sure the test's dependencies are satisfied |
156 dependencies_satisfied = True | 155 dependencies_satisfied = True |
157 for dep in test["depend"]: | 156 for dep in test["dep"]: |
158 dependencies = [j for j, t in enumerate(self.tests) | 157 dependencies = [j for j, t in enumerate(self.tests) |
159 if dep in t["name"]] | 158 if dep in t["name"]] |
160 bad_status_deps = [j for j in dependencies | 159 bad_status_deps = [j for j in dependencies |
161 if test_status[j] != "pass"] | 160 if test_status[j] != "pass"] |
162 if bad_status_deps: | 161 if bad_status_deps: |
163 dependencies_satisfied = False | 162 dependencies_satisfied = False |
164 break | 163 break |
165 if not dependencies_satisfied: | 164 if not dependencies_satisfied: |
166 continue | 165 continue |
167 # Make sure we have enough resources to run the test | 166 # Make sure we have enough resources to run the test |
(...skipping 25 matching lines...) Expand all Loading... |
193 continue | 192 continue |
194 # Everything is OK -- run the test | 193 # Everything is OK -- run the test |
195 test_status[i] = "running" | 194 test_status[i] = "running" |
196 test_worker[i] = worker | 195 test_worker[i] = worker |
197 idle_workers.remove(worker) | 196 idle_workers.remove(worker) |
198 # Update used_cpus and used_mem | 197 # Update used_cpus and used_mem |
199 used_cpus[worker] = test_used_cpus | 198 used_cpus[worker] = test_used_cpus |
200 used_mem[worker] = test_used_mem | 199 used_mem[worker] = test_used_mem |
201 # Assign all related tests to this worker | 200 # Assign all related tests to this worker |
202 for j, other_test in enumerate(self.tests): | 201 for j, other_test in enumerate(self.tests): |
203 for other_dep in other_test["depend"]: | 202 for other_dep in other_test["dep"]: |
204 # All tests that depend on this test | 203 # All tests that depend on this test |
205 if other_dep in test["name"]: | 204 if other_dep in test["name"]: |
206 test_worker[j] = worker | 205 test_worker[j] = worker |
207 break | 206 break |
208 # ... and all tests that share a dependency | 207 # ... and all tests that share a dependency |
209 # with this test | 208 # with this test |
210 for dep in test["depend"]: | 209 for dep in test["dep"]: |
211 if dep in other_dep or other_dep in dep: | 210 if dep in other_dep or other_dep in dep: |
212 test_worker[j] = worker | 211 test_worker[j] = worker |
213 break | 212 break |
214 # Tell the worker to run the test | 213 # Tell the worker to run the test |
215 self.s2w_w[worker].write("run %s\n" % i) | 214 self.s2w_w[worker].write("run %s\n" % i) |
216 break | 215 break |
217 | 216 |
218 # If there won't be any tests for this worker to run soon, tell | 217 # If there won't be any tests for this worker to run soon, tell |
219 # the worker to free its used resources | 218 # the worker to free its used resources |
220 if not test_found and (used_cpus[worker] or used_mem[worker]): | 219 if not test_found and (used_cpus[worker] or used_mem[worker]): |
221 self.s2w_w[worker].write("cleanup\n") | 220 self.s2w_w[worker].write("cleanup\n") |
222 idle_workers.remove(worker) | 221 idle_workers.remove(worker) |
223 closing_workers.append(worker) | 222 closing_workers.append(worker) |
224 | 223 |
225 # If there are no more new tests to run, terminate the workers and | 224 # If there are no more new tests to run, terminate the workers and |
226 # the scheduler | 225 # the scheduler |
227 if len(idle_workers) == self.num_workers: | 226 if len(idle_workers) == self.num_workers: |
228 for worker in idle_workers: | 227 for worker in idle_workers: |
229 self.s2w_w[worker].write("terminate\n") | 228 self.s2w_w[worker].write("terminate\n") |
230 break | 229 break |
OLD | NEW |