Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(205)

Side by Side Diff: client/tests/kvm/kvm_scheduler.py

Issue 6539001: Merge remote branch 'cros/upstream' into master. (Closed) Base URL: ssh://git@gitrw.chromium.org:9222/autotest.git@master
Patch Set: patch Created 9 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « client/tests/kvm/kvm_preprocessing.py ('k') | client/tests/kvm/kvm_utils.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 import os, select 1 import os, select
2 import kvm_utils, kvm_vm, kvm_subprocess 2 import kvm_utils, kvm_vm, kvm_subprocess
3 3
4 4
5 class scheduler: 5 class scheduler:
6 """ 6 """
7 A scheduler that manages several parallel test execution pipelines on a 7 A scheduler that manages several parallel test execution pipelines on a
8 single host. 8 single host.
9 """ 9 """
10 10
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
56 while True: 56 while True:
57 cmd = r.readline().split() 57 cmd = r.readline().split()
58 if not cmd: 58 if not cmd:
59 continue 59 continue
60 60
61 # The scheduler wants this worker to run a test 61 # The scheduler wants this worker to run a test
62 if cmd[0] == "run": 62 if cmd[0] == "run":
63 test_index = int(cmd[1]) 63 test_index = int(cmd[1])
64 test = self.tests[test_index].copy() 64 test = self.tests[test_index].copy()
65 test.update(self_dict) 65 test.update(self_dict)
66 test = kvm_utils.get_sub_pool(test, index, self.num_workers)
67 test_iterations = int(test.get("iterations", 1)) 66 test_iterations = int(test.get("iterations", 1))
68 status = run_test_func("kvm", params=test, 67 status = run_test_func("kvm", params=test,
69 tag=test.get("shortname"), 68 tag=test.get("shortname"),
70 iterations=test_iterations) 69 iterations=test_iterations)
71 w.write("done %s %s\n" % (test_index, status)) 70 w.write("done %s %s\n" % (test_index, status))
72 w.write("ready\n") 71 w.write("ready\n")
73 72
74 # The scheduler wants this worker to free its used resources 73 # The scheduler wants this worker to free its used resources
75 elif cmd[0] == "cleanup": 74 elif cmd[0] == "cleanup":
76 env_filename = os.path.join(self.bindir, self_dict["env"]) 75 env_filename = os.path.join(self.bindir, self_dict["env"])
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
122 121
123 # A worker completed a test 122 # A worker completed a test
124 elif msg[0] == "done": 123 elif msg[0] == "done":
125 test_index = int(msg[1]) 124 test_index = int(msg[1])
126 test = self.tests[test_index] 125 test = self.tests[test_index]
127 status = int(eval(msg[2])) 126 status = int(eval(msg[2]))
128 test_status[test_index] = ("fail", "pass")[status] 127 test_status[test_index] = ("fail", "pass")[status]
129 # If the test failed, mark all dependent tests as "failed" t oo 128 # If the test failed, mark all dependent tests as "failed" t oo
130 if not status: 129 if not status:
131 for i, other_test in enumerate(self.tests): 130 for i, other_test in enumerate(self.tests):
132 for dep in other_test.get("depend", []): 131 for dep in other_test.get("dep", []):
133 if dep in test["name"]: 132 if dep in test["name"]:
134 test_status[i] = "fail" 133 test_status[i] = "fail"
135 134
136 # A worker is done shutting down its VMs and other processes 135 # A worker is done shutting down its VMs and other processes
137 elif msg[0] == "cleanup_done": 136 elif msg[0] == "cleanup_done":
138 used_cpus[worker_index] = 0 137 used_cpus[worker_index] = 0
139 used_mem[worker_index] = 0 138 used_mem[worker_index] = 0
140 closing_workers.remove(worker_index) 139 closing_workers.remove(worker_index)
141 140
142 if not someone_is_ready: 141 if not someone_is_ready:
143 continue 142 continue
144 143
145 for worker in idle_workers[:]: 144 for worker in idle_workers[:]:
146 # Find a test for this worker 145 # Find a test for this worker
147 test_found = False 146 test_found = False
148 for i, test in enumerate(self.tests): 147 for i, test in enumerate(self.tests):
149 # We only want "waiting" tests 148 # We only want "waiting" tests
150 if test_status[i] != "waiting": 149 if test_status[i] != "waiting":
151 continue 150 continue
152 # Make sure the test isn't assigned to another worker 151 # Make sure the test isn't assigned to another worker
153 if test_worker[i] is not None and test_worker[i] != worker: 152 if test_worker[i] is not None and test_worker[i] != worker:
154 continue 153 continue
155 # Make sure the test's dependencies are satisfied 154 # Make sure the test's dependencies are satisfied
156 dependencies_satisfied = True 155 dependencies_satisfied = True
157 for dep in test["depend"]: 156 for dep in test["dep"]:
158 dependencies = [j for j, t in enumerate(self.tests) 157 dependencies = [j for j, t in enumerate(self.tests)
159 if dep in t["name"]] 158 if dep in t["name"]]
160 bad_status_deps = [j for j in dependencies 159 bad_status_deps = [j for j in dependencies
161 if test_status[j] != "pass"] 160 if test_status[j] != "pass"]
162 if bad_status_deps: 161 if bad_status_deps:
163 dependencies_satisfied = False 162 dependencies_satisfied = False
164 break 163 break
165 if not dependencies_satisfied: 164 if not dependencies_satisfied:
166 continue 165 continue
167 # Make sure we have enough resources to run the test 166 # Make sure we have enough resources to run the test
(...skipping 25 matching lines...) Expand all
193 continue 192 continue
194 # Everything is OK -- run the test 193 # Everything is OK -- run the test
195 test_status[i] = "running" 194 test_status[i] = "running"
196 test_worker[i] = worker 195 test_worker[i] = worker
197 idle_workers.remove(worker) 196 idle_workers.remove(worker)
198 # Update used_cpus and used_mem 197 # Update used_cpus and used_mem
199 used_cpus[worker] = test_used_cpus 198 used_cpus[worker] = test_used_cpus
200 used_mem[worker] = test_used_mem 199 used_mem[worker] = test_used_mem
201 # Assign all related tests to this worker 200 # Assign all related tests to this worker
202 for j, other_test in enumerate(self.tests): 201 for j, other_test in enumerate(self.tests):
203 for other_dep in other_test["depend"]: 202 for other_dep in other_test["dep"]:
204 # All tests that depend on this test 203 # All tests that depend on this test
205 if other_dep in test["name"]: 204 if other_dep in test["name"]:
206 test_worker[j] = worker 205 test_worker[j] = worker
207 break 206 break
208 # ... and all tests that share a dependency 207 # ... and all tests that share a dependency
209 # with this test 208 # with this test
210 for dep in test["depend"]: 209 for dep in test["dep"]:
211 if dep in other_dep or other_dep in dep: 210 if dep in other_dep or other_dep in dep:
212 test_worker[j] = worker 211 test_worker[j] = worker
213 break 212 break
214 # Tell the worker to run the test 213 # Tell the worker to run the test
215 self.s2w_w[worker].write("run %s\n" % i) 214 self.s2w_w[worker].write("run %s\n" % i)
216 break 215 break
217 216
218 # If there won't be any tests for this worker to run soon, tell 217 # If there won't be any tests for this worker to run soon, tell
219 # the worker to free its used resources 218 # the worker to free its used resources
220 if not test_found and (used_cpus[worker] or used_mem[worker]): 219 if not test_found and (used_cpus[worker] or used_mem[worker]):
221 self.s2w_w[worker].write("cleanup\n") 220 self.s2w_w[worker].write("cleanup\n")
222 idle_workers.remove(worker) 221 idle_workers.remove(worker)
223 closing_workers.append(worker) 222 closing_workers.append(worker)
224 223
225 # If there are no more new tests to run, terminate the workers and 224 # If there are no more new tests to run, terminate the workers and
226 # the scheduler 225 # the scheduler
227 if len(idle_workers) == self.num_workers: 226 if len(idle_workers) == self.num_workers:
228 for worker in idle_workers: 227 for worker in idle_workers:
229 self.s2w_w[worker].write("terminate\n") 228 self.s2w_w[worker].write("terminate\n")
230 break 229 break
OLDNEW
« no previous file with comments | « client/tests/kvm/kvm_preprocessing.py ('k') | client/tests/kvm/kvm_utils.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698