Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(60)

Unified Diff: parallel_emerge

Issue 3534006: Cleanup parallel_emerge exit conditions to fix hangs. (Closed) Base URL: http://git.chromium.org/git/crosutils.git
Patch Set: More comments. Created 10 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: parallel_emerge
diff --git a/parallel_emerge b/parallel_emerge
index 9a77ef45c486323090d5488839de90ec9bc96787..37214241e17f41150c97790830c8de317d9b5988 100755
--- a/parallel_emerge
+++ b/parallel_emerge
@@ -40,6 +40,7 @@ Basic operation:
import codecs
import copy
+import errno
import multiprocessing
import os
import Queue
@@ -1308,6 +1309,12 @@ def EmergeWorker(task_queue, job_queue, emerge, package_db):
# Wait for a new item to show up on the queue. This is a blocking wait,
# so if there's nothing to do, we just sit here.
target = task_queue.get()
+ if not target:
+ # If target is None, this means that the main thread wants us to quit.
+ # The other workers need to exit too, so we'll push the message back on
+ # to the queue so they'll get it too.
+ task_queue.put(target)
+ return
db_pkg = package_db[target]
db_pkg.root_config = emerge.root_config
install_list = [db_pkg]
@@ -1412,14 +1419,33 @@ class JobPrinter(object):
def PrintWorker(queue):
"""A worker that prints stuff to the screen as requested."""
- SetupWorkerSignals()
+
+ def ExitHandler(signum, frame):
+ # Switch to default signal handlers so that we'll die after two signals.
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+ # Don't exit on the first SIGINT / SIGTERM, because the parent worker will
+ # handle it and tell us when we need to exit.
+ signal.signal(signal.SIGINT, ExitHandler)
+ signal.signal(signal.SIGTERM, ExitHandler)
+
+ # seek_locations is a map indicating the position we are at in each file.
+ # It starts off empty, but is set by the various Print jobs as we go along
+ # to indicate where we left off in each file.
seek_locations = {}
while True:
- job = queue.get()
- if job:
- job.Print(seek_locations)
- else:
- break
+ try:
+ job = queue.get()
+ if job:
+ job.Print(seek_locations)
+ else:
+ break
+ except IOError as ex:
+ if ex.errno == errno.EINTR:
+ # Looks like we received a signal. Keep printing.
+ continue
+ raise
class EmergeQueue(object):
@@ -1490,9 +1516,8 @@ class EmergeQueue(object):
# Notify the user that we are exiting
self._Print("Exiting on signal %s" % signum)
- # Exit when print worker is done.
- self._print_queue.put(None)
- self._print_worker.join()
+ # Kill child threads, then exit.
+ self._Exit()
sys.exit(1)
# Print out job status when we are killed
@@ -1558,6 +1583,17 @@ class EmergeQueue(object):
self._Schedule(target)
self._Print("Retrying emerge of %s." % target)
+ def _Exit(self):
+ # Tell emerge workers to exit. They all exit when 'None' is pushed
+ # to the queue.
+ self._emerge_queue.put(None)
+ self._pool.close()
+ self._pool.join()
+
+ # Now that our workers are finished, we can kill the print queue.
+ self._print_queue.put(None)
+ self._print_worker.join()
+
def Run(self):
"""Run through the scheduled ebuilds.
@@ -1574,9 +1610,8 @@ class EmergeQueue(object):
if self._retry_queue:
self._Retry()
else:
- # Tell the print worker we're done, and wait for it to exit.
- self._print_queue.put(None)
- self._print_worker.join()
+ # Tell child threads to exit.
+ self._Exit()
# The dependency map is helpful for debugging failures.
PrintDepsMap(self._deps_map)
@@ -1637,9 +1672,9 @@ class EmergeQueue(object):
# Print an update.
self._Status()
- # Tell the print worker we're done, and wait for it to exit.
- self._print_queue.put(None)
- self._print_worker.join()
+ # Tell child threads to exit.
+ self._Print("Merge complete")
+ self._Exit()
def main():
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698