| OLD | NEW |
| 1 #!/usr/bin/python2.6 | 1 #!/usr/bin/python2.6 |
| 2 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | 2 # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 | 5 |
| 6 """Program to run emerge in parallel, for significant speedup. | 6 """Program to run emerge in parallel, for significant speedup. |
| 7 | 7 |
| 8 Usage: | 8 Usage: |
| 9 ./parallel_emerge [--board=BOARD] [--workon=PKGS] [--no-workon-deps] | 9 ./parallel_emerge [--board=BOARD] [--workon=PKGS] [--no-workon-deps] |
| 10 [emerge args] package" | 10 [emerge args] package" |
| (...skipping 1101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1112 # printed output yet, this starts at zero. | 1112 # printed output yet, this starts at zero. |
| 1113 self.last_output_timestamp = 0 | 1113 self.last_output_timestamp = 0 |
| 1114 | 1114 |
| 1115 # The return code of our job, if the job is actually finished. | 1115 # The return code of our job, if the job is actually finished. |
| 1116 self.retcode = retcode | 1116 self.retcode = retcode |
| 1117 | 1117 |
| 1118 # The timestamp when our job started. | 1118 # The timestamp when our job started. |
| 1119 self.start_timestamp = start_timestamp | 1119 self.start_timestamp = start_timestamp |
| 1120 | 1120 |
| 1121 | 1121 |
| 1122 def EmergeWorker(task_queue, job_queue, emerge, package_db): | 1122 def SetupWorkerSignals(): |
| 1123 """This worker emerges any packages given to it on the task_queue. | |
| 1124 | |
| 1125 Args: | |
| 1126 task_queue: The queue of tasks for this worker to do. | |
| 1127 job_queue: The queue of results from the worker. | |
| 1128 emerge: An EmergeData() object. | |
| 1129 package_db: A dict, mapping package ids to portage Package objects. | |
| 1130 | |
| 1131 It expects package identifiers to be passed to it via task_queue. When | |
| 1132 a task is started, it pushes the (target, filename) to the started_queue. | |
| 1133 The output is stored in filename. When a merge starts or finishes, we push | |
| 1134 EmergeJobState objects to the job_queue. | |
| 1135 """ | |
| 1136 | |
| 1137 def ExitHandler(signum, frame): | 1123 def ExitHandler(signum, frame): |
| 1138 # Remove our signal handlers so we don't get called recursively. | 1124 # Remove our signal handlers so we don't get called recursively. |
| 1139 signal.signal(signal.SIGINT, signal.SIG_DFL) | 1125 signal.signal(signal.SIGINT, signal.SIG_DFL) |
| 1140 signal.signal(signal.SIGTERM, signal.SIG_DFL) | 1126 signal.signal(signal.SIGTERM, signal.SIG_DFL) |
| 1141 | 1127 |
| 1142 # Try to exit cleanly | 1128 # Try to exit cleanly |
| 1143 sys.exit(1) | 1129 sys.exit(1) |
| 1144 | 1130 |
| 1145 # Ensure that we exit quietly and cleanly, if possible, when we receive | 1131 # Ensure that we exit quietly and cleanly, if possible, when we receive |
| 1146 # SIGTERM or SIGINT signals. By default, when the user hits CTRL-C, all | 1132 # SIGTERM or SIGINT signals. By default, when the user hits CTRL-C, all |
| 1147 # of the child processes will print details about KeyboardInterrupt | 1133 # of the child processes will print details about KeyboardInterrupt |
| 1148 # exceptions, which isn't very helpful. | 1134 # exceptions, which isn't very helpful. |
| 1149 signal.signal(signal.SIGINT, ExitHandler) | 1135 signal.signal(signal.SIGINT, ExitHandler) |
| 1150 signal.signal(signal.SIGTERM, ExitHandler) | 1136 signal.signal(signal.SIGTERM, ExitHandler) |
| 1151 | 1137 |
| 1138 |
| 1139 def EmergeWorker(task_queue, job_queue, emerge, package_db): |
| 1140 """This worker emerges any packages given to it on the task_queue. |
| 1141 |
| 1142 Args: |
| 1143 task_queue: The queue of tasks for this worker to do. |
| 1144 job_queue: The queue of results from the worker. |
| 1145 emerge: An EmergeData() object. |
| 1146 package_db: A dict, mapping package ids to portage Package objects. |
| 1147 |
| 1148 It expects package identifiers to be passed to it via task_queue. When |
| 1149 a task is started, it pushes the (target, filename) to the started_queue. |
| 1150 The output is stored in filename. When a merge starts or finishes, we push |
| 1151 EmergeJobState objects to the job_queue. |
| 1152 """ |
| 1153 |
| 1154 SetupWorkerSignals() |
| 1152 settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb | 1155 settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb |
| 1153 opts, spinner = emerge.opts, emerge.spinner | 1156 opts, spinner = emerge.opts, emerge.spinner |
| 1154 opts["--nodeps"] = True | 1157 opts["--nodeps"] = True |
| 1155 while True: | 1158 while True: |
| 1156 # Wait for a new item to show up on the queue. This is a blocking wait, | 1159 # Wait for a new item to show up on the queue. This is a blocking wait, |
| 1157 # so if there's nothing to do, we just sit here. | 1160 # so if there's nothing to do, we just sit here. |
| 1158 target = task_queue.get() | 1161 target = task_queue.get() |
| 1159 db_pkg = package_db[target] | 1162 db_pkg = package_db[target] |
| 1160 db_pkg.root_config = emerge.root_config | 1163 db_pkg.root_config = emerge.root_config |
| 1161 install_list = [db_pkg] | 1164 install_list = [db_pkg] |
| (...skipping 21 matching lines...) Expand all Loading... |
| 1183 sys.stderr = save_stderr | 1186 sys.stderr = save_stderr |
| 1184 output.close() | 1187 output.close() |
| 1185 if retcode is None: | 1188 if retcode is None: |
| 1186 retcode = 0 | 1189 retcode = 0 |
| 1187 | 1190 |
| 1188 job = EmergeJobState(target, pkgname, True, output.name, start_timestamp, | 1191 job = EmergeJobState(target, pkgname, True, output.name, start_timestamp, |
| 1189 retcode) | 1192 retcode) |
| 1190 job_queue.put(job) | 1193 job_queue.put(job) |
| 1191 | 1194 |
| 1192 | 1195 |
| 1196 class LinePrinter(object): |
| 1197 """Helper object to print a single line.""" |
| 1198 |
| 1199 def __init__(self, line): |
| 1200 self.line = line |
| 1201 |
| 1202 def Print(self, seek_locations): |
| 1203 print self.line |
| 1204 |
| 1205 |
| 1206 class JobPrinter(object): |
| 1207 """Helper object to print output of a job.""" |
| 1208 |
| 1209 def __init__(self, job, unlink=False): |
| 1210 """Print output of job. |
| 1211 |
| 1212 If unlink is True, unlink the job output file when done.""" |
| 1213 self.current_time = time.time() |
| 1214 self.job = job |
| 1215 self.unlink = unlink |
| 1216 |
| 1217 def Print(self, seek_locations): |
| 1218 |
| 1219 job = self.job |
| 1220 |
| 1221 # Calculate how long the job has been running. |
| 1222 seconds = self.current_time - job.start_timestamp |
| 1223 |
| 1224 # Note that we've printed out the job so far. |
| 1225 job.last_output_timestamp = self.current_time |
| 1226 |
| 1227 # Note that we're starting the job |
| 1228 info = "job %s (%dm%.1fs)" % (job.pkgname, seconds / 60, seconds % 60) |
| 1229 last_output_seek = seek_locations.get(job.filename, 0) |
| 1230 if last_output_seek: |
| 1231 print "=== Continue output for %s ===" % info |
| 1232 else: |
| 1233 print "=== Start output for %s ===" % info |
| 1234 |
| 1235 # Print actual output from job |
| 1236 f = codecs.open(job.filename, encoding='utf-8', errors='replace') |
| 1237 f.seek(last_output_seek) |
| 1238 prefix = job.pkgname + ":" |
| 1239 for line in f: |
| 1240 |
| 1241 # Save off our position in the file |
| 1242 if line and line[-1] == "\n": |
| 1243 last_output_seek = f.tell() |
| 1244 line = line[:-1] |
| 1245 |
| 1246 # Print our line |
| 1247 print prefix, line.encode('utf-8', 'replace') |
| 1248 f.close() |
| 1249 |
| 1250 # Save our last spot in the file so that we don't print out the same |
| 1251 # location twice. |
| 1252 seek_locations[job.filename] = last_output_seek |
| 1253 |
| 1254 # Note end of output section |
| 1255 if job.done: |
| 1256 print "=== Complete: %s ===" % info |
| 1257 else: |
| 1258 print "=== Still running: %s ===" % info |
| 1259 |
| 1260 if self.unlink: |
| 1261 os.unlink(job.filename) |
| 1262 |
| 1263 |
| 1264 def PrintWorker(queue): |
| 1265 """A worker that prints stuff to the screen as requested.""" |
| 1266 SetupWorkerSignals() |
| 1267 seek_locations = {} |
| 1268 while True: |
| 1269 job = queue.get() |
| 1270 if job: |
| 1271 job.Print(seek_locations) |
| 1272 else: |
| 1273 break |
| 1274 |
| 1275 |
| 1193 class EmergeQueue(object): | 1276 class EmergeQueue(object): |
| 1194 """Class to schedule emerge jobs according to a dependency graph.""" | 1277 """Class to schedule emerge jobs according to a dependency graph.""" |
| 1195 | 1278 |
| 1196 def __init__(self, deps_map, emerge, package_db, show_output): | 1279 def __init__(self, deps_map, emerge, package_db, show_output): |
| 1197 # Store the dependency graph. | 1280 # Store the dependency graph. |
| 1198 self._deps_map = deps_map | 1281 self._deps_map = deps_map |
| 1199 # Initialize the running queue to empty | 1282 # Initialize the running queue to empty |
| 1200 self._jobs = {} | 1283 self._jobs = {} |
| 1201 # List of total package installs represented in deps_map. | 1284 # List of total package installs represented in deps_map. |
| 1202 install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"] | 1285 install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"] |
| 1203 self._total_jobs = len(install_jobs) | 1286 self._total_jobs = len(install_jobs) |
| 1204 self._show_output = show_output | 1287 self._show_output = show_output |
| 1205 | 1288 |
| 1206 if "--pretend" in emerge.opts: | 1289 if "--pretend" in emerge.opts: |
| 1207 print "Skipping merge because of --pretend mode." | 1290 print "Skipping merge because of --pretend mode." |
| 1208 sys.exit(0) | 1291 sys.exit(0) |
| 1209 | 1292 |
| 1210 # Setup scheduler graph object. This is used by the child processes | 1293 # Setup scheduler graph object. This is used by the child processes |
| 1211 # to help schedule jobs. | 1294 # to help schedule jobs. |
| 1212 emerge.scheduler_graph = emerge.depgraph.schedulerGraph() | 1295 emerge.scheduler_graph = emerge.depgraph.schedulerGraph() |
| 1213 | 1296 |
| 1214 procs = min(self._total_jobs, | 1297 procs = min(self._total_jobs, |
| 1215 emerge.opts.get("--jobs", multiprocessing.cpu_count())) | 1298 emerge.opts.get("--jobs", multiprocessing.cpu_count())) |
| 1216 self._emerge_queue = multiprocessing.Queue() | 1299 self._emerge_queue = multiprocessing.Queue() |
| 1217 self._job_queue = multiprocessing.Queue() | 1300 self._job_queue = multiprocessing.Queue() |
| 1301 self._print_queue = multiprocessing.Queue() |
| 1218 args = (self._emerge_queue, self._job_queue, emerge, package_db) | 1302 args = (self._emerge_queue, self._job_queue, emerge, package_db) |
| 1219 self._pool = multiprocessing.Pool(procs, EmergeWorker, args) | 1303 self._pool = multiprocessing.Pool(procs, EmergeWorker, args) |
| 1304 self._print_worker = multiprocessing.Process(target=PrintWorker, |
| 1305 args=[self._print_queue]) |
| 1306 self._print_worker.start() |
| 1220 | 1307 |
| 1221 # Initialize the failed queue to empty. | 1308 # Initialize the failed queue to empty. |
| 1222 self._retry_queue = [] | 1309 self._retry_queue = [] |
| 1223 self._failed = set() | 1310 self._failed = set() |
| 1224 | 1311 |
| 1225 # Print an update before we launch the merges. | 1312 # Print an update before we launch the merges. |
| 1226 self._Status() | 1313 self._Status() |
| 1227 | 1314 |
| 1228 # Setup an exit handler so that we print nice messages if we are | 1315 # Setup an exit handler so that we print nice messages if we are |
| 1229 # terminated. | 1316 # terminated. |
| 1230 self._SetupExitHandler() | 1317 self._SetupExitHandler() |
| 1231 | 1318 |
| 1232 # Schedule our jobs. | 1319 # Schedule our jobs. |
| 1233 for target, info in deps_map.items(): | 1320 for target, info in deps_map.items(): |
| 1234 if not info["needs"]: | 1321 if not info["needs"]: |
| 1235 self._Schedule(target) | 1322 self._Schedule(target) |
| 1236 | 1323 |
| 1237 def _SetupExitHandler(self): | 1324 def _SetupExitHandler(self): |
| 1238 | 1325 |
| 1239 def ExitHandler(signum, frame): | 1326 def ExitHandler(signum, frame): |
| 1240 | 1327 |
| 1241 # Kill our signal handlers so we don't get called recursively | 1328 # Kill our signal handlers so we don't get called recursively |
| 1242 signal.signal(signal.SIGINT, signal.SIG_DFL) | 1329 signal.signal(signal.SIGINT, signal.SIG_DFL) |
| 1243 signal.signal(signal.SIGTERM, signal.SIG_DFL) | 1330 signal.signal(signal.SIGTERM, signal.SIG_DFL) |
| 1244 | 1331 |
| 1245 # Print our current job status | 1332 # Print our current job status |
| 1246 for target, job in self._jobs.iteritems(): | 1333 for target, job in self._jobs.iteritems(): |
| 1247 if job: | 1334 if job: |
| 1248 self._PrintJob(job) | 1335 self._print_queue.put(JobPrinter(job, unlink=True)) |
| 1249 os.unlink(job.filename) | |
| 1250 | 1336 |
| 1251 # Notify the user that we are exiting | 1337 # Notify the user that we are exiting |
| 1252 print "Exiting on signal %s" % signum | 1338 self._Print("Exiting on signal %s" % signum) |
| 1339 |
| 1340 # Exit when print worker is done. |
| 1341 self._print_queue.put(None) |
| 1342 self._print_worker.join() |
| 1253 sys.exit(1) | 1343 sys.exit(1) |
| 1254 | 1344 |
| 1255 # Print out job status when we are killed | 1345 # Print out job status when we are killed |
| 1256 signal.signal(signal.SIGINT, ExitHandler) | 1346 signal.signal(signal.SIGINT, ExitHandler) |
| 1257 signal.signal(signal.SIGTERM, ExitHandler) | 1347 signal.signal(signal.SIGTERM, ExitHandler) |
| 1258 | 1348 |
| 1259 def _Schedule(self, target): | 1349 def _Schedule(self, target): |
| 1260 # We maintain a tree of all deps, if this doesn't need | 1350 # We maintain a tree of all deps, if this doesn't need |
| 1261 # to be installed just free up it's children and continue. | 1351 # to be installed just free up it's children and continue. |
| 1262 # It is possible to reinstall deps of deps, without reinstalling | 1352 # It is possible to reinstall deps of deps, without reinstalling |
| 1263 # first level deps, like so: | 1353 # first level deps, like so: |
| 1264 # chromeos (merge) -> eselect (nomerge) -> python (merge) | 1354 # chromeos (merge) -> eselect (nomerge) -> python (merge) |
| 1265 if self._deps_map[target]["action"] == "nomerge": | 1355 if self._deps_map[target]["action"] == "nomerge": |
| 1266 self._Finish(target) | 1356 self._Finish(target) |
| 1267 else: | 1357 else: |
| 1268 # Kick off the build if it's marked to be built. | 1358 # Kick off the build if it's marked to be built. |
| 1269 self._jobs[target] = None | 1359 self._jobs[target] = None |
| 1270 self._emerge_queue.put(target) | 1360 self._emerge_queue.put(target) |
| 1271 | 1361 |
| 1272 def _LoadAvg(self): | 1362 def _LoadAvg(self): |
| 1273 loads = open("/proc/loadavg", "r").readline().split()[:3] | 1363 loads = open("/proc/loadavg", "r").readline().split()[:3] |
| 1274 return " ".join(loads) | 1364 return " ".join(loads) |
| 1275 | 1365 |
| 1276 def _PrintJob(self, job): | 1366 def _Print(self, line): |
| 1277 """Print output so far of specified job""" | 1367 """Print a single line.""" |
| 1278 | 1368 self._print_queue.put(LinePrinter(line)) |
| 1279 # Calculate how long the job has been running. | |
| 1280 current_time = time.time() | |
| 1281 seconds = current_time - job.start_timestamp | |
| 1282 | |
| 1283 # Note that we've printed out the job so far. | |
| 1284 job.last_output_timestamp = current_time | |
| 1285 | |
| 1286 # Note that we're starting the job | |
| 1287 info = "job %s (%dm%.1fs) ===" % (job.pkgname, seconds / 60, seconds % 60) | |
| 1288 if job.last_output_seek: | |
| 1289 print "=== Continue output for %s " % info | |
| 1290 else: | |
| 1291 print "=== Start output for %s ===" % info | |
| 1292 | |
| 1293 # Print actual output from job | |
| 1294 f = codecs.open(job.filename, encoding='utf-8', errors='replace') | |
| 1295 f.seek(job.last_output_seek) | |
| 1296 prefix = job.pkgname + ":" | |
| 1297 for line in f: | |
| 1298 | |
| 1299 # Save off our position in the file | |
| 1300 if line and line[-1] == "\n": | |
| 1301 job.last_output_seek = f.tell() | |
| 1302 line = line[:-1] | |
| 1303 | |
| 1304 # Print our line | |
| 1305 print prefix, line.encode('utf-8', 'replace') | |
| 1306 f.close() | |
| 1307 | |
| 1308 # Note end of output section | |
| 1309 if job.done: | |
| 1310 print "=== Complete: %s ===" % info | |
| 1311 else: | |
| 1312 print "=== Still running: %s ===" % info | |
| 1313 | |
| 1314 | 1369 |
| 1315 def _Status(self): | 1370 def _Status(self): |
| 1316 """Print status.""" | 1371 """Print status.""" |
| 1317 current_time = time.time() | 1372 current_time = time.time() |
| 1318 seconds = current_time - GLOBAL_START | 1373 seconds = current_time - GLOBAL_START |
| 1319 line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s " | 1374 line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s " |
| 1320 "[Time %dm%.1fs Load %s]") | 1375 "[Time %dm%.1fs Load %s]") |
| 1321 qsize = self._emerge_queue.qsize() | 1376 qsize = self._emerge_queue.qsize() |
| 1322 print line % (len(self._deps_map), qsize, len(self._jobs) - qsize, | 1377 self._Print(line % (len(self._deps_map), qsize, len(self._jobs) - qsize, |
| 1323 len(self._retry_queue), self._total_jobs, | 1378 len(self._retry_queue), self._total_jobs, |
| 1324 seconds / 60, seconds % 60, self._LoadAvg()) | 1379 seconds / 60, seconds % 60, self._LoadAvg())) |
| 1325 | 1380 |
| 1326 # Print interim output every minute if --show-output is used. Otherwise, | 1381 # Print interim output every minute if --show-output is used. Otherwise, |
| 1327 # only print output if a job has been running for 60 minutes or more. | 1382 # only print output if a job has been running for 60 minutes or more. |
| 1328 if self._show_output: | 1383 if self._show_output: |
| 1329 interval = 60 | 1384 interval = 60 |
| 1330 else: | 1385 else: |
| 1331 interval = 60 * 60 | 1386 interval = 60 * 60 |
| 1332 for target, job in self._jobs.iteritems(): | 1387 for target, job in self._jobs.iteritems(): |
| 1333 if job: | 1388 if job: |
| 1334 last_timestamp = max(job.start_timestamp, job.last_output_timestamp) | 1389 last_timestamp = max(job.start_timestamp, job.last_output_timestamp) |
| 1335 if last_timestamp + interval < current_time: | 1390 if last_timestamp + interval < current_time: |
| 1336 self._PrintJob(job) | 1391 self._print_queue.put(JobPrinter(job)) |
| 1392 job.last_output_timestamp = current_time |
| 1337 | 1393 |
| 1338 def _Finish(self, target): | 1394 def _Finish(self, target): |
| 1339 """Mark a target as completed and unblock dependecies.""" | 1395 """Mark a target as completed and unblock dependecies.""" |
| 1340 for dep in self._deps_map[target]["provides"]: | 1396 for dep in self._deps_map[target]["provides"]: |
| 1341 del self._deps_map[dep]["needs"][target] | 1397 del self._deps_map[dep]["needs"][target] |
| 1342 if not self._deps_map[dep]["needs"]: | 1398 if not self._deps_map[dep]["needs"]: |
| 1343 self._Schedule(dep) | 1399 self._Schedule(dep) |
| 1344 self._deps_map.pop(target) | 1400 self._deps_map.pop(target) |
| 1345 | 1401 |
| 1346 def _Retry(self): | 1402 def _Retry(self): |
| 1347 if self._retry_queue: | 1403 if self._retry_queue: |
| 1348 target = self._retry_queue.pop(0) | 1404 target = self._retry_queue.pop(0) |
| 1349 self._Schedule(target) | 1405 self._Schedule(target) |
| 1350 print "Retrying emerge of %s." % target | 1406 self._Print("Retrying emerge of %s." % target) |
| 1351 | 1407 |
| 1352 def Run(self): | 1408 def Run(self): |
| 1353 """Run through the scheduled ebuilds. | 1409 """Run through the scheduled ebuilds. |
| 1354 | 1410 |
| 1355 Keep running so long as we have uninstalled packages in the | 1411 Keep running so long as we have uninstalled packages in the |
| 1356 dependency graph to merge. | 1412 dependency graph to merge. |
| 1357 """ | 1413 """ |
| 1358 while self._deps_map: | 1414 while self._deps_map: |
| 1359 # Check here that we are actually waiting for something. | 1415 # Check here that we are actually waiting for something. |
| 1360 if (self._emerge_queue.empty() and | 1416 if (self._emerge_queue.empty() and |
| 1361 self._job_queue.empty() and | 1417 self._job_queue.empty() and |
| 1362 not self._jobs and | 1418 not self._jobs and |
| 1363 self._deps_map): | 1419 self._deps_map): |
| 1364 # If we have failed on a package, retry it now. | 1420 # If we have failed on a package, retry it now. |
| 1365 if self._retry_queue: | 1421 if self._retry_queue: |
| 1366 self._Retry() | 1422 self._Retry() |
| 1367 # If we have failed a package twice, just give up. | 1423 else: |
| 1368 elif self._failed: | 1424 # Tell the print worker we're done, and wait for it to exit. |
| 1369 for failure in self._failed: | 1425 self._print_queue.put(None) |
| 1370 print "Package failed: %s" % failure | 1426 self._print_worker.join() |
| 1427 |
| 1428 # The dependency map is helpful for debugging failures. |
| 1371 PrintDepsMap(self._deps_map) | 1429 PrintDepsMap(self._deps_map) |
| 1372 print "Packages failed: %s" % " ,".join(self._failed) | 1430 |
| 1373 sys.exit(1) | 1431 # Tell the user why we're exiting. |
| 1374 # If we have dependency cycles. | 1432 if self._failed: |
| 1375 else: | 1433 print "Packages failed: %s" % " ,".join(self._failed) |
| 1376 print "Deadlock! Circular dependencies!" | 1434 else: |
| 1377 PrintDepsMap(self._deps_map) | 1435 print "Deadlock! Circular dependencies!" |
| 1378 sys.exit(1) | 1436 sys.exit(1) |
| 1379 | 1437 |
| 1380 try: | 1438 try: |
| 1381 job = self._job_queue.get(timeout=5) | 1439 job = self._job_queue.get(timeout=5) |
| 1382 except Queue.Empty: | 1440 except Queue.Empty: |
| 1383 # Print an update. | 1441 # Print an update. |
| 1384 self._Status() | 1442 self._Status() |
| 1385 continue | 1443 continue |
| 1386 | 1444 |
| 1387 target = job.target | 1445 target = job.target |
| 1388 | 1446 |
| 1389 if not job.done: | 1447 if not job.done: |
| 1390 self._jobs[target] = job | 1448 self._jobs[target] = job |
| 1391 print "Started %s (logged in %s)" % (target, job.filename) | 1449 self._Print("Started %s (logged in %s)" % (target, job.filename)) |
| 1392 continue | 1450 continue |
| 1393 | 1451 |
| 1394 # Print output of job | 1452 # Print output of job |
| 1395 if self._show_output or job.retcode != 0: | 1453 if self._show_output or job.retcode != 0: |
| 1396 self._PrintJob(job) | 1454 self._print_queue.put(JobPrinter(job, unlink=True)) |
| 1397 os.unlink(job.filename) | 1455 else: |
| 1456 os.unlink(job.filename) |
| 1398 del self._jobs[target] | 1457 del self._jobs[target] |
| 1399 | 1458 |
| 1400 seconds = time.time() - job.start_timestamp | 1459 seconds = time.time() - job.start_timestamp |
| 1401 details = "%s (in %dm%.1fs)" % (target, seconds / 60, seconds % 60) | 1460 details = "%s (in %dm%.1fs)" % (target, seconds / 60, seconds % 60) |
| 1402 | 1461 |
| 1403 # Complain if necessary. | 1462 # Complain if necessary. |
| 1404 if job.retcode != 0: | 1463 if job.retcode != 0: |
| 1405 # Handle job failure. | 1464 # Handle job failure. |
| 1406 if target in self._failed: | 1465 if target in self._failed: |
| 1407 # If this job has failed previously, give up. | 1466 # If this job has failed previously, give up. |
| 1408 print "Failed %s. Your build has failed." % details | 1467 self._Print("Failed %s. Your build has failed." % details) |
| 1409 else: | 1468 else: |
| 1410 # Queue up this build to try again after a long while. | 1469 # Queue up this build to try again after a long while. |
| 1411 self._retry_queue.append(target) | 1470 self._retry_queue.append(target) |
| 1412 self._failed.add(target) | 1471 self._failed.add(target) |
| 1413 print "Failed %s, retrying later." % details | 1472 self._Print("Failed %s, retrying later." % details) |
| 1414 else: | 1473 else: |
| 1415 if target in self._failed and self._retry_queue: | 1474 if target in self._failed and self._retry_queue: |
| 1416 # If we have successfully retried a failed package, and there | 1475 # If we have successfully retried a failed package, and there |
| 1417 # are more failed packages, try the next one. We will only have | 1476 # are more failed packages, try the next one. We will only have |
| 1418 # one retrying package actively running at a time. | 1477 # one retrying package actively running at a time. |
| 1419 self._Retry() | 1478 self._Retry() |
| 1420 | 1479 |
| 1421 print "Completed %s" % details | 1480 self._Print("Completed %s" % details) |
| 1422 # Mark as completed and unblock waiting ebuilds. | 1481 # Mark as completed and unblock waiting ebuilds. |
| 1423 self._Finish(target) | 1482 self._Finish(target) |
| 1424 | 1483 |
| 1425 # Print an update. | 1484 # Print an update. |
| 1426 self._Status() | 1485 self._Status() |
| 1427 | 1486 |
| 1487 # Tell the print worker we're done, and wait for it to exit. |
| 1488 self._print_queue.put(None) |
| 1489 self._print_worker.join() |
| 1490 |
| 1428 | 1491 |
| 1429 def main(): | 1492 def main(): |
| 1430 | 1493 |
| 1431 deps = DepGraphGenerator() | 1494 deps = DepGraphGenerator() |
| 1432 deps.Initialize(sys.argv[1:]) | 1495 deps.Initialize(sys.argv[1:]) |
| 1433 emerge = deps.emerge | 1496 emerge = deps.emerge |
| 1434 | 1497 |
| 1435 if emerge.action is not None: | 1498 if emerge.action is not None: |
| 1436 sys.argv = deps.ParseParallelEmergeArgs(sys.argv) | 1499 sys.argv = deps.ParseParallelEmergeArgs(sys.argv) |
| 1437 sys.exit(emerge_main()) | 1500 sys.exit(emerge_main()) |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1487 world_set.update(new_world_pkgs) | 1550 world_set.update(new_world_pkgs) |
| 1488 | 1551 |
| 1489 # Update environment (library cache, symlinks, etc.) | 1552 # Update environment (library cache, symlinks, etc.) |
| 1490 if deps.board and "--pretend" not in emerge.opts: | 1553 if deps.board and "--pretend" not in emerge.opts: |
| 1491 portage.env_update() | 1554 portage.env_update() |
| 1492 | 1555 |
| 1493 print "Done" | 1556 print "Done" |
| 1494 | 1557 |
| 1495 if __name__ == "__main__": | 1558 if __name__ == "__main__": |
| 1496 main() | 1559 main() |
| OLD | NEW |