Index: parallel_emerge |
diff --git a/parallel_emerge b/parallel_emerge |
index fbdcc6557d1587902321ed5e3fee8d71f42c3e81..3d5ad74205580edc4a3ead9c01ced4ba54703ca6 100755 |
--- a/parallel_emerge |
+++ b/parallel_emerge |
@@ -1119,21 +1119,7 @@ class EmergeJobState(object): |
self.start_timestamp = start_timestamp |
-def EmergeWorker(task_queue, job_queue, emerge, package_db): |
- """This worker emerges any packages given to it on the task_queue. |
- |
- Args: |
- task_queue: The queue of tasks for this worker to do. |
- job_queue: The queue of results from the worker. |
- emerge: An EmergeData() object. |
- package_db: A dict, mapping package ids to portage Package objects. |
- |
- It expects package identifiers to be passed to it via task_queue. When |
- a task is started, it pushes the (target, filename) to the started_queue. |
- The output is stored in filename. When a merge starts or finishes, we push |
- EmergeJobState objects to the job_queue. |
- """ |
- |
+def SetupWorkerSignals(): |
def ExitHandler(signum, frame): |
# Remove our signal handlers so we don't get called recursively. |
signal.signal(signal.SIGINT, signal.SIG_DFL) |
@@ -1149,6 +1135,23 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db): |
signal.signal(signal.SIGINT, ExitHandler) |
signal.signal(signal.SIGTERM, ExitHandler) |
+ |
+def EmergeWorker(task_queue, job_queue, emerge, package_db): |
+ """This worker emerges any packages given to it on the task_queue. |
+ |
+ Args: |
+ task_queue: The queue of tasks for this worker to do. |
+ job_queue: The queue of results from the worker. |
+ emerge: An EmergeData() object. |
+ package_db: A dict, mapping package ids to portage Package objects. |
+ |
+ It expects package identifiers to be passed to it via task_queue. When |
+  a merge starts or finishes, it pushes an EmergeJobState object to the |
+  job_queue. The output of each merge is stored in the file named by the |
+  job's filename attribute. |
+ """ |
+ |
+ SetupWorkerSignals() |
settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb |
opts, spinner = emerge.opts, emerge.spinner |
opts["--nodeps"] = True |
@@ -1190,6 +1193,86 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db): |
job_queue.put(job) |
+class LinePrinter(object): |
+ """Helper object to print a single line.""" |
+ |
+ def __init__(self, line): |
+ self.line = line |
+ |
+ def Print(self, seek_locations): |
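+    # seek_locations is unused for plain lines; it is accepted so that |
+    # LinePrinter and JobPrinter share the same Print() interface. |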
+ print self.line |
+ |
+ |
+class JobPrinter(object): |
+ """Helper object to print output of a job.""" |
+ |
+ def __init__(self, job, unlink=False): |
+ """Print output of job. |
+ |
+ If unlink is True, unlink the job output file when done.""" |
+ self.current_time = time.time() |
+ self.job = job |
+ self.unlink = unlink |
+ |
+ def Print(self, seek_locations): |
+ |
+ job = self.job |
+ |
+ # Calculate how long the job has been running. |
+ seconds = self.current_time - job.start_timestamp |
+ |
+ # Note that we've printed out the job so far. |
+ job.last_output_timestamp = self.current_time |
+ |
+    # Describe the job and how long it has been running. |
+ info = "job %s (%dm%.1fs)" % (job.pkgname, seconds / 60, seconds % 60) |
+ last_output_seek = seek_locations.get(job.filename, 0) |
+ if last_output_seek: |
+ print "=== Continue output for %s ===" % info |
+ else: |
+ print "=== Start output for %s ===" % info |
+ |
+ # Print actual output from job |
+ f = codecs.open(job.filename, encoding='utf-8', errors='replace') |
+ f.seek(last_output_seek) |
+ prefix = job.pkgname + ":" |
+ for line in f: |
+ |
+ # Save off our position in the file |
+ if line and line[-1] == "\n": |
+ last_output_seek = f.tell() |
+ line = line[:-1] |
+ |
+ # Print our line |
+ print prefix, line.encode('utf-8', 'replace') |
+ f.close() |
+ |
+ # Save our last spot in the file so that we don't print out the same |
+ # location twice. |
+ seek_locations[job.filename] = last_output_seek |
+ |
+ # Note end of output section |
+ if job.done: |
+ print "=== Complete: %s ===" % info |
+ else: |
+ print "=== Still running: %s ===" % info |
+ |
+ if self.unlink: |
+ os.unlink(job.filename) |
+ |
+ |
+def PrintWorker(queue): |
+ """A worker that prints stuff to the screen as requested.""" |
+ SetupWorkerSignals() |
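+  # How far we have already printed into each job's log, keyed by filename. |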
+ seek_locations = {} |
+ while True: |
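+    # A None entry is the signal to exit. |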
+ job = queue.get() |
+ if job: |
+ job.Print(seek_locations) |
+ else: |
+ break |
+ |
+ |
class EmergeQueue(object): |
"""Class to schedule emerge jobs according to a dependency graph.""" |
@@ -1215,8 +1298,12 @@ class EmergeQueue(object): |
emerge.opts.get("--jobs", multiprocessing.cpu_count())) |
self._emerge_queue = multiprocessing.Queue() |
self._job_queue = multiprocessing.Queue() |
+ self._print_queue = multiprocessing.Queue() |
args = (self._emerge_queue, self._job_queue, emerge, package_db) |
self._pool = multiprocessing.Pool(procs, EmergeWorker, args) |
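+    # Console output is funneled through a single print worker process, |
+    # which handles one LinePrinter or JobPrinter request at a time. |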
+ self._print_worker = multiprocessing.Process(target=PrintWorker, |
+ args=[self._print_queue]) |
+ self._print_worker.start() |
# Initialize the failed queue to empty. |
self._retry_queue = [] |
@@ -1245,11 +1332,14 @@ class EmergeQueue(object): |
# Print our current job status |
for target, job in self._jobs.iteritems(): |
if job: |
- self._PrintJob(job) |
- os.unlink(job.filename) |
+ self._print_queue.put(JobPrinter(job, unlink=True)) |
# Notify the user that we are exiting |
- print "Exiting on signal %s" % signum |
+ self._Print("Exiting on signal %s" % signum) |
+ |
+ # Exit when print worker is done. |
+ self._print_queue.put(None) |
+ self._print_worker.join() |
sys.exit(1) |
# Print out job status when we are killed |
@@ -1273,44 +1363,9 @@ class EmergeQueue(object): |
loads = open("/proc/loadavg", "r").readline().split()[:3] |
return " ".join(loads) |
- def _PrintJob(self, job): |
- """Print output so far of specified job""" |
- |
- # Calculate how long the job has been running. |
- current_time = time.time() |
- seconds = current_time - job.start_timestamp |
- |
- # Note that we've printed out the job so far. |
- job.last_output_timestamp = current_time |
- |
- # Note that we're starting the job |
- info = "job %s (%dm%.1fs) ===" % (job.pkgname, seconds / 60, seconds % 60) |
- if job.last_output_seek: |
- print "=== Continue output for %s " % info |
- else: |
- print "=== Start output for %s ===" % info |
- |
- # Print actual output from job |
- f = codecs.open(job.filename, encoding='utf-8', errors='replace') |
- f.seek(job.last_output_seek) |
- prefix = job.pkgname + ":" |
- for line in f: |
- |
- # Save off our position in the file |
- if line and line[-1] == "\n": |
- job.last_output_seek = f.tell() |
- line = line[:-1] |
- |
- # Print our line |
- print prefix, line.encode('utf-8', 'replace') |
- f.close() |
- |
- # Note end of output section |
- if job.done: |
- print "=== Complete: %s ===" % info |
- else: |
- print "=== Still running: %s ===" % info |
- |
+ def _Print(self, line): |
+ """Print a single line.""" |
+ self._print_queue.put(LinePrinter(line)) |
def _Status(self): |
"""Print status.""" |
@@ -1319,9 +1374,9 @@ class EmergeQueue(object): |
line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s " |
"[Time %dm%.1fs Load %s]") |
qsize = self._emerge_queue.qsize() |
- print line % (len(self._deps_map), qsize, len(self._jobs) - qsize, |
- len(self._retry_queue), self._total_jobs, |
- seconds / 60, seconds % 60, self._LoadAvg()) |
+ self._Print(line % (len(self._deps_map), qsize, len(self._jobs) - qsize, |
+ len(self._retry_queue), self._total_jobs, |
+ seconds / 60, seconds % 60, self._LoadAvg())) |
# Print interim output every minute if --show-output is used. Otherwise, |
# only print output if a job has been running for 60 minutes or more. |
@@ -1333,7 +1388,8 @@ class EmergeQueue(object): |
if job: |
last_timestamp = max(job.start_timestamp, job.last_output_timestamp) |
if last_timestamp + interval < current_time: |
- self._PrintJob(job) |
+ self._print_queue.put(JobPrinter(job)) |
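+          # The JobPrinter receives a pickled copy of the job, so record the |
+          # new output timestamp on our own copy here as well. |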
+ job.last_output_timestamp = current_time |
def _Finish(self, target): |
"""Mark a target as completed and unblock dependecies.""" |
@@ -1347,7 +1403,7 @@ class EmergeQueue(object): |
if self._retry_queue: |
target = self._retry_queue.pop(0) |
self._Schedule(target) |
- print "Retrying emerge of %s." % target |
+ self._Print("Retrying emerge of %s." % target) |
def Run(self): |
"""Run through the scheduled ebuilds. |
@@ -1364,17 +1420,19 @@ class EmergeQueue(object): |
# If we have failed on a package, retry it now. |
if self._retry_queue: |
self._Retry() |
- # If we have failed a package twice, just give up. |
- elif self._failed: |
- for failure in self._failed: |
- print "Package failed: %s" % failure |
- PrintDepsMap(self._deps_map) |
- print "Packages failed: %s" % " ,".join(self._failed) |
- sys.exit(1) |
- # If we have dependency cycles. |
else: |
- print "Deadlock! Circular dependencies!" |
+ # Tell the print worker we're done, and wait for it to exit. |
+ self._print_queue.put(None) |
+ self._print_worker.join() |
+ |
+ # The dependency map is helpful for debugging failures. |
PrintDepsMap(self._deps_map) |
+ |
+ # Tell the user why we're exiting. |
+ if self._failed: |
+ print "Packages failed: %s" % " ,".join(self._failed) |
+ else: |
+ print "Deadlock! Circular dependencies!" |
sys.exit(1) |
try: |
@@ -1388,13 +1446,14 @@ class EmergeQueue(object): |
if not job.done: |
self._jobs[target] = job |
- print "Started %s (logged in %s)" % (target, job.filename) |
+ self._Print("Started %s (logged in %s)" % (target, job.filename)) |
continue |
# Print output of job |
if self._show_output or job.retcode != 0: |
- self._PrintJob(job) |
- os.unlink(job.filename) |
+ self._print_queue.put(JobPrinter(job, unlink=True)) |
+ else: |
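+          # The job succeeded and we're not showing output, so just drop the |
+          # log file. |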
+ os.unlink(job.filename) |
del self._jobs[target] |
seconds = time.time() - job.start_timestamp |
@@ -1405,12 +1464,12 @@ class EmergeQueue(object): |
# Handle job failure. |
if target in self._failed: |
# If this job has failed previously, give up. |
- print "Failed %s. Your build has failed." % details |
+ self._Print("Failed %s. Your build has failed." % details) |
else: |
# Queue up this build to try again after a long while. |
self._retry_queue.append(target) |
self._failed.add(target) |
- print "Failed %s, retrying later." % details |
+ self._Print("Failed %s, retrying later." % details) |
else: |
if target in self._failed and self._retry_queue: |
# If we have successfully retried a failed package, and there |
@@ -1418,13 +1477,17 @@ class EmergeQueue(object): |
# one retrying package actively running at a time. |
self._Retry() |
- print "Completed %s" % details |
+ self._Print("Completed %s" % details) |
# Mark as completed and unblock waiting ebuilds. |
self._Finish(target) |
# Print an update. |
self._Status() |
+ # Tell the print worker we're done, and wait for it to exit. |
+ self._print_queue.put(None) |
+ self._print_worker.join() |
+ |
def main(): |