Index: parallel_emerge |
diff --git a/parallel_emerge b/parallel_emerge |
index 9a77ef45c486323090d5488839de90ec9bc96787..37214241e17f41150c97790830c8de317d9b5988 100755 |
--- a/parallel_emerge |
+++ b/parallel_emerge |
@@ -40,6 +40,7 @@ Basic operation: |
import codecs |
import copy |
+import errno |
import multiprocessing |
import os |
import Queue |
@@ -1308,6 +1309,12 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db): |
# Wait for a new item to show up on the queue. This is a blocking wait, |
# so if there's nothing to do, we just sit here. |
target = task_queue.get() |
+ if not target: |
+ # If target is None, this means that the main thread wants us to quit. |
+ # The other workers need to exit too, so we'll push the message back on |
+ # to the queue so they'll get it too. |
+ task_queue.put(target) |
+ return |
db_pkg = package_db[target] |
db_pkg.root_config = emerge.root_config |
install_list = [db_pkg] |
@@ -1412,14 +1419,33 @@ class JobPrinter(object): |
def PrintWorker(queue): |
"""A worker that prints stuff to the screen as requested.""" |
- SetupWorkerSignals() |
+ |
+ def ExitHandler(signum, frame): |
+ # Switch to default signal handlers so that we'll die after two signals. |
+ signal.signal(signal.SIGINT, signal.SIG_DFL) |
+ signal.signal(signal.SIGTERM, signal.SIG_DFL) |
+ |
+ # Don't exit on the first SIGINT / SIGTERM, because the parent worker will |
+ # handle it and tell us when we need to exit. |
+ signal.signal(signal.SIGINT, ExitHandler) |
+ signal.signal(signal.SIGTERM, ExitHandler) |
+ |
+ # seek_locations is a map indicating the position we are at in each file. |
+ # It starts off empty, but is set by the various Print jobs as we go along |
+ # to indicate where we left off in each file. |
seek_locations = {} |
while True: |
- job = queue.get() |
- if job: |
- job.Print(seek_locations) |
- else: |
- break |
+ try: |
+ job = queue.get() |
+ if job: |
+ job.Print(seek_locations) |
+ else: |
+ break |
+ except IOError as ex: |
+ if ex.errno == errno.EINTR: |
+ # Looks like we received a signal. Keep printing. |
+ continue |
+ raise |
class EmergeQueue(object): |
@@ -1490,9 +1516,8 @@ class EmergeQueue(object): |
# Notify the user that we are exiting |
self._Print("Exiting on signal %s" % signum) |
- # Exit when print worker is done. |
- self._print_queue.put(None) |
- self._print_worker.join() |
+ # Kill child threads, then exit. |
+ self._Exit() |
sys.exit(1) |
# Print out job status when we are killed |
@@ -1558,6 +1583,17 @@ class EmergeQueue(object): |
self._Schedule(target) |
self._Print("Retrying emerge of %s." % target) |
+ def _Exit(self): |
+ # Tell emerge workers to exit. They all exit when 'None' is pushed |
+ # to the queue. |
+ self._emerge_queue.put(None) |
+ self._pool.close() |
+ self._pool.join() |
+ |
+ # Now that our workers are finished, we can kill the print queue. |
+ self._print_queue.put(None) |
+ self._print_worker.join() |
+ |
def Run(self): |
"""Run through the scheduled ebuilds. |
@@ -1574,9 +1610,8 @@ class EmergeQueue(object): |
if self._retry_queue: |
self._Retry() |
else: |
- # Tell the print worker we're done, and wait for it to exit. |
- self._print_queue.put(None) |
- self._print_worker.join() |
+ # Tell child threads to exit. |
+ self._Exit() |
# The dependency map is helpful for debugging failures. |
PrintDepsMap(self._deps_map) |
@@ -1637,9 +1672,9 @@ class EmergeQueue(object): |
# Print an update. |
self._Status() |
- # Tell the print worker we're done, and wait for it to exit. |
- self._print_queue.put(None) |
- self._print_worker.join() |
+ # Tell child threads to exit. |
+ self._Print("Merge complete") |
+ self._Exit() |
def main(): |