Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(332)

Side by Side Diff: appengine/findit/util_scripts/script_util.py

Issue 2432203003: [Predator] Run predator. (Closed)
Patch Set: Rebase. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """This module contains util functions that local scripts can use.""" 5 """This module contains util functions that local scripts can use."""
6 6
7 import logging 7 import atexit
8 import functools
8 import os 9 import os
10 import Queue
9 import subprocess 11 import subprocess
10 import sys 12 import sys
13 import threading
14 import time
15 import traceback
11 16
12 from lib.cache_decorator import Cached 17 MAX_THREAD_NUMBER = 15
13 from local_cache import LocalCacher # pylint: disable=W 18 TASK_QUEUE = None
14 19
15 20
16 def SetUpSystemPaths(): # pragma: no cover 21 def SetUpSystemPaths(): # pragma: no cover
17 """Sets system paths so as to import modules in findit, third_party and 22 """Sets system paths so as to import modules in findit, third_party and
18 appengine.""" 23 appengine."""
19 findit_root_dir = os.path.join(os.path.dirname(__file__), os.path.pardir) 24 findit_root_dir = os.path.join(os.path.dirname(__file__), os.path.pardir)
25 first_party_dir = os.path.join(findit_root_dir, 'first_party')
20 third_party_dir = os.path.join(findit_root_dir, 'third_party') 26 third_party_dir = os.path.join(findit_root_dir, 'third_party')
21 appengine_sdk_dir = os.path.join(findit_root_dir, os.path.pardir, 27 appengine_sdk_dir = os.path.join(findit_root_dir, os.path.pardir,
22 os.path.pardir, os.path.pardir, 28 os.path.pardir, os.path.pardir,
23 'google_appengine') 29 'google_appengine')
24 30
25 # Add App Engine SDK dir to sys.path. 31 # Add App Engine SDK dir to sys.path.
32 sys.path.insert(1, third_party_dir)
33 sys.path.insert(1, first_party_dir)
26 sys.path.insert(1, appengine_sdk_dir) 34 sys.path.insert(1, appengine_sdk_dir)
27 sys.path.insert(1, third_party_dir)
28 import dev_appserver 35 import dev_appserver
29 dev_appserver.fix_sys_path() 36 dev_appserver.fix_sys_path()
30 37
31 # Add Findit root dir to sys.path so that modules in Findit is available. 38 # Add Findit root dir to sys.path so that modules in Findit is available.
32 sys.path.insert(1, findit_root_dir) 39 sys.path.insert(1, findit_root_dir)
33 40
34 41
42 SetUpSystemPaths()
43
44 # The lib is in predator/ root dir, and can be imported only when sys.path gets
45 # set up.
46 from lib.cache_decorator import Cached
47 from local_cache import LocalCacher # pylint: disable=W
48
49
50 def SignalWorkerThreads(): # pragma: no cover
51 """Puts signal worker threads into task queue."""
52 global TASK_QUEUE # pylint: disable=W0602
53 if not TASK_QUEUE:
54 return
55
56 for _ in range(MAX_THREAD_NUMBER):
57 TASK_QUEUE.put(None)
58
59 # Give worker threads a chance to exit.
60 # Workaround the harmless bug in python 2.7 below.
61 time.sleep(1)
62
63
64 atexit.register(SignalWorkerThreads)
65
66
67 def Worker(): # pragma: no cover
68 global TASK_QUEUE # pylint: disable=W0602
69 while True:
70 try:
71 task = TASK_QUEUE.get()
72 if not task:
73 return
74 except TypeError:
75 # According to http://bugs.python.org/issue14623, this is a harmless bug
76 # in python 2.7 which won't be fixed.
77 # The exception is raised on daemon threads when python interpreter is
78 # shutting down.
79 return
80
81 function, args, kwargs, result_semaphore = task
82 try:
83 function(*args, **kwargs)
84 except Exception:
85 print 'Caught exception in thread.'
86 print traceback.format_exc()
87 # Continue to process tasks in queue, in case every thread fails, the
88 # main thread will be waiting forever.
89 continue
90 finally:
91 # Signal one task is done in case of exception.
92 result_semaphore.release()
93
94
95 def RunTasks(tasks): # pragma: no cover
96 """Run given tasks. Not thread-safe: no concurrent calls of this function.
97
98 Return after all tasks were completed. A task is a dict as below:
99 {
100 'function': the function to call,
101 'args': the positional argument to pass to the function,
102 'kwargs': the key-value arguments to pass to the function,
103 }
104 """
105 if not tasks:
106 return
107
108 global TASK_QUEUE
109 if not TASK_QUEUE:
110 TASK_QUEUE = Queue.Queue()
111 for index in range(MAX_THREAD_NUMBER):
112 thread = threading.Thread(target=Worker, name='worker_%s' % index)
113 # Set as daemon, so no join is needed.
114 thread.daemon = True
115 thread.start()
116
117 result_semaphore = threading.Semaphore(0)
118 # Push task to task queue for execution.
119 for task in tasks:
120 TASK_QUEUE.put((task['function'], task.get('args', []),
121 task.get('kwargs', {}), result_semaphore))
122
123 # Wait until all tasks to be executed.
124 for _ in tasks:
125 result_semaphore.acquire()
126
127
35 @Cached(namespace='Command-output', cacher=LocalCacher()) 128 @Cached(namespace='Command-output', cacher=LocalCacher())
36 def GetCommandOutput(command): # pragma: no cover 129 def GetCommandOutput(command): # pragma: no cover
37 """Gets the output stream of executable command. 130 """Gets the output stream of executable command.
38 131
39 Args: 132 Args:
40 command (str): Command to execute to get output. 133 command (str): Command to execute to get output.
41 134
42 Return: 135 Return:
43 Output steam of the command. 136 Output steam of the command.
44 """ 137 """
45 p = subprocess.Popen( 138 p = subprocess.Popen(
46 command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) 139 command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
47 stdoutdata, stderrdata = p.communicate() 140 stdoutdata, stderrdata = p.communicate()
48 141
49 if p.returncode != 0: 142 if p.returncode != 0:
50 logging.error('Error running command %s: %s', command, stderrdata) 143 raise Exception('Error running command %s: %s' % (command, stderrdata))
51 return None
52 144
53 return stdoutdata 145 return stdoutdata
146
147
148 def GetLockedMethod(cls, method_name, lock): # pragma: no cover
149 """Returns a class/object method serialized with lock."""
150 method = getattr(cls, method_name)
151
152 def LockedMethod(cls, *args, **kwargs): # pylint: disable=W
153 with lock:
154 return method(*args, **kwargs)
155
156 return functools.partial(LockedMethod, cls)
OLDNEW
« no previous file with comments | « appengine/findit/util_scripts/local_cache.py ('k') | appengine/findit/util_scripts/test/script_util_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698