Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1348)

Unified Diff: appengine/findit/util_scripts/script_util.py

Issue 2432203003: [Predator] Run predator. (Closed)
Patch Set: Fix nits. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/findit/util_scripts/script_util.py
diff --git a/appengine/findit/util_scripts/script_util.py b/appengine/findit/util_scripts/script_util.py
index 1386357d9855ef952e71c8400febace7bac7b282..180114a97358a434af134e062cdfb02b360a2b23 100644
--- a/appengine/findit/util_scripts/script_util.py
+++ b/appengine/findit/util_scripts/script_util.py
@@ -4,13 +4,21 @@
"""This module contains util functions that local scripts can use."""
+import atexit
+import cgi
+import json
import logging
import os
+import Queue
+import re
import subprocess
import sys
+import threading
+import traceback
+import time
-from lib.cache_decorator import Cached
-from local_cache import LocalCacher # pylint: disable=W
+MAX_THREAD_NUMBER = 10
+TASK_QUEUE = None
def SetUpSystemPaths(): # pragma: no cover
@@ -32,6 +40,91 @@ def SetUpSystemPaths(): # pragma: no cover
sys.path.insert(1, findit_root_dir)
+SetUpSystemPaths()
+
+# The lib is in predator/ root dir, and can be imported only when sys.path get
+# set up.
+from lib.cache_decorator import Cached
+from local_cache import LocalCacher # pylint: disable=W
+
+
+def SignalWorkerThreads(): # pragma: no cover
+ """Puts signal worker threads into task queue."""
+ global TASK_QUEUE # pylint: disable=W0602
+ if not TASK_QUEUE:
+ return
+
+ for _ in range(MAX_THREAD_NUMBER):
+ TASK_QUEUE.put(None)
+
+ # Give worker threads a chance to exit.
+ # Workaround the harmless bug in python 2.7 below.
+ time.sleep(1)
+
+
+atexit.register(SignalWorkerThreads)
+
+
+def Worker(): # pragma: no cover
+ global TASK_QUEUE # pylint: disable=W0602
+ while True:
+ try:
+ task = TASK_QUEUE.get()
+ if not task:
+ return
+ except TypeError:
+ # According to http://bugs.python.org/issue14623, this is a harmless bug
+ # in python 2.7 which won't be fixed.
+ # The exception is raised on daemon threads when python interpreter is
+ # shutting down.
+ return
+
+ function, args, kwargs, result_semaphore = task
+ try:
+ function(*args, **kwargs)
+ except Exception as e:
+ print e
+ # Continue to process tasks in queue, in case every thread fails, the
+ # main thread will be waiting forever.
+ continue
+ finally:
+ # Signal one task is done in case of exception.
+ result_semaphore.release()
+
+
+def RunTasks(tasks): # pragma: no cover
+ """Run given tasks. Not thread-safe: no concurrent calls of this function.
+
+ Return after all tasks were completed. A task is a dict as below:
+ {
+ 'function': the function to call,
+ 'args': the positional argument to pass to the function,
+ 'kwargs': the key-value arguments to pass to the function,
+ }
+ """
+ if not tasks:
+ return
+
+ global TASK_QUEUE
+ if not TASK_QUEUE:
+ TASK_QUEUE = Queue.Queue()
+ for index in range(MAX_THREAD_NUMBER):
+ thread = threading.Thread(target=Worker, name='worker_%s' % index)
+ # Set as daemon, so no join is needed.
+ thread.daemon = True
+ thread.start()
+
+ result_semaphore = threading.Semaphore(0)
+ # Push task to task queue for execution.
+ for task in tasks:
+ TASK_QUEUE.put((task['function'], task.get('args', []),
+ task.get('kwargs', {}), result_semaphore))
+
+ # Wait until all tasks to be executed.
+ for _ in tasks:
+ result_semaphore.acquire()
+
+
@Cached(namespace='Command-output', cacher=LocalCacher())
def GetCommandOutput(command): # pragma: no cover
"""Gets the output stream of executable command.
« appengine/findit/crash/test/crash_testcase.py ('K') | « appengine/findit/util_scripts/iterator.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698