| OLD | NEW |
| 1 import os, select | 1 import os, select |
| 2 import kvm_utils, kvm_vm, kvm_subprocess | 2 import kvm_utils, kvm_vm, kvm_subprocess |
| 3 | 3 |
| 4 | 4 |
| 5 class scheduler: | 5 class scheduler: |
| 6 """ | 6 """ |
| 7 A scheduler that manages several parallel test execution pipelines on a | 7 A scheduler that manages several parallel test execution pipelines on a |
| 8 single host. | 8 single host. |
| 9 """ | 9 """ |
| 10 | 10 |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 56 while True: | 56 while True: |
| 57 cmd = r.readline().split() | 57 cmd = r.readline().split() |
| 58 if not cmd: | 58 if not cmd: |
| 59 continue | 59 continue |
| 60 | 60 |
| 61 # The scheduler wants this worker to run a test | 61 # The scheduler wants this worker to run a test |
| 62 if cmd[0] == "run": | 62 if cmd[0] == "run": |
| 63 test_index = int(cmd[1]) | 63 test_index = int(cmd[1]) |
| 64 test = self.tests[test_index].copy() | 64 test = self.tests[test_index].copy() |
| 65 test.update(self_dict) | 65 test.update(self_dict) |
| 66 test = kvm_utils.get_sub_pool(test, index, self.num_workers) | |
| 67 test_iterations = int(test.get("iterations", 1)) | 66 test_iterations = int(test.get("iterations", 1)) |
| 68 status = run_test_func("kvm", params=test, | 67 status = run_test_func("kvm", params=test, |
| 69 tag=test.get("shortname"), | 68 tag=test.get("shortname"), |
| 70 iterations=test_iterations) | 69 iterations=test_iterations) |
| 71 w.write("done %s %s\n" % (test_index, status)) | 70 w.write("done %s %s\n" % (test_index, status)) |
| 72 w.write("ready\n") | 71 w.write("ready\n") |
| 73 | 72 |
| 74 # The scheduler wants this worker to free its used resources | 73 # The scheduler wants this worker to free its used resources |
| 75 elif cmd[0] == "cleanup": | 74 elif cmd[0] == "cleanup": |
| 76 env_filename = os.path.join(self.bindir, self_dict["env"]) | 75 env_filename = os.path.join(self.bindir, self_dict["env"]) |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 122 | 121 |
| 123 # A worker completed a test | 122 # A worker completed a test |
| 124 elif msg[0] == "done": | 123 elif msg[0] == "done": |
| 125 test_index = int(msg[1]) | 124 test_index = int(msg[1]) |
| 126 test = self.tests[test_index] | 125 test = self.tests[test_index] |
| 127 status = int(eval(msg[2])) | 126 status = int(eval(msg[2])) |
| 128 test_status[test_index] = ("fail", "pass")[status] | 127 test_status[test_index] = ("fail", "pass")[status] |
| 129 # If the test failed, mark all dependent tests as "failed" t
oo | 128 # If the test failed, mark all dependent tests as "failed" t
oo |
| 130 if not status: | 129 if not status: |
| 131 for i, other_test in enumerate(self.tests): | 130 for i, other_test in enumerate(self.tests): |
| 132 for dep in other_test.get("depend", []): | 131 for dep in other_test.get("dep", []): |
| 133 if dep in test["name"]: | 132 if dep in test["name"]: |
| 134 test_status[i] = "fail" | 133 test_status[i] = "fail" |
| 135 | 134 |
| 136 # A worker is done shutting down its VMs and other processes | 135 # A worker is done shutting down its VMs and other processes |
| 137 elif msg[0] == "cleanup_done": | 136 elif msg[0] == "cleanup_done": |
| 138 used_cpus[worker_index] = 0 | 137 used_cpus[worker_index] = 0 |
| 139 used_mem[worker_index] = 0 | 138 used_mem[worker_index] = 0 |
| 140 closing_workers.remove(worker_index) | 139 closing_workers.remove(worker_index) |
| 141 | 140 |
| 142 if not someone_is_ready: | 141 if not someone_is_ready: |
| 143 continue | 142 continue |
| 144 | 143 |
| 145 for worker in idle_workers[:]: | 144 for worker in idle_workers[:]: |
| 146 # Find a test for this worker | 145 # Find a test for this worker |
| 147 test_found = False | 146 test_found = False |
| 148 for i, test in enumerate(self.tests): | 147 for i, test in enumerate(self.tests): |
| 149 # We only want "waiting" tests | 148 # We only want "waiting" tests |
| 150 if test_status[i] != "waiting": | 149 if test_status[i] != "waiting": |
| 151 continue | 150 continue |
| 152 # Make sure the test isn't assigned to another worker | 151 # Make sure the test isn't assigned to another worker |
| 153 if test_worker[i] is not None and test_worker[i] != worker: | 152 if test_worker[i] is not None and test_worker[i] != worker: |
| 154 continue | 153 continue |
| 155 # Make sure the test's dependencies are satisfied | 154 # Make sure the test's dependencies are satisfied |
| 156 dependencies_satisfied = True | 155 dependencies_satisfied = True |
| 157 for dep in test["depend"]: | 156 for dep in test["dep"]: |
| 158 dependencies = [j for j, t in enumerate(self.tests) | 157 dependencies = [j for j, t in enumerate(self.tests) |
| 159 if dep in t["name"]] | 158 if dep in t["name"]] |
| 160 bad_status_deps = [j for j in dependencies | 159 bad_status_deps = [j for j in dependencies |
| 161 if test_status[j] != "pass"] | 160 if test_status[j] != "pass"] |
| 162 if bad_status_deps: | 161 if bad_status_deps: |
| 163 dependencies_satisfied = False | 162 dependencies_satisfied = False |
| 164 break | 163 break |
| 165 if not dependencies_satisfied: | 164 if not dependencies_satisfied: |
| 166 continue | 165 continue |
| 167 # Make sure we have enough resources to run the test | 166 # Make sure we have enough resources to run the test |
| (...skipping 25 matching lines...) Expand all Loading... |
| 193 continue | 192 continue |
| 194 # Everything is OK -- run the test | 193 # Everything is OK -- run the test |
| 195 test_status[i] = "running" | 194 test_status[i] = "running" |
| 196 test_worker[i] = worker | 195 test_worker[i] = worker |
| 197 idle_workers.remove(worker) | 196 idle_workers.remove(worker) |
| 198 # Update used_cpus and used_mem | 197 # Update used_cpus and used_mem |
| 199 used_cpus[worker] = test_used_cpus | 198 used_cpus[worker] = test_used_cpus |
| 200 used_mem[worker] = test_used_mem | 199 used_mem[worker] = test_used_mem |
| 201 # Assign all related tests to this worker | 200 # Assign all related tests to this worker |
| 202 for j, other_test in enumerate(self.tests): | 201 for j, other_test in enumerate(self.tests): |
| 203 for other_dep in other_test["depend"]: | 202 for other_dep in other_test["dep"]: |
| 204 # All tests that depend on this test | 203 # All tests that depend on this test |
| 205 if other_dep in test["name"]: | 204 if other_dep in test["name"]: |
| 206 test_worker[j] = worker | 205 test_worker[j] = worker |
| 207 break | 206 break |
| 208 # ... and all tests that share a dependency | 207 # ... and all tests that share a dependency |
| 209 # with this test | 208 # with this test |
| 210 for dep in test["depend"]: | 209 for dep in test["dep"]: |
| 211 if dep in other_dep or other_dep in dep: | 210 if dep in other_dep or other_dep in dep: |
| 212 test_worker[j] = worker | 211 test_worker[j] = worker |
| 213 break | 212 break |
| 214 # Tell the worker to run the test | 213 # Tell the worker to run the test |
| 215 self.s2w_w[worker].write("run %s\n" % i) | 214 self.s2w_w[worker].write("run %s\n" % i) |
| 216 break | 215 break |
| 217 | 216 |
| 218 # If there won't be any tests for this worker to run soon, tell | 217 # If there won't be any tests for this worker to run soon, tell |
| 219 # the worker to free its used resources | 218 # the worker to free its used resources |
| 220 if not test_found and (used_cpus[worker] or used_mem[worker]): | 219 if not test_found and (used_cpus[worker] or used_mem[worker]): |
| 221 self.s2w_w[worker].write("cleanup\n") | 220 self.s2w_w[worker].write("cleanup\n") |
| 222 idle_workers.remove(worker) | 221 idle_workers.remove(worker) |
| 223 closing_workers.append(worker) | 222 closing_workers.append(worker) |
| 224 | 223 |
| 225 # If there are no more new tests to run, terminate the workers and | 224 # If there are no more new tests to run, terminate the workers and |
| 226 # the scheduler | 225 # the scheduler |
| 227 if len(idle_workers) == self.num_workers: | 226 if len(idle_workers) == self.num_workers: |
| 228 for worker in idle_workers: | 227 for worker in idle_workers: |
| 229 self.s2w_w[worker].write("terminate\n") | 228 self.s2w_w[worker].write("terminate\n") |
| 230 break | 229 break |
| OLD | NEW |