Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2676)

Unified Diff: appengine/findit/util_scripts/script_util.py

Issue 2432203003: [Predator] Run predator. (Closed)
Patch Set: Rebase. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/findit/util_scripts/script_util.py
diff --git a/appengine/findit/util_scripts/script_util.py b/appengine/findit/util_scripts/script_util.py
index 1386357d9855ef952e71c8400febace7bac7b282..f87229f50cd8d7373464e5980d10074a9828b05d 100644
--- a/appengine/findit/util_scripts/script_util.py
+++ b/appengine/findit/util_scripts/script_util.py
@@ -4,27 +4,34 @@
"""This module contains util functions that local scripts can use."""
-import logging
+import atexit
+import functools
import os
+import Queue
import subprocess
import sys
+import threading
+import time
+import traceback
-from lib.cache_decorator import Cached
-from local_cache import LocalCacher # pylint: disable=W
+MAX_THREAD_NUMBER = 15
+TASK_QUEUE = None
def SetUpSystemPaths(): # pragma: no cover
"""Sets system paths so as to import modules in findit, third_party and
appengine."""
findit_root_dir = os.path.join(os.path.dirname(__file__), os.path.pardir)
+ first_party_dir = os.path.join(findit_root_dir, 'first_party')
third_party_dir = os.path.join(findit_root_dir, 'third_party')
appengine_sdk_dir = os.path.join(findit_root_dir, os.path.pardir,
os.path.pardir, os.path.pardir,
'google_appengine')
# Add App Engine SDK dir to sys.path.
- sys.path.insert(1, appengine_sdk_dir)
sys.path.insert(1, third_party_dir)
+ sys.path.insert(1, first_party_dir)
+ sys.path.insert(1, appengine_sdk_dir)
import dev_appserver
dev_appserver.fix_sys_path()
@@ -32,6 +39,92 @@ def SetUpSystemPaths(): # pragma: no cover
sys.path.insert(1, findit_root_dir)
+SetUpSystemPaths()
+
+# The lib is in predator/ root dir, and can be imported only when sys.path gets
+# set up.
+from lib.cache_decorator import Cached
+from local_cache import LocalCacher # pylint: disable=W
+
+
+def SignalWorkerThreads(): # pragma: no cover
+ """Puts signal worker threads into task queue."""
+ global TASK_QUEUE # pylint: disable=W0602
+ if not TASK_QUEUE:
+ return
+
+ for _ in range(MAX_THREAD_NUMBER):
+ TASK_QUEUE.put(None)
+
+ # Give worker threads a chance to exit.
+ # Workaround the harmless bug in python 2.7 below.
+ time.sleep(1)
+
+
+atexit.register(SignalWorkerThreads)
+
+
+def Worker(): # pragma: no cover
+ global TASK_QUEUE # pylint: disable=W0602
+ while True:
+ try:
+ task = TASK_QUEUE.get()
+ if not task:
+ return
+ except TypeError:
+ # According to http://bugs.python.org/issue14623, this is a harmless bug
+ # in python 2.7 which won't be fixed.
+ # The exception is raised on daemon threads when python interpreter is
+ # shutting down.
+ return
+
+ function, args, kwargs, result_semaphore = task
+ try:
+ function(*args, **kwargs)
+ except Exception:
+ print 'Caught exception in thread.'
+ print traceback.format_exc()
+ # Continue to process tasks in queue, in case every thread fails, the
+ # main thread will be waiting forever.
+ continue
+ finally:
+ # Signal one task is done in case of exception.
+ result_semaphore.release()
+
+
+def RunTasks(tasks): # pragma: no cover
+ """Run given tasks. Not thread-safe: no concurrent calls of this function.
+
+ Return after all tasks were completed. A task is a dict as below:
+ {
+ 'function': the function to call,
+ 'args': the positional argument to pass to the function,
+ 'kwargs': the key-value arguments to pass to the function,
+ }
+ """
+ if not tasks:
+ return
+
+ global TASK_QUEUE
+ if not TASK_QUEUE:
+ TASK_QUEUE = Queue.Queue()
+ for index in range(MAX_THREAD_NUMBER):
+ thread = threading.Thread(target=Worker, name='worker_%s' % index)
+ # Set as daemon, so no join is needed.
+ thread.daemon = True
+ thread.start()
+
+ result_semaphore = threading.Semaphore(0)
+ # Push task to task queue for execution.
+ for task in tasks:
+ TASK_QUEUE.put((task['function'], task.get('args', []),
+ task.get('kwargs', {}), result_semaphore))
+
+ # Wait until all tasks to be executed.
+ for _ in tasks:
+ result_semaphore.acquire()
+
+
@Cached(namespace='Command-output', cacher=LocalCacher())
def GetCommandOutput(command): # pragma: no cover
"""Gets the output stream of executable command.
@@ -47,7 +140,17 @@ def GetCommandOutput(command): # pragma: no cover
stdoutdata, stderrdata = p.communicate()
if p.returncode != 0:
- logging.error('Error running command %s: %s', command, stderrdata)
- return None
+ raise Exception('Error running command %s: %s' % (command, stderrdata))
return stdoutdata
+
+
+def GetLockedMethod(cls, method_name, lock): # pragma: no cover
+ """Returns a class/object method serialized with lock."""
+ method = getattr(cls, method_name)
+
+ def LockedMethod(cls, *args, **kwargs): # pylint: disable=W
+ with lock:
+ return method(*args, **kwargs)
+
+ return functools.partial(LockedMethod, cls)
« no previous file with comments | « appengine/findit/util_scripts/local_cache.py ('k') | appengine/findit/util_scripts/test/script_util_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698