Index: parallel_emerge |
diff --git a/parallel_emerge b/parallel_emerge |
index 35c089cdf5fcd0b3c004c7be2e34b25f19462aeb..af552543feb45b5057ca3bdcdb2ed9c714d1f6c7 100755 |
--- a/parallel_emerge |
+++ b/parallel_emerge |
@@ -38,14 +38,17 @@ Basic operation: |
of the same package for a runtime dep). |
""" |
+import codecs |
import copy |
import multiprocessing |
import os |
import Queue |
import shlex |
+import signal |
import sys |
import tempfile |
import time |
+import traceback |
import urllib2 |
# If PORTAGE_USERNAME isn't specified, scrape it from the $HOME variable. On |
@@ -214,7 +217,7 @@ class DepGraphGenerator(object): |
""" |
__slots__ = ["board", "emerge", "mandatory_source", "no_workon_deps", |
- "package_db", "rebuild"] |
+ "package_db", "rebuild", "show_output"] |
def __init__(self): |
self.board = None |
@@ -223,6 +226,7 @@ class DepGraphGenerator(object): |
self.no_workon_deps = False |
self.package_db = {} |
self.rebuild = False |
+ self.show_output = False |
def ParseParallelEmergeArgs(self, argv): |
"""Read the parallel emerge arguments from the command-line. |
@@ -249,6 +253,8 @@ class DepGraphGenerator(object): |
self.no_workon_deps = True |
elif arg == "--rebuild": |
self.rebuild = True |
+ elif arg == "--show-output": |
+ self.show_output = True |
else: |
# Not one of our options, so pass through to emerge. |
emerge_args.append(arg) |
@@ -1067,20 +1073,72 @@ def PrintDepsMap(deps_map): |
print " no dependencies" |
-def EmergeWorker(task_queue, done_queue, emerge, package_db): |
+class EmergeJobState(object): |
+ __slots__ = ["done", "filename", "last_output_seek", "last_output_timestamp", |
+ "pkgname", "retcode", "start_timestamp", "target"] |
+ |
+ def __init__(self, target, pkgname, done, filename, start_timestamp, |
+ retcode=None): |
+ |
+ # The full name of the target we're building (e.g. |
+ # chromeos-base/chromeos-0.0.1-r60) |
+ self.target = target |
+ |
+ # The short name of the target we're building (e.g. chromeos-0.0.1-r60) |
+ self.pkgname = pkgname |
+ |
+ # Whether the job is done. (True if the job is done; false otherwise.) |
+ self.done = done |
+ |
+ # The filename where output is currently stored. |
+ self.filename = filename |
+ |
+ # The location (in bytes) of the end of the last complete line we printed. |
+ # This starts off at zero. We use this to jump to the right place when we |
+ # print output from the same ebuild multiple times. |
+ self.last_output_seek = 0 |
+ |
+ # The timestamp of the last time we printed output. Since we haven't |
+ # printed output yet, this starts at zero. |
+ self.last_output_timestamp = 0 |
+ |
+ # The return code of our job, if the job is actually finished. |
+ self.retcode = retcode |
+ |
+ # The timestamp when our job started. |
+ self.start_timestamp = start_timestamp |
+ |
+ |
+def EmergeWorker(task_queue, job_queue, emerge, package_db): |
"""This worker emerges any packages given to it on the task_queue. |
Args: |
task_queue: The queue of tasks for this worker to do. |
- done_queue: The queue of results from the worker. |
+ job_queue: The queue of results from the worker. |
emerge: An EmergeData() object. |
package_db: A dict, mapping package ids to portage Package objects. |
It expects package identifiers to be passed to it via task_queue. When |
- the package is merged, it pushes (target, retval, outputstr) into the |
- done_queue. |
+ a task is started, it pushes the (target, filename) to the started_queue. |
+ The output is stored in filename. When a merge starts or finishes, we push |
+ EmergeJobState objects to the job_queue. |
""" |
+ def ExitHandler(signum, frame): |
+ # Remove our signal handlers so we don't get called recursively. |
+ signal.signal(signal.SIGINT, signal.SIG_DFL) |
+ signal.signal(signal.SIGTERM, signal.SIG_DFL) |
+ |
+ # Try to exit cleanly |
+ sys.exit(1) |
+ |
+ # Ensure that we exit quietly and cleanly, if possible, when we receive |
+ # SIGTERM or SIGINT signals. By default, when the user hits CTRL-C, all |
+ # of the child processes will print details about KeyboardInterrupt |
+ # exceptions, which isn't very helpful. |
+ signal.signal(signal.SIGINT, ExitHandler) |
+ signal.signal(signal.SIGTERM, ExitHandler) |
+ |
settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb |
opts, spinner = emerge.opts, emerge.spinner |
opts["--nodeps"] = True |
@@ -1088,14 +1146,16 @@ def EmergeWorker(task_queue, done_queue, emerge, package_db): |
# Wait for a new item to show up on the queue. This is a blocking wait, |
# so if there's nothing to do, we just sit here. |
target = task_queue.get() |
- print "Emerging", target |
db_pkg = package_db[target] |
db_pkg.root_config = emerge.root_config |
install_list = [db_pkg] |
- output = tempfile.TemporaryFile() |
- outputstr = "" |
+ pkgname = db_pkg.pf |
+ output = tempfile.NamedTemporaryFile(prefix=pkgname + "-", delete=False) |
+ start_timestamp = time.time() |
+ job = EmergeJobState(target, pkgname, False, output.name, start_timestamp) |
+ job_queue.put(job) |
if "--pretend" in opts: |
- retval = 0 |
+ retcode = 0 |
else: |
save_stdout = sys.stdout |
save_stderr = sys.stderr |
@@ -1104,30 +1164,34 @@ def EmergeWorker(task_queue, done_queue, emerge, package_db): |
sys.stderr = output |
scheduler = Scheduler(settings, trees, mtimedb, opts, spinner, |
install_list, [], emerge.scheduler_graph) |
- retval = scheduler.merge() |
+ retcode = scheduler.merge() |
+ except Exception: |
+ traceback.print_exc(file=output) |
+ retcode = 1 |
finally: |
sys.stdout = save_stdout |
sys.stderr = save_stderr |
- if retval is None: |
- retval = 0 |
- if retval != 0: |
- output.seek(0) |
- outputstr = output.read() |
+ output.close() |
+ if retcode is None: |
+ retcode = 0 |
- done_queue.put((target, retval, outputstr)) |
+ job = EmergeJobState(target, pkgname, True, output.name, start_timestamp, |
+ retcode) |
+ job_queue.put(job) |
class EmergeQueue(object): |
"""Class to schedule emerge jobs according to a dependency graph.""" |
- def __init__(self, deps_map, emerge, package_db): |
+ def __init__(self, deps_map, emerge, package_db, show_output): |
# Store the dependency graph. |
self._deps_map = deps_map |
# Initialize the running queue to empty |
- self._jobs = set() |
+ self._jobs = {} |
# List of total package installs represented in deps_map. |
install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"] |
self._total_jobs = len(install_jobs) |
+ self._show_output = show_output |
if "--pretend" in emerge.opts: |
print "Skipping merge because of --pretend mode." |
@@ -1140,21 +1204,48 @@ class EmergeQueue(object): |
procs = min(self._total_jobs, |
emerge.opts.get("--jobs", multiprocessing.cpu_count())) |
self._emerge_queue = multiprocessing.Queue() |
- self._done_queue = multiprocessing.Queue() |
- args = (self._emerge_queue, self._done_queue, emerge, package_db) |
+ self._job_queue = multiprocessing.Queue() |
+ args = (self._emerge_queue, self._job_queue, emerge, package_db) |
self._pool = multiprocessing.Pool(procs, EmergeWorker, args) |
# Initialize the failed queue to empty. |
self._retry_queue = [] |
- self._failed = {} |
+ self._failed = set() |
# Print an update before we launch the merges. |
self._Status() |
+ # Setup an exit handler so that we print nice messages if we are |
+ # terminated. |
+ self._SetupExitHandler() |
+ |
+ # Schedule our jobs. |
for target, info in deps_map.items(): |
if not info["needs"]: |
self._Schedule(target) |
+ def _SetupExitHandler(self): |
+ |
+ def ExitHandler(signum, frame): |
+ |
+ # Kill our signal handlers so we don't get called recursively |
+ signal.signal(signal.SIGINT, signal.SIG_DFL) |
+ signal.signal(signal.SIGTERM, signal.SIG_DFL) |
+ |
+ # Print our current job status |
+ for target, job in self._jobs.iteritems(): |
+ if job: |
+ self._PrintJob(job) |
+ os.unlink(job.filename) |
+ |
+ # Notify the user that we are exiting |
+ print "Exiting on signal %s" % signum |
+ sys.exit(1) |
+ |
+ # Print out job status when we are killed |
+ signal.signal(signal.SIGINT, ExitHandler) |
+ signal.signal(signal.SIGTERM, ExitHandler) |
+ |
def _Schedule(self, target): |
# We maintain a tree of all deps, if this doesn't need |
# to be installed just free up it's children and continue. |
@@ -1165,16 +1256,56 @@ class EmergeQueue(object): |
self._Finish(target) |
else: |
# Kick off the build if it's marked to be built. |
- self._jobs.add(target) |
+ self._jobs[target] = None |
self._emerge_queue.put(target) |
def _LoadAvg(self): |
loads = open("/proc/loadavg", "r").readline().split()[:3] |
return " ".join(loads) |
+ def _PrintJob(self, job): |
+ """Print output so far of specified job""" |
+ |
+ # Calculate how long the job has been running. |
+ current_time = time.time() |
+ seconds = current_time - job.start_timestamp |
+ |
+ # Note that we've printed out the job so far. |
+ job.last_output_timestamp = current_time |
+ |
+ # Note that we're starting the job |
+ info = "job %s (%dm%.1fs) ===" % (job.pkgname, seconds / 60, seconds % 60) |
+ if job.last_output_seek: |
+ print "=== Continue output for %s " % info |
+ else: |
+ print "=== Start output for %s ===" % info |
+ |
+ # Print actual output from job |
+ f = codecs.open(job.filename, encoding='utf-8', errors='replace') |
+ f.seek(job.last_output_seek) |
+ prefix = job.pkgname + ":" |
+ for line in f: |
+ |
+ # Save off our position in the file |
+ if line and line[-1] == "\n": |
+ job.last_output_seek = f.tell() |
+ line = line[:-1] |
+ |
+ # Print our line |
+ print prefix, line.encode('utf-8', 'replace') |
+ f.close() |
+ |
+ # Note end of output section |
+ if job.done: |
+ print "=== Complete: %s ===" % info |
+ else: |
+ print "=== Still running: %s ===" % info |
+ |
+ |
def _Status(self): |
"""Print status.""" |
- seconds = time.time() - GLOBAL_START |
+ current_time = time.time() |
+ seconds = current_time - GLOBAL_START |
line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s " |
"[Time %dm%.1fs Load %s]") |
qsize = self._emerge_queue.qsize() |
@@ -1182,6 +1313,18 @@ class EmergeQueue(object): |
len(self._retry_queue), self._total_jobs, |
seconds / 60, seconds % 60, self._LoadAvg()) |
+ # Print interim output every minute if --show-output is used. Otherwise, |
+ # only print output if a job has been running for 60 minutes or more. |
+ if self._show_output: |
+ interval = 60 |
+ else: |
+ interval = 60 * 60 |
+ for target, job in self._jobs.iteritems(): |
+ if job: |
+ last_timestamp = max(job.start_timestamp, job.last_output_timestamp) |
+ if last_timestamp + interval < current_time: |
+ self._PrintJob(job) |
+ |
def _Finish(self, target): |
"""Mark a target as completed and unblock dependecies.""" |
for dep in self._deps_map[target]["provides"]: |
@@ -1205,7 +1348,7 @@ class EmergeQueue(object): |
while self._deps_map: |
# Check here that we are actually waiting for something. |
if (self._emerge_queue.empty() and |
- self._done_queue.empty() and |
+ self._job_queue.empty() and |
not self._jobs and |
self._deps_map): |
# If we have failed on a package, retry it now. |
@@ -1213,11 +1356,10 @@ class EmergeQueue(object): |
self._Retry() |
# If we have failed a package twice, just give up. |
elif self._failed: |
- for failure, output in self._failed.items(): |
+ for failure in self._failed: |
print "Package failed: %s" % failure |
- print output |
PrintDepsMap(self._deps_map) |
- print "Packages failed: %s" % " ,".join(self._failed.keys()) |
+ print "Packages failed: %s" % " ,".join(self._failed) |
sys.exit(1) |
# If we have dependency cycles. |
else: |
@@ -1226,27 +1368,39 @@ class EmergeQueue(object): |
sys.exit(1) |
try: |
- target, retcode, output = self._done_queue.get(timeout=5) |
+ job = self._job_queue.get(timeout=5) |
except Queue.Empty: |
# Print an update. |
self._Status() |
continue |
- self._jobs.discard(target) |
+ target = job.target |
+ |
+ if not job.done: |
+ self._jobs[target] = job |
+ print "Started %s (logged in %s)" % (target, job.filename) |
+ continue |
+ |
+ # Print output of job |
+ if self._show_output or job.retcode != 0: |
+ self._PrintJob(job) |
+ os.unlink(job.filename) |
+ del self._jobs[target] |
+ |
+ seconds = time.time() - job.start_timestamp |
+ details = "%s (in %dm%.1fs)" % (target, seconds / 60, seconds % 60) |
- # Print if necessary. |
- if retcode != 0: |
- print output |
- if retcode != 0: |
+ # Complain if necessary. |
+ if job.retcode != 0: |
# Handle job failure. |
if target in self._failed: |
# If this job has failed previously, give up. |
- print "Failed %s. Your build has failed." % target |
+ print "Failed %s. Your build has failed." % details |
else: |
# Queue up this build to try again after a long while. |
self._retry_queue.append(target) |
- self._failed[target] = 1 |
- print "Failed %s, retrying later." % target |
+ self._failed.add(target) |
+ print "Failed %s, retrying later." % details |
else: |
if target in self._failed and self._retry_queue: |
# If we have successfully retried a failed package, and there |
@@ -1254,7 +1408,7 @@ class EmergeQueue(object): |
# one retrying package actively running at a time. |
self._Retry() |
- print "Completed %s" % target |
+ print "Completed %s" % details |
# Mark as completed and unblock waiting ebuilds. |
self._Finish(target) |
@@ -1305,7 +1459,7 @@ def main(): |
PrintDepsMap(deps_graph) |
# Run the queued emerges. |
- scheduler = EmergeQueue(deps_graph, emerge, deps.package_db) |
+ scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output) |
scheduler.Run() |
# Update world. |