Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(315)

Side by Side Diff: appengine/findit/util_scripts/script_util.py

Issue 2432203003: [Predator] Run predator. (Closed)
Patch Set: . Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """This module contains util functions that local scripts can use.""" 5 """This module contains util functions that local scripts can use."""
6 6
7 import atexit
8 import cgi
9 import json
7 import os 10 import os
11 import Queue
12 import re
8 import sys 13 import sys
14 import threading
15 import traceback
16 import time
17
18 MAX_THREAD_NUMBER = 10
19 TASK_QUEUE = None
9 20
10 21
11 def SetUpSystemPaths(): 22 def SetUpSystemPaths():
12 """Sets system paths so as to import modules in findit, third_party and 23 """Sets system paths so as to import modules in findit, third_party and
13 appengine.""" 24 appengine."""
14 findit_root_dir = os.path.join(os.path.dirname(__file__), os.path.pardir) 25 findit_root_dir = os.path.join(os.path.dirname(__file__), os.path.pardir)
15 third_party_dir = os.path.join(findit_root_dir, 'third_party') 26 third_party_dir = os.path.join(findit_root_dir, 'third_party')
16 appengine_sdk_dir = os.path.join(findit_root_dir, os.path.pardir, 27 appengine_sdk_dir = os.path.join(findit_root_dir, os.path.pardir,
17 os.path.pardir, os.path.pardir, 28 os.path.pardir, os.path.pardir,
18 'google_appengine') 29 'google_appengine')
19 30
20 # Add App Engine SDK dir to sys.path. 31 # Add App Engine SDK dir to sys.path.
21 sys.path.insert(1, appengine_sdk_dir) 32 sys.path.insert(1, appengine_sdk_dir)
22 sys.path.insert(1, third_party_dir) 33 sys.path.insert(1, third_party_dir)
23 import dev_appserver 34 import dev_appserver
24 dev_appserver.fix_sys_path() 35 dev_appserver.fix_sys_path()
25 36
26 # Add Findit root dir to sys.path so that modules in Findit is available. 37 # Add Findit root dir to sys.path so that modules in Findit is available.
27 sys.path.insert(1, findit_root_dir) 38 sys.path.insert(1, findit_root_dir)
39
40
41 def SignalWorkerThreads():
stgao 2016/11/02 01:45:49 Is this a copy from the version in ClusterFuzz? Or
Sharu Jiang 2016/11/11 23:10:26 It is a copy from the Clusterfuzz.
42 """Puts signal worker threads into task queue."""
43 global TASK_QUEUE # pylint: disable=W0602
44 if not TASK_QUEUE:
45 return
46
47 for _ in range(MAX_THREAD_NUMBER):
48 TASK_QUEUE.put(None)
49
50 # Give worker threads a chance to exit.
51 # Workaround the harmless bug in python 2.7 below.
52 time.sleep(1)
53
54
55 atexit.register(SignalWorkerThreads)
56
57
58 def Worker():
59 global TASK_QUEUE # pylint: disable=W0602
60 while True:
61 try:
62 task = TASK_QUEUE.get()
63 if not task:
64 return
65 except TypeError:
66 # According to http://bugs.python.org/issue14623, this is a harmless bug
67 # in python 2.7 which won't be fixed.
68 # The exception is raised on daemon threads when python interpreter is
69 # shutting down.
70 return
71
72 function, args, kwargs, result_semaphore = task
73 try:
74 function(*args, **kwargs)
75 except Exception:
76 print traceback.format_exc()
77 # Continue to process tasks in queue, in case every thread fails, the
78 # main thread will be waiting forever.
79 continue
80 finally:
81 # Signal one task is done in case of exception.
82 result_semaphore.release()
83
84
85 def RunTasks(tasks):
86 """Run given tasks. Not thread-safe: no concurrent calls of this function.
87
88 Return after all tasks were completed. A task is a dict as below:
89 {
90 'function': the function to call,
91 'args': the positional argument to pass to the function,
92 'kwargs': the key-value arguments to pass to the function,
93 }
94 """
95 if not tasks:
96 return
97
98 global TASK_QUEUE
99 if not TASK_QUEUE:
100 TASK_QUEUE = Queue.Queue()
101 for index in range(MAX_THREAD_NUMBER):
102 thread = threading.Thread(target=Worker, name='worker_%s' % index)
103 # Set as daemon, so no join is needed.
104 thread.daemon = True
105 thread.start()
106
107 result_semaphore = threading.Semaphore(0)
108 # Push task to task queue for execution.
109 for task in tasks:
110 TASK_QUEUE.put((task['function'], task.get('args', []),
111 task.get('kwargs', {}), result_semaphore))
112
113 # Wait until all tasks to be executed.
114 for _ in tasks:
115 result_semaphore.acquire()
OLDNEW
« appengine/findit/util_scripts/iterator.py ('K') | « appengine/findit/util_scripts/iterator.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698