Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(506)

Unified Diff: parallel_emerge

Issue 3010056: Add --show-output option to parallel_emerge which prints output from jobs. (Closed) Base URL: ssh://git@chromiumos-git/crosutils.git
Patch Set: Update comment Created 10 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: parallel_emerge
diff --git a/parallel_emerge b/parallel_emerge
index 35c089cdf5fcd0b3c004c7be2e34b25f19462aeb..af552543feb45b5057ca3bdcdb2ed9c714d1f6c7 100755
--- a/parallel_emerge
+++ b/parallel_emerge
@@ -38,14 +38,17 @@ Basic operation:
of the same package for a runtime dep).
"""
+import codecs
import copy
import multiprocessing
import os
import Queue
import shlex
+import signal
import sys
import tempfile
import time
+import traceback
import urllib2
# If PORTAGE_USERNAME isn't specified, scrape it from the $HOME variable. On
@@ -214,7 +217,7 @@ class DepGraphGenerator(object):
"""
__slots__ = ["board", "emerge", "mandatory_source", "no_workon_deps",
- "package_db", "rebuild"]
+ "package_db", "rebuild", "show_output"]
def __init__(self):
self.board = None
@@ -223,6 +226,7 @@ class DepGraphGenerator(object):
self.no_workon_deps = False
self.package_db = {}
self.rebuild = False
+ self.show_output = False
def ParseParallelEmergeArgs(self, argv):
"""Read the parallel emerge arguments from the command-line.
@@ -249,6 +253,8 @@ class DepGraphGenerator(object):
self.no_workon_deps = True
elif arg == "--rebuild":
self.rebuild = True
+ elif arg == "--show-output":
+ self.show_output = True
else:
# Not one of our options, so pass through to emerge.
emerge_args.append(arg)
@@ -1067,20 +1073,72 @@ def PrintDepsMap(deps_map):
print " no dependencies"
-def EmergeWorker(task_queue, done_queue, emerge, package_db):
+class EmergeJobState(object):
+ __slots__ = ["done", "filename", "last_output_seek", "last_output_timestamp",
+ "pkgname", "retcode", "start_timestamp", "target"]
+
+ def __init__(self, target, pkgname, done, filename, start_timestamp,
+ retcode=None):
+
+ # The full name of the target we're building (e.g.
+ # chromeos-base/chromeos-0.0.1-r60)
+ self.target = target
+
+ # The short name of the target we're building (e.g. chromeos-0.0.1-r60)
+ self.pkgname = pkgname
+
+ # Whether the job is done. (True if the job is done; false otherwise.)
+ self.done = done
+
+ # The filename where output is currently stored.
+ self.filename = filename
+
+ # The location (in bytes) of the end of the last complete line we printed.
+ # This starts off at zero. We use this to jump to the right place when we
+ # print output from the same ebuild multiple times.
+ self.last_output_seek = 0
+
+ # The timestamp of the last time we printed output. Since we haven't
+ # printed output yet, this starts at zero.
+ self.last_output_timestamp = 0
+
+ # The return code of our job, if the job is actually finished.
+ self.retcode = retcode
+
+ # The timestamp when our job started.
+ self.start_timestamp = start_timestamp
+
+
+def EmergeWorker(task_queue, job_queue, emerge, package_db):
"""This worker emerges any packages given to it on the task_queue.
Args:
task_queue: The queue of tasks for this worker to do.
- done_queue: The queue of results from the worker.
+ job_queue: The queue of results from the worker.
emerge: An EmergeData() object.
package_db: A dict, mapping package ids to portage Package objects.
It expects package identifiers to be passed to it via task_queue. When
- the package is merged, it pushes (target, retval, outputstr) into the
- done_queue.
+ a task is started, it pushes the (target, filename) to the started_queue.
+ The output is stored in filename. When a merge starts or finishes, we push
+ EmergeJobState objects to the job_queue.
"""
+ def ExitHandler(signum, frame):
+ # Remove our signal handlers so we don't get called recursively.
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+ # Try to exit cleanly
+ sys.exit(1)
+
+ # Ensure that we exit quietly and cleanly, if possible, when we receive
+ # SIGTERM or SIGINT signals. By default, when the user hits CTRL-C, all
+ # of the child processes will print details about KeyboardInterrupt
+ # exceptions, which isn't very helpful.
+ signal.signal(signal.SIGINT, ExitHandler)
+ signal.signal(signal.SIGTERM, ExitHandler)
+
settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb
opts, spinner = emerge.opts, emerge.spinner
opts["--nodeps"] = True
@@ -1088,14 +1146,16 @@ def EmergeWorker(task_queue, done_queue, emerge, package_db):
# Wait for a new item to show up on the queue. This is a blocking wait,
# so if there's nothing to do, we just sit here.
target = task_queue.get()
- print "Emerging", target
db_pkg = package_db[target]
db_pkg.root_config = emerge.root_config
install_list = [db_pkg]
- output = tempfile.TemporaryFile()
- outputstr = ""
+ pkgname = db_pkg.pf
+ output = tempfile.NamedTemporaryFile(prefix=pkgname + "-", delete=False)
+ start_timestamp = time.time()
+ job = EmergeJobState(target, pkgname, False, output.name, start_timestamp)
+ job_queue.put(job)
if "--pretend" in opts:
- retval = 0
+ retcode = 0
else:
save_stdout = sys.stdout
save_stderr = sys.stderr
@@ -1104,30 +1164,34 @@ def EmergeWorker(task_queue, done_queue, emerge, package_db):
sys.stderr = output
scheduler = Scheduler(settings, trees, mtimedb, opts, spinner,
install_list, [], emerge.scheduler_graph)
- retval = scheduler.merge()
+ retcode = scheduler.merge()
+ except Exception:
+ traceback.print_exc(file=output)
+ retcode = 1
finally:
sys.stdout = save_stdout
sys.stderr = save_stderr
- if retval is None:
- retval = 0
- if retval != 0:
- output.seek(0)
- outputstr = output.read()
+ output.close()
+ if retcode is None:
+ retcode = 0
- done_queue.put((target, retval, outputstr))
+ job = EmergeJobState(target, pkgname, True, output.name, start_timestamp,
+ retcode)
+ job_queue.put(job)
class EmergeQueue(object):
"""Class to schedule emerge jobs according to a dependency graph."""
- def __init__(self, deps_map, emerge, package_db):
+ def __init__(self, deps_map, emerge, package_db, show_output):
# Store the dependency graph.
self._deps_map = deps_map
# Initialize the running queue to empty
- self._jobs = set()
+ self._jobs = {}
# List of total package installs represented in deps_map.
install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
self._total_jobs = len(install_jobs)
+ self._show_output = show_output
if "--pretend" in emerge.opts:
print "Skipping merge because of --pretend mode."
@@ -1140,21 +1204,48 @@ class EmergeQueue(object):
procs = min(self._total_jobs,
emerge.opts.get("--jobs", multiprocessing.cpu_count()))
self._emerge_queue = multiprocessing.Queue()
- self._done_queue = multiprocessing.Queue()
- args = (self._emerge_queue, self._done_queue, emerge, package_db)
+ self._job_queue = multiprocessing.Queue()
+ args = (self._emerge_queue, self._job_queue, emerge, package_db)
self._pool = multiprocessing.Pool(procs, EmergeWorker, args)
# Initialize the failed queue to empty.
self._retry_queue = []
- self._failed = {}
+ self._failed = set()
# Print an update before we launch the merges.
self._Status()
+ # Setup an exit handler so that we print nice messages if we are
+ # terminated.
+ self._SetupExitHandler()
+
+ # Schedule our jobs.
for target, info in deps_map.items():
if not info["needs"]:
self._Schedule(target)
+ def _SetupExitHandler(self):
+
+ def ExitHandler(signum, frame):
+
+ # Kill our signal handlers so we don't get called recursively
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+ # Print our current job status
+ for target, job in self._jobs.iteritems():
+ if job:
+ self._PrintJob(job)
+ os.unlink(job.filename)
+
+ # Notify the user that we are exiting
+ print "Exiting on signal %s" % signum
+ sys.exit(1)
+
+ # Print out job status when we are killed
+ signal.signal(signal.SIGINT, ExitHandler)
+ signal.signal(signal.SIGTERM, ExitHandler)
+
def _Schedule(self, target):
# We maintain a tree of all deps, if this doesn't need
# to be installed just free up its children and continue.
@@ -1165,16 +1256,56 @@ class EmergeQueue(object):
self._Finish(target)
else:
# Kick off the build if it's marked to be built.
- self._jobs.add(target)
+ self._jobs[target] = None
self._emerge_queue.put(target)
def _LoadAvg(self):
loads = open("/proc/loadavg", "r").readline().split()[:3]
return " ".join(loads)
+ def _PrintJob(self, job):
+ """Print output so far of specified job"""
+
+ # Calculate how long the job has been running.
+ current_time = time.time()
+ seconds = current_time - job.start_timestamp
+
+ # Note that we've printed out the job so far.
+ job.last_output_timestamp = current_time
+
+ # Note that we're starting the job
+ info = "job %s (%dm%.1fs) ===" % (job.pkgname, seconds / 60, seconds % 60)
+ if job.last_output_seek:
+ print "=== Continue output for %s " % info
+ else:
+ print "=== Start output for %s ===" % info
+
+ # Print actual output from job
+ f = codecs.open(job.filename, encoding='utf-8', errors='replace')
+ f.seek(job.last_output_seek)
+ prefix = job.pkgname + ":"
+ for line in f:
+
+ # Save off our position in the file
+ if line and line[-1] == "\n":
+ job.last_output_seek = f.tell()
+ line = line[:-1]
+
+ # Print our line
+ print prefix, line.encode('utf-8', 'replace')
+ f.close()
+
+ # Note end of output section
+ if job.done:
+ print "=== Complete: %s ===" % info
+ else:
+ print "=== Still running: %s ===" % info
+
+
def _Status(self):
"""Print status."""
- seconds = time.time() - GLOBAL_START
+ current_time = time.time()
+ seconds = current_time - GLOBAL_START
line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s "
"[Time %dm%.1fs Load %s]")
qsize = self._emerge_queue.qsize()
@@ -1182,6 +1313,18 @@ class EmergeQueue(object):
len(self._retry_queue), self._total_jobs,
seconds / 60, seconds % 60, self._LoadAvg())
+ # Print interim output every minute if --show-output is used. Otherwise,
+ # only print output if a job has been running for 60 minutes or more.
+ if self._show_output:
+ interval = 60
+ else:
+ interval = 60 * 60
+ for target, job in self._jobs.iteritems():
+ if job:
+ last_timestamp = max(job.start_timestamp, job.last_output_timestamp)
+ if last_timestamp + interval < current_time:
+ self._PrintJob(job)
+
def _Finish(self, target):
"""Mark a target as completed and unblock dependecies."""
for dep in self._deps_map[target]["provides"]:
@@ -1205,7 +1348,7 @@ class EmergeQueue(object):
while self._deps_map:
# Check here that we are actually waiting for something.
if (self._emerge_queue.empty() and
- self._done_queue.empty() and
+ self._job_queue.empty() and
not self._jobs and
self._deps_map):
# If we have failed on a package, retry it now.
@@ -1213,11 +1356,10 @@ class EmergeQueue(object):
self._Retry()
# If we have failed a package twice, just give up.
elif self._failed:
- for failure, output in self._failed.items():
+ for failure in self._failed:
print "Package failed: %s" % failure
- print output
PrintDepsMap(self._deps_map)
- print "Packages failed: %s" % " ,".join(self._failed.keys())
+ print "Packages failed: %s" % " ,".join(self._failed)
sys.exit(1)
# If we have dependency cycles.
else:
@@ -1226,27 +1368,39 @@ class EmergeQueue(object):
sys.exit(1)
try:
- target, retcode, output = self._done_queue.get(timeout=5)
+ job = self._job_queue.get(timeout=5)
except Queue.Empty:
# Print an update.
self._Status()
continue
- self._jobs.discard(target)
+ target = job.target
+
+ if not job.done:
+ self._jobs[target] = job
+ print "Started %s (logged in %s)" % (target, job.filename)
+ continue
+
+ # Print output of job
+ if self._show_output or job.retcode != 0:
+ self._PrintJob(job)
+ os.unlink(job.filename)
+ del self._jobs[target]
+
+ seconds = time.time() - job.start_timestamp
+ details = "%s (in %dm%.1fs)" % (target, seconds / 60, seconds % 60)
- # Print if necessary.
- if retcode != 0:
- print output
- if retcode != 0:
+ # Complain if necessary.
+ if job.retcode != 0:
# Handle job failure.
if target in self._failed:
# If this job has failed previously, give up.
- print "Failed %s. Your build has failed." % target
+ print "Failed %s. Your build has failed." % details
else:
# Queue up this build to try again after a long while.
self._retry_queue.append(target)
- self._failed[target] = 1
- print "Failed %s, retrying later." % target
+ self._failed.add(target)
+ print "Failed %s, retrying later." % details
else:
if target in self._failed and self._retry_queue:
# If we have successfully retried a failed package, and there
@@ -1254,7 +1408,7 @@ class EmergeQueue(object):
# one retrying package actively running at a time.
self._Retry()
- print "Completed %s" % target
+ print "Completed %s" % details
# Mark as completed and unblock waiting ebuilds.
self._Finish(target)
@@ -1305,7 +1459,7 @@ def main():
PrintDepsMap(deps_graph)
# Run the queued emerges.
- scheduler = EmergeQueue(deps_graph, emerge, deps.package_db)
+ scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output)
scheduler.Run()
# Update world.
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698