Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(180)

Side by Side Diff: appengine/findit/util_scripts/script_util.py

Issue 2432203003: [Predator] Run predator. (Closed)
Patch Set: Fix flaky BadStatusLine exception. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """This module contains util functions that local scripts can use.""" 5 """This module contains util functions that local scripts can use."""
6 6
7 import atexit
8 import cgi
9 import functools
10 import json
7 import logging 11 import logging
8 import os 12 import os
13 import Queue
14 import re
9 import subprocess 15 import subprocess
10 import sys 16 import sys
17 import threading
18 import traceback
19 import time
11 20
12 from lib.cache_decorator import Cached 21 MAX_THREAD_NUMBER = 15
13 from local_cache import LocalCacher # pylint: disable=W 22 TASK_QUEUE = None
14 23
15 24
16 def SetUpSystemPaths(): # pragma: no cover 25 def SetUpSystemPaths(): # pragma: no cover
17 """Sets system paths so as to import modules in findit, third_party and 26 """Sets system paths so as to import modules in findit, third_party and
18 appengine.""" 27 appengine."""
19 findit_root_dir = os.path.join(os.path.dirname(__file__), os.path.pardir) 28 findit_root_dir = os.path.join(os.path.dirname(__file__), os.path.pardir)
20 third_party_dir = os.path.join(findit_root_dir, 'third_party') 29 third_party_dir = os.path.join(findit_root_dir, 'third_party')
21 appengine_sdk_dir = os.path.join(findit_root_dir, os.path.pardir, 30 appengine_sdk_dir = os.path.join(findit_root_dir, os.path.pardir,
22 os.path.pardir, os.path.pardir, 31 os.path.pardir, os.path.pardir,
23 'google_appengine') 32 'google_appengine')
24 33
25 # Add App Engine SDK dir to sys.path. 34 # Add App Engine SDK dir to sys.path.
26 sys.path.insert(1, appengine_sdk_dir) 35 sys.path.insert(1, appengine_sdk_dir)
27 sys.path.insert(1, third_party_dir) 36 sys.path.insert(1, third_party_dir)
28 import dev_appserver 37 import dev_appserver
29 dev_appserver.fix_sys_path() 38 dev_appserver.fix_sys_path()
30 39
31 # Add Findit root dir to sys.path so that modules in Findit is available. 40 # Add Findit root dir to sys.path so that modules in Findit is available.
32 sys.path.insert(1, findit_root_dir) 41 sys.path.insert(1, findit_root_dir)
33 42
34 43
44 SetUpSystemPaths()
45
46 # The lib is in predator/ root dir, and can be imported only when sys.path gets
47 # set up.
48 from lib.cache_decorator import Cached
49 from local_cache import LocalCacher # pylint: disable=W
50
51
52 def SignalWorkerThreads(): # pragma: no cover
53 """Puts signal worker threads into task queue."""
54 global TASK_QUEUE # pylint: disable=W0602
55 if not TASK_QUEUE:
56 return
57
58 for _ in range(MAX_THREAD_NUMBER):
59 TASK_QUEUE.put(None)
60
61 # Give worker threads a chance to exit.
62 # Workaround the harmless bug in python 2.7 below.
63 time.sleep(1)
64
65
66 atexit.register(SignalWorkerThreads)
67
68
69 def Worker(): # pragma: no cover
70 global TASK_QUEUE # pylint: disable=W0602
71 while True:
72 try:
73 task = TASK_QUEUE.get()
74 if not task:
75 return
76 except TypeError:
77 # According to http://bugs.python.org/issue14623, this is a harmless bug
78 # in python 2.7 which won't be fixed.
79 # The exception is raised on daemon threads when python interpreter is
80 # shutting down.
81 return
82
83 function, args, kwargs, result_semaphore = task
84 try:
85 function(*args, **kwargs)
86 except Exception:
87 print 'Caught exception in thread.'
88 print traceback.format_exc()
89 # Continue to process tasks in queue, in case every thread fails, the
90 # main thread will be waiting forever.
91 continue
92 finally:
93 # Signal one task is done in case of exception.
94 result_semaphore.release()
95
96
97 def RunTasks(tasks): # pragma: no cover
98 """Run given tasks. Not thread-safe: no concurrent calls of this function.
99
100 Return after all tasks were completed. A task is a dict as below:
101 {
102 'function': the function to call,
103 'args': the positional argument to pass to the function,
104 'kwargs': the key-value arguments to pass to the function,
105 }
106 """
107 if not tasks:
108 return
109
110 global TASK_QUEUE
111 if not TASK_QUEUE:
112 TASK_QUEUE = Queue.Queue()
113 for index in range(MAX_THREAD_NUMBER):
114 thread = threading.Thread(target=Worker, name='worker_%s' % index)
115 # Set as daemon, so no join is needed.
116 thread.daemon = True
117 thread.start()
118
119 result_semaphore = threading.Semaphore(0)
120 # Push task to task queue for execution.
121 for task in tasks:
122 TASK_QUEUE.put((task['function'], task.get('args', []),
123 task.get('kwargs', {}), result_semaphore))
124
125 # Wait until all tasks to be executed.
126 for _ in tasks:
127 result_semaphore.acquire()
128
129
35 @Cached(namespace='Command-output', cacher=LocalCacher()) 130 @Cached(namespace='Command-output', cacher=LocalCacher())
36 def GetCommandOutput(command): # pragma: no cover 131 def GetCommandOutput(command): # pragma: no cover
37 """Gets the output stream of executable command. 132 """Gets the output stream of executable command.
38 133
39 Args: 134 Args:
40 command (str): Command to execute to get output. 135 command (str): Command to execute to get output.
41 136
42 Return: 137 Return:
43 Output steam of the command. 138 Output steam of the command.
44 """ 139 """
45 p = subprocess.Popen( 140 p = subprocess.Popen(
46 command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) 141 command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
47 stdoutdata, stderrdata = p.communicate() 142 stdoutdata, stderrdata = p.communicate()
48 143
49 if p.returncode != 0: 144 if p.returncode != 0:
50 logging.error('Error running command %s: %s', command, stderrdata) 145 raise Exception('Error running command %s: %s' % (command, stderrdata))
51 return None
52 146
53 return stdoutdata 147 return stdoutdata
148
149
150 def GetLockedMethod(cls, method):
151
152 def LockedMethod(cls, *args, **kwargs): # pylint: disable=W
153 if not hasattr(LockedMethod, 'lock'):
154 LockedMethod.lock = threading.Lock()
155
156 with LockedMethod.lock:
157 return method(*args, **kwargs)
stgao 2016/11/17 05:03:05 This is thread safe if the lock is set appropriate
Sharu Jiang 2016/11/17 09:03:16 Refactor this, since this is more general function
158
159 return functools.partial(LockedMethod, cls)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698