Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1418)

Unified Diff: parallel_emerge

Issue 3106008: Move printing of output to a separate thread. (Closed) Base URL: ssh://git@chromiumos-git/crosutils.git
Patch Set: Created 10 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show/Hide Comments ('s')
Index: parallel_emerge
diff --git a/parallel_emerge b/parallel_emerge
index fbdcc6557d1587902321ed5e3fee8d71f42c3e81..3d5ad74205580edc4a3ead9c01ced4ba54703ca6 100755
--- a/parallel_emerge
+++ b/parallel_emerge
@@ -1119,21 +1119,7 @@ class EmergeJobState(object):
self.start_timestamp = start_timestamp
-def EmergeWorker(task_queue, job_queue, emerge, package_db):
- """This worker emerges any packages given to it on the task_queue.
-
- Args:
- task_queue: The queue of tasks for this worker to do.
- job_queue: The queue of results from the worker.
- emerge: An EmergeData() object.
- package_db: A dict, mapping package ids to portage Package objects.
-
- It expects package identifiers to be passed to it via task_queue. When
- a task is started, it pushes the (target, filename) to the started_queue.
- The output is stored in filename. When a merge starts or finishes, we push
- EmergeJobState objects to the job_queue.
- """
-
+def SetupWorkerSignals():
def ExitHandler(signum, frame):
# Remove our signal handlers so we don't get called recursively.
signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -1149,6 +1135,23 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db):
signal.signal(signal.SIGINT, ExitHandler)
signal.signal(signal.SIGTERM, ExitHandler)
+
+def EmergeWorker(task_queue, job_queue, emerge, package_db):
+ """This worker emerges any packages given to it on the task_queue.
+
+ Args:
+ task_queue: The queue of tasks for this worker to do.
+ job_queue: The queue of results from the worker.
+ emerge: An EmergeData() object.
+ package_db: A dict, mapping package ids to portage Package objects.
+
+ It expects package identifiers to be passed to it via task_queue. When
+ a task is started, it pushes the (target, filename) to the started_queue.
+ The output is stored in filename. When a merge starts or finishes, we push
+ EmergeJobState objects to the job_queue.
+ """
+
+ SetupWorkerSignals()
settings, trees, mtimedb = emerge.settings, emerge.trees, emerge.mtimedb
opts, spinner = emerge.opts, emerge.spinner
opts["--nodeps"] = True
@@ -1190,6 +1193,86 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db):
job_queue.put(job)
+class LinePrinter(object):
+ """Helper object to print a single line."""
+
+ def __init__(self, line):
+ self.line = line
+
+ def Print(self, seek_locations):
+ print self.line
+
+
+class JobPrinter(object):
+ """Helper object to print output of a job."""
+
+ def __init__(self, job, unlink=False):
+ """Print output of job.
+
+ If unlink is True, unlink the job output file when done."""
+ self.current_time = time.time()
+ self.job = job
+ self.unlink = unlink
+
+ def Print(self, seek_locations):
+
+ job = self.job
+
+ # Calculate how long the job has been running.
+ seconds = self.current_time - job.start_timestamp
+
+ # Note that we've printed out the job so far.
+ job.last_output_timestamp = self.current_time
+
+ # Note that we're starting the job
+ info = "job %s (%dm%.1fs)" % (job.pkgname, seconds / 60, seconds % 60)
+ last_output_seek = seek_locations.get(job.filename, 0)
+ if last_output_seek:
+ print "=== Continue output for %s ===" % info
+ else:
+ print "=== Start output for %s ===" % info
+
+ # Print actual output from job
+ f = codecs.open(job.filename, encoding='utf-8', errors='replace')
+ f.seek(last_output_seek)
+ prefix = job.pkgname + ":"
+ for line in f:
+
+ # Save off our position in the file
+ if line and line[-1] == "\n":
+ last_output_seek = f.tell()
+ line = line[:-1]
+
+ # Print our line
+ print prefix, line.encode('utf-8', 'replace')
+ f.close()
+
+ # Save our last spot in the file so that we don't print out the same
+ # location twice.
+ seek_locations[job.filename] = last_output_seek
+
+ # Note end of output section
+ if job.done:
+ print "=== Complete: %s ===" % info
+ else:
+ print "=== Still running: %s ===" % info
+
+ if self.unlink:
+ os.unlink(job.filename)
+
+
+def PrintWorker(queue):
+ """A worker that prints stuff to the screen as requested."""
+ SetupWorkerSignals()
+ seek_locations = {}
+ while True:
+ job = queue.get()
+ if job:
+ job.Print(seek_locations)
+ else:
+ break
+
+
class EmergeQueue(object):
"""Class to schedule emerge jobs according to a dependency graph."""
@@ -1215,8 +1298,12 @@ class EmergeQueue(object):
emerge.opts.get("--jobs", multiprocessing.cpu_count()))
self._emerge_queue = multiprocessing.Queue()
self._job_queue = multiprocessing.Queue()
+ self._print_queue = multiprocessing.Queue()
args = (self._emerge_queue, self._job_queue, emerge, package_db)
self._pool = multiprocessing.Pool(procs, EmergeWorker, args)
+ self._print_worker = multiprocessing.Process(target=PrintWorker,
+ args=[self._print_queue])
+ self._print_worker.start()
# Initialize the failed queue to empty.
self._retry_queue = []
@@ -1245,11 +1332,14 @@ class EmergeQueue(object):
# Print our current job status
for target, job in self._jobs.iteritems():
if job:
- self._PrintJob(job)
- os.unlink(job.filename)
+ self._print_queue.put(JobPrinter(job, unlink=True))
# Notify the user that we are exiting
- print "Exiting on signal %s" % signum
+ self._Print("Exiting on signal %s" % signum)
+
+ # Exit when print worker is done.
+ self._print_queue.put(None)
+ self._print_worker.join()
sys.exit(1)
# Print out job status when we are killed
@@ -1273,44 +1363,9 @@ class EmergeQueue(object):
loads = open("/proc/loadavg", "r").readline().split()[:3]
return " ".join(loads)
- def _PrintJob(self, job):
- """Print output so far of specified job"""
-
- # Calculate how long the job has been running.
- current_time = time.time()
- seconds = current_time - job.start_timestamp
-
- # Note that we've printed out the job so far.
- job.last_output_timestamp = current_time
-
- # Note that we're starting the job
- info = "job %s (%dm%.1fs) ===" % (job.pkgname, seconds / 60, seconds % 60)
- if job.last_output_seek:
- print "=== Continue output for %s " % info
- else:
- print "=== Start output for %s ===" % info
-
- # Print actual output from job
- f = codecs.open(job.filename, encoding='utf-8', errors='replace')
- f.seek(job.last_output_seek)
- prefix = job.pkgname + ":"
- for line in f:
-
- # Save off our position in the file
- if line and line[-1] == "\n":
- job.last_output_seek = f.tell()
- line = line[:-1]
-
- # Print our line
- print prefix, line.encode('utf-8', 'replace')
- f.close()
-
- # Note end of output section
- if job.done:
- print "=== Complete: %s ===" % info
- else:
- print "=== Still running: %s ===" % info
-
+ def _Print(self, line):
+ """Print a single line."""
+ self._print_queue.put(LinePrinter(line))
def _Status(self):
"""Print status."""
@@ -1319,9 +1374,9 @@ class EmergeQueue(object):
line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s "
"[Time %dm%.1fs Load %s]")
qsize = self._emerge_queue.qsize()
- print line % (len(self._deps_map), qsize, len(self._jobs) - qsize,
- len(self._retry_queue), self._total_jobs,
- seconds / 60, seconds % 60, self._LoadAvg())
+ self._Print(line % (len(self._deps_map), qsize, len(self._jobs) - qsize,
+ len(self._retry_queue), self._total_jobs,
+ seconds / 60, seconds % 60, self._LoadAvg()))
# Print interim output every minute if --show-output is used. Otherwise,
# only print output if a job has been running for 60 minutes or more.
@@ -1333,7 +1388,8 @@ class EmergeQueue(object):
if job:
last_timestamp = max(job.start_timestamp, job.last_output_timestamp)
if last_timestamp + interval < current_time:
- self._PrintJob(job)
+ self._print_queue.put(JobPrinter(job))
+ job.last_output_timestamp = current_time
def _Finish(self, target):
"""Mark a target as completed and unblock dependecies."""
@@ -1347,7 +1403,7 @@ class EmergeQueue(object):
if self._retry_queue:
target = self._retry_queue.pop(0)
self._Schedule(target)
- print "Retrying emerge of %s." % target
+ self._Print("Retrying emerge of %s." % target)
def Run(self):
"""Run through the scheduled ebuilds.
@@ -1364,17 +1420,19 @@ class EmergeQueue(object):
# If we have failed on a package, retry it now.
if self._retry_queue:
self._Retry()
- # If we have failed a package twice, just give up.
- elif self._failed:
- for failure in self._failed:
- print "Package failed: %s" % failure
- PrintDepsMap(self._deps_map)
- print "Packages failed: %s" % " ,".join(self._failed)
- sys.exit(1)
- # If we have dependency cycles.
else:
- print "Deadlock! Circular dependencies!"
+ # Tell the print worker we're done, and wait for it to exit.
+ self._print_queue.put(None)
+ self._print_worker.join()
+
+ # The dependency map is helpful for debugging failures.
PrintDepsMap(self._deps_map)
+
+ # Tell the user why we're exiting.
+ if self._failed:
+ print "Packages failed: %s" % " ,".join(self._failed)
+ else:
+ print "Deadlock! Circular dependencies!"
sys.exit(1)
try:
@@ -1388,13 +1446,14 @@ class EmergeQueue(object):
if not job.done:
self._jobs[target] = job
- print "Started %s (logged in %s)" % (target, job.filename)
+ self._Print("Started %s (logged in %s)" % (target, job.filename))
continue
# Print output of job
if self._show_output or job.retcode != 0:
- self._PrintJob(job)
- os.unlink(job.filename)
+ self._print_queue.put(JobPrinter(job, unlink=True))
+ else:
+ os.unlink(job.filename)
del self._jobs[target]
seconds = time.time() - job.start_timestamp
@@ -1405,12 +1464,12 @@ class EmergeQueue(object):
# Handle job failure.
if target in self._failed:
# If this job has failed previously, give up.
- print "Failed %s. Your build has failed." % details
+ self._Print("Failed %s. Your build has failed." % details)
else:
# Queue up this build to try again after a long while.
self._retry_queue.append(target)
self._failed.add(target)
- print "Failed %s, retrying later." % details
+ self._Print("Failed %s, retrying later." % details)
else:
if target in self._failed and self._retry_queue:
# If we have successfully retried a failed package, and there
@@ -1418,13 +1477,17 @@ class EmergeQueue(object):
# one retrying package actively running at a time.
self._Retry()
- print "Completed %s" % details
+ self._Print("Completed %s" % details)
# Mark as completed and unblock waiting ebuilds.
self._Finish(target)
# Print an update.
self._Status()
+ # Tell the print worker we're done, and wait for it to exit.
+ self._print_queue.put(None)
+ self._print_worker.join()
+
def main():
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698