| Index: parallel_emerge
|
| diff --git a/parallel_emerge b/parallel_emerge
|
| index 9a77ef45c486323090d5488839de90ec9bc96787..37214241e17f41150c97790830c8de317d9b5988 100755
|
| --- a/parallel_emerge
|
| +++ b/parallel_emerge
|
| @@ -40,6 +40,7 @@ Basic operation:
|
|
|
| import codecs
|
| import copy
|
| +import errno
|
| import multiprocessing
|
| import os
|
| import Queue
|
| @@ -1308,6 +1309,12 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db):
|
| # Wait for a new item to show up on the queue. This is a blocking wait,
|
| # so if there's nothing to do, we just sit here.
|
| target = task_queue.get()
|
| + if not target:
|
| + # If target is None, this means that the main thread wants us to quit.
|
| + # The other workers need to exit too, so we'll push the message back on
|
| + # to the queue so they'll get it too.
|
| + task_queue.put(target)
|
| + return
|
| db_pkg = package_db[target]
|
| db_pkg.root_config = emerge.root_config
|
| install_list = [db_pkg]
|
| @@ -1412,14 +1419,33 @@ class JobPrinter(object):
|
|
|
| def PrintWorker(queue):
|
| """A worker that prints stuff to the screen as requested."""
|
| - SetupWorkerSignals()
|
| +
|
| + def ExitHandler(signum, frame):
|
| + # Switch to default signal handlers so that we'll die after two signals.
|
| + signal.signal(signal.SIGINT, signal.SIG_DFL)
|
| + signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
| +
|
| + # Don't exit on the first SIGINT / SIGTERM, because the parent worker will
|
| + # handle it and tell us when we need to exit.
|
| + signal.signal(signal.SIGINT, ExitHandler)
|
| + signal.signal(signal.SIGTERM, ExitHandler)
|
| +
|
| + # seek_locations is a map indicating the position we are at in each file.
|
| + # It starts off empty, but is set by the various Print jobs as we go along
|
| + # to indicate where we left off in each file.
|
| seek_locations = {}
|
| while True:
|
| - job = queue.get()
|
| - if job:
|
| - job.Print(seek_locations)
|
| - else:
|
| - break
|
| + try:
|
| + job = queue.get()
|
| + if job:
|
| + job.Print(seek_locations)
|
| + else:
|
| + break
|
| + except IOError as ex:
|
| + if ex.errno == errno.EINTR:
|
| + # Looks like we received a signal. Keep printing.
|
| + continue
|
| + raise
|
|
|
|
|
| class EmergeQueue(object):
|
| @@ -1490,9 +1516,8 @@ class EmergeQueue(object):
|
| # Notify the user that we are exiting
|
| self._Print("Exiting on signal %s" % signum)
|
|
|
| - # Exit when print worker is done.
|
| - self._print_queue.put(None)
|
| - self._print_worker.join()
|
| + # Kill child threads, then exit.
|
| + self._Exit()
|
| sys.exit(1)
|
|
|
| # Print out job status when we are killed
|
| @@ -1558,6 +1583,17 @@ class EmergeQueue(object):
|
| self._Schedule(target)
|
| self._Print("Retrying emerge of %s." % target)
|
|
|
| + def _Exit(self):
|
| + # Tell emerge workers to exit. They all exit when 'None' is pushed
|
| + # to the queue.
|
| + self._emerge_queue.put(None)
|
| + self._pool.close()
|
| + self._pool.join()
|
| +
|
| + # Now that our workers are finished, we can kill the print queue.
|
| + self._print_queue.put(None)
|
| + self._print_worker.join()
|
| +
|
| def Run(self):
|
| """Run through the scheduled ebuilds.
|
|
|
| @@ -1574,9 +1610,8 @@ class EmergeQueue(object):
|
| if self._retry_queue:
|
| self._Retry()
|
| else:
|
| - # Tell the print worker we're done, and wait for it to exit.
|
| - self._print_queue.put(None)
|
| - self._print_worker.join()
|
| + # Tell child threads to exit.
|
| + self._Exit()
|
|
|
| # The dependency map is helpful for debugging failures.
|
| PrintDepsMap(self._deps_map)
|
| @@ -1637,9 +1672,9 @@ class EmergeQueue(object):
|
| # Print an update.
|
| self._Status()
|
|
|
| - # Tell the print worker we're done, and wait for it to exit.
|
| - self._print_queue.put(None)
|
| - self._print_worker.join()
|
| + # Tell child threads to exit.
|
| + self._Print("Merge complete")
|
| + self._Exit()
|
|
|
|
|
| def main():
|
|
|