Chromium Code Reviews| Index: appengine/findit/util_scripts/script_util.py |
| diff --git a/appengine/findit/util_scripts/script_util.py b/appengine/findit/util_scripts/script_util.py |
| index 1386357d9855ef952e71c8400febace7bac7b282..3f45ae52a4f6de8a7c1974f15cbf898e43a2f9b9 100644 |
| --- a/appengine/findit/util_scripts/script_util.py |
| +++ b/appengine/findit/util_scripts/script_util.py |
| @@ -4,13 +4,22 @@ |
| """This module contains util functions that local scripts can use.""" |
| +import atexit |
| +import cgi |
| +import functools |
| +import json |
| import logging |
| import os |
| +import Queue |
| +import re |
| import subprocess |
| import sys |
| +import threading |
| +import traceback |
| +import time |
| -from lib.cache_decorator import Cached |
| -from local_cache import LocalCacher # pylint: disable=W |
| +MAX_THREAD_NUMBER = 15 |
| +TASK_QUEUE = None |
| def SetUpSystemPaths(): # pragma: no cover |
| @@ -32,6 +41,92 @@ def SetUpSystemPaths(): # pragma: no cover |
| sys.path.insert(1, findit_root_dir) |
| +SetUpSystemPaths() |
| + |
| +# The lib is in predator/ root dir, and can be imported only when sys.path gets |
| +# set up. |
| +from lib.cache_decorator import Cached |
| +from local_cache import LocalCacher # pylint: disable=W |
| + |
| + |
| +def SignalWorkerThreads(): # pragma: no cover |
| + """Puts signal worker threads into task queue.""" |
| + global TASK_QUEUE # pylint: disable=W0602 |
| + if not TASK_QUEUE: |
| + return |
| + |
| + for _ in range(MAX_THREAD_NUMBER): |
| + TASK_QUEUE.put(None) |
| + |
| + # Give worker threads a chance to exit. |
| + # Workaround the harmless bug in python 2.7 below. |
| + time.sleep(1) |
| + |
| + |
| +atexit.register(SignalWorkerThreads) |
| + |
| + |
| +def Worker(): # pragma: no cover |
| + global TASK_QUEUE # pylint: disable=W0602 |
| + while True: |
| + try: |
| + task = TASK_QUEUE.get() |
| + if not task: |
| + return |
| + except TypeError: |
| + # According to http://bugs.python.org/issue14623, this is a harmless bug |
| + # in python 2.7 which won't be fixed. |
| + # The exception is raised on daemon threads when python interpreter is |
| + # shutting down. |
| + return |
| + |
| + function, args, kwargs, result_semaphore = task |
| + try: |
| + function(*args, **kwargs) |
| + except Exception: |
| + print 'Caught exception in thread.' |
| + print traceback.format_exc() |
| + # Continue to process tasks in queue, in case every thread fails, the |
| + # main thread will be waiting forever. |
| + continue |
| + finally: |
| + # Signal one task is done in case of exception. |
| + result_semaphore.release() |
| + |
| + |
| +def RunTasks(tasks): # pragma: no cover |
| + """Run given tasks. Not thread-safe: no concurrent calls of this function. |
| + |
| + Return after all tasks were completed. A task is a dict as below: |
| + { |
| + 'function': the function to call, |
| + 'args': the positional argument to pass to the function, |
| + 'kwargs': the key-value arguments to pass to the function, |
| + } |
| + """ |
| + if not tasks: |
| + return |
| + |
| + global TASK_QUEUE |
| + if not TASK_QUEUE: |
| + TASK_QUEUE = Queue.Queue() |
| + for index in range(MAX_THREAD_NUMBER): |
| + thread = threading.Thread(target=Worker, name='worker_%s' % index) |
| + # Set as daemon, so no join is needed. |
| + thread.daemon = True |
| + thread.start() |
| + |
| + result_semaphore = threading.Semaphore(0) |
| + # Push task to task queue for execution. |
| + for task in tasks: |
| + TASK_QUEUE.put((task['function'], task.get('args', []), |
| + task.get('kwargs', {}), result_semaphore)) |
| + |
| + # Wait until all tasks to be executed. |
| + for _ in tasks: |
| + result_semaphore.acquire() |
| + |
| + |
| @Cached(namespace='Command-output', cacher=LocalCacher()) |
| def GetCommandOutput(command): # pragma: no cover |
| """Gets the output stream of executable command. |
| @@ -47,7 +142,18 @@ def GetCommandOutput(command): # pragma: no cover |
| stdoutdata, stderrdata = p.communicate() |
| if p.returncode != 0: |
| - logging.error('Error running command %s: %s', command, stderrdata) |
| - return None |
| + raise Exception('Error running command %s: %s' % (command, stderrdata)) |
| return stdoutdata |
| + |
| + |
| +def GetLockedMethod(cls, method): |
| + |
| + def LockedMethod(cls, *args, **kwargs): # pylint: disable=W |
| + if not hasattr(LockedMethod, 'lock'): |
| + LockedMethod.lock = threading.Lock() |
| + |
| + with LockedMethod.lock: |
| + return method(*args, **kwargs) |
|
stgao
2016/11/17 05:03:05
This is thread safe if the lock is set appropriate
Sharu Jiang
2016/11/17 09:03:16
Refactor this, since this is more general function
|
| + |
| + return functools.partial(LockedMethod, cls) |