OLD | NEW |
---|---|
1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import argparse | |
5 import json | 6 import json |
6 import os | 7 import os |
7 import re | 8 import re |
8 import threading | 9 import sys |
9 import time | 10 import time |
10 import subprocess | 11 |
11 import sys | 12 from googleapiclient import discovery |
13 from oauth2client.client import GoogleCredentials | |
12 | 14 |
13 # NOTE: The parent directory needs to be first in sys.path to avoid conflicts | 15 # NOTE: The parent directory needs to be first in sys.path to avoid conflicts |
14 # with catapult modules that have colliding names, as catapult inserts itself | 16 # with catapult modules that have colliding names, as catapult inserts itself |
15 # into the path as the second element. This is an ugly and fragile hack. | 17 # into the path as the second element. This is an ugly and fragile hack. |
16 sys.path.insert(0, | 18 sys.path.insert(0, |
17 os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)) | 19 os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)) |
18 import controller | 20 import controller |
21 from frontend.clovis_task import ClovisTask | |
19 from google_storage_accessor import GoogleStorageAccessor | 22 from google_storage_accessor import GoogleStorageAccessor |
20 import loading_trace | 23 import loading_trace |
21 from loading_trace_database import LoadingTraceDatabase | 24 from loading_trace_database import LoadingTraceDatabase |
22 import options | 25 import options |
23 | 26 |
24 | 27 |
25 class ServerApp(object): | 28 class Worker(object): |
26 """Simple web server application, collecting traces and writing them in | 29 def __init__(self, config): |
27 Google Cloud Storage. | 30 """See README.md for the config format""" |
28 """ | 31 self._credentials = GoogleCredentials.get_application_default() |
29 | 32 |
30 def __init__(self, configuration_file): | 33 # Separate the cloud storage path into the bucket and the base path under |
31 """|configuration_file| is a path to a file containing JSON as described in | 34 # the bucket. |
32 README.md. | 35 storage_path_components = config['cloud_storage_path'].split('/') |
33 """ | 36 self._bucket_name = storage_path_components[0] |
34 self._tasks = [] # List of remaining tasks, only modified by _thread. | 37 self._project_name = config['project_name'] |
35 self._failed_tasks = [] # Failed tasks, only modified by _thread. | 38 self._taskqueue_tag = config['taskqueue_tag'] |
36 self._thread = None | |
37 self._tasks_lock = threading.Lock() # Protects _tasks and _failed_tasks. | |
38 self._initial_task_count = -1 | |
39 self._start_time = None | |
40 print 'Reading configuration' | |
41 with open(configuration_file) as config_json: | |
42 config = json.load(config_json) | |
43 | 39 |
44 # Separate the cloud storage path into the bucket and the base path under | 40 self._base_path_in_bucket = '' |
45 # the bucket. | 41 if len(storage_path_components) > 1: |
46 storage_path_components = config['cloud_storage_path'].split('/') | 42 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) |
47 self._bucket_name = storage_path_components[0] | 43 if not self._base_path_in_bucket.endswith('/'): |
48 self._base_path_in_bucket = '' | 44 self._base_path_in_bucket += '/' |
49 if len(storage_path_components) > 1: | |
50 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) | |
51 if not self._base_path_in_bucket.endswith('/'): | |
52 self._base_path_in_bucket += '/' | |
53 | 45 |
54 self._src_path = config['src_path'] | 46 # TODO: improve the trace database to support concurrent access. |
55 self._google_storage_accessor = GoogleStorageAccessor( | 47 self._traces_dir = self._base_path_in_bucket + 'traces/' |
56 project_name=config['project_name'], bucket_name=self._bucket_name) | 48 self._trace_database = LoadingTraceDatabase({}) |
49 | |
50 self._src_path = config['src_path'] | |
51 self._google_storage_accessor = GoogleStorageAccessor( | |
52 credentials=self._credentials, project_name=self._project_name, | |
53 bucket_name=self._bucket_name) | |
57 | 54 |
58 # Initialize the global options that will be used during trace generation. | 55 # Initialize the global options that will be used during trace generation. |
59 options.OPTIONS.ParseArgs([]) | 56 options.OPTIONS.ParseArgs([]) |
60 options.OPTIONS.local_binary = config['chrome_path'] | 57 options.OPTIONS.local_binary = config['chrome_path'] |
61 | 58 |
62 def _IsProcessingTasks(self): | 59 def Start(self): |
63 """Returns True if the application is currently processing tasks.""" | 60 """Main worker loop. |
64 return self._thread is not None and self._thread.is_alive() | 61 |
62 Repeatedly pulls tasks from the task queue and processes them. Returns when | |
63 the queue is empty. | |
64 """ | |
65 task_api = discovery.build('taskqueue', 'v1beta2', | |
66 credentials=self._credentials) | |
67 queue_name = 'clovis-queue' | |
68 # Workaround for | |
69 # https://code.google.com/p/googleappengine/issues/detail?id=10199 | |
70 project = 's~' + self._project_name | |
71 | |
72 finished = False | |
73 started = False | |
74 while not finished: | |
75 response = task_api.tasks().lease( | |
76 project=project, taskqueue=queue_name, numTasks=1, leaseSecs=60, | |
Benoit L
2016/04/19 08:47:41
Is 60s enough?
droger
2016/04/19 11:00:00
You're right, it may not be enough in some cases.
| |
77 groupByTag=True, tag=self._taskqueue_tag).execute() | |
78 if (not response.get('items')) or (len(response['items']) < 1): | |
79 if started: | |
80 finished = True | |
81 else: | |
82 # Do not stop before processing at least one task. | |
83 delay_seconds = 30 | |
84 print 'No tasks, retrying in %s seconds' % delay_seconds | |
85 time.sleep(delay_seconds) | |
86 continue | |
87 | |
88 started = True | |
89 google_task = response['items'][0] | |
90 task_id = google_task['id'] | |
91 print 'Processing task %s' % task_id | |
92 | |
93 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) | |
94 self._ProcessClovisTask(clovis_task) | |
95 | |
96 task_api.tasks().delete(project=project, taskqueue=queue_name, | |
97 task=task_id).execute() | |
98 print 'Finished task %s' % task_id | |
99 self._Finalize() | |
Benoit L
2016/04/19 08:47:41
Does it mean that this is all or nothing? That is,
droger
2016/04/19 11:00:00
Yes for now, because the trace database does not s
| |
100 | |
101 def _Finalize(self): | |
102 """Called before exiting.""" | |
103 print 'Uploading trace database' | |
104 self._google_storage_accessor.UploadString( | |
105 json.dumps(self._trace_database.ToJsonDict(), indent=2), | |
106 self._traces_dir + 'trace_database.json') | |
107 # TODO(droger): Implement automatic instance destruction. | |
108 print 'Done' | |
109 | |
65 | 110 |
66 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, | 111 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, |
67 log_filename): | 112 log_filename): |
68 """ Generates a trace on _thread. | 113 """ Generates a trace. |
69 | 114 |
70 Args: | 115 Args: |
71 url: URL as a string. | 116 url: URL as a string. |
72 emulate_device: Name of the device to emulate. Empty for no emulation. | 117 emulate_device: Name of the device to emulate. Empty for no emulation. |
73 emulate_network: Type of network emulation. Empty for no emulation. | 118 emulate_network: Type of network emulation. Empty for no emulation. |
74 filename: Name of the file where the trace is saved. | 119 filename: Name of the file where the trace is saved. |
75 log_filename: Name of the file where standard output and errors are logged | 120 log_filename: Name of the file where standard output and errors are logged |
76 | 121 |
77 Returns: | 122 Returns: |
78 A dictionary of metadata about the trace, including a 'succeeded' field | 123 A dictionary of metadata about the trace, including a 'succeeded' field |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
116 | 161 |
117 if trace: | 162 if trace: |
118 with open(filename, 'w') as f: | 163 with open(filename, 'w') as f: |
119 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) | 164 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) |
120 | 165 |
121 sys.stdout = old_stdout | 166 sys.stdout = old_stdout |
122 sys.stderr = old_stderr | 167 sys.stderr = old_stderr |
123 | 168 |
124 return trace_metadata | 169 return trace_metadata |
125 | 170 |
126 def _GetCurrentTaskCount(self): | 171 def _ProcessClovisTask(self, clovis_task): |
127 """Returns the number of remaining tasks. Thread safe.""" | 172 """Processes one clovis_task.""" |
128 self._tasks_lock.acquire() | 173 if clovis_task.Action() != 'trace': |
129 task_count = len(self._tasks) | 174 print 'Unsupported task action: %s' % clovis_task.Action() |
130 self._tasks_lock.release() | 175 return |
131 return task_count | |
132 | 176 |
133 def _ProcessTasks(self, tasks, repeat_count, emulate_device, emulate_network): | 177 # Extract the task parameters. |
134 """Iterates over _task, generating a trace for each of them. Uploads the | 178 params = clovis_task.Params() |
135 resulting traces to Google Cloud Storage. Runs on _thread. | 179 urls = params['urls'] |
180 repeat_count = int(params.get('repeat_count', '1')) | |
Benoit L
2016/04/19 08:47:41
are params necessarily strings?
droger
2016/04/19 11:00:00
Good point, changed to int.
| |
181 emulate_device = params.get('emulate_device') | |
182 emulate_network = params.get('emulate_network') | |
136 | 183 |
137 Args: | |
138 tasks: The list of URLs to process. | |
139 repeat_count: The number of traces generated for each URL. | |
140 emulate_device: Name of the device to emulate. Empty for no emulation. | |
141 emulate_network: Type of network emulation. Empty for no emulation. | |
142 """ | |
143 # The main thread might be reading the task lists, take the lock to modify. | |
144 self._tasks_lock.acquire() | |
145 self._tasks = tasks | |
146 self._failed_tasks = [] | |
147 self._tasks_lock.release() | |
148 failures_dir = self._base_path_in_bucket + 'failures/' | 184 failures_dir = self._base_path_in_bucket + 'failures/' |
149 traces_dir = self._base_path_in_bucket + 'traces/' | |
150 | |
151 trace_database = LoadingTraceDatabase({}) | |
152 | |
153 # TODO(blundell): Fix this up. | 185 # TODO(blundell): Fix this up. |
154 logs_dir = self._base_path_in_bucket + 'analyze_logs/' | 186 logs_dir = self._base_path_in_bucket + 'analyze_logs/' |
155 log_filename = 'analyze.log' | 187 log_filename = 'analyze.log' |
156 # Avoid special characters in storage object names | 188 # Avoid special characters in storage object names |
157 pattern = re.compile(r"[#\?\[\]\*/]") | 189 pattern = re.compile(r"[#\?\[\]\*/]") |
158 while len(self._tasks) > 0: | 190 |
159 url = self._tasks[-1] | 191 while len(urls) > 0: |
192 url = urls.pop() | |
160 local_filename = pattern.sub('_', url) | 193 local_filename = pattern.sub('_', url) |
161 for repeat in range(repeat_count): | 194 for repeat in range(repeat_count): |
162 print 'Generating trace for URL: %s' % url | 195 print 'Generating trace for URL: %s' % url |
163 remote_filename = local_filename + '/' + str(repeat) | 196 remote_filename = local_filename + '/' + str(repeat) |
164 trace_metadata = self._GenerateTrace( | 197 trace_metadata = self._GenerateTrace( |
165 url, emulate_device, emulate_network, local_filename, log_filename) | 198 url, emulate_device, emulate_network, local_filename, log_filename) |
166 if trace_metadata['succeeded']: | 199 if trace_metadata['succeeded']: |
167 print 'Uploading: %s' % remote_filename | 200 print 'Uploading: %s' % remote_filename |
168 remote_trace_location = traces_dir + remote_filename | 201 remote_trace_location = self._traces_dir + remote_filename |
169 self._google_storage_accessor.UploadFile(local_filename, | 202 self._google_storage_accessor.UploadFile(local_filename, |
170 remote_trace_location) | 203 remote_trace_location) |
171 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + | 204 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + |
172 remote_trace_location) | 205 remote_trace_location) |
173 trace_database.AddTrace(full_cloud_storage_path, trace_metadata) | 206 self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata) |
174 else: | 207 else: |
175 print 'Trace generation failed for URL: %s' % url | 208 print 'Trace generation failed for URL: %s' % url |
176 self._tasks_lock.acquire() | 209 # TODO: upload the failure |
177 self._failed_tasks.append({ "url": url, "repeat": repeat}) | |
178 self._tasks_lock.release() | |
179 if os.path.isfile(local_filename): | 210 if os.path.isfile(local_filename): |
180 self._google_storage_accessor.UploadFile(local_filename, | 211 self._google_storage_accessor.UploadFile(local_filename, |
181 failures_dir + remote_filename) | 212 failures_dir + remote_filename) |
182 print 'Uploading log' | 213 print 'Uploading log' |
183 self._google_storage_accessor.UploadFile(log_filename, | 214 self._google_storage_accessor.UploadFile(log_filename, |
184 logs_dir + remote_filename) | 215 logs_dir + remote_filename) |
185 # Pop once task is finished, for accurate status tracking. | |
186 self._tasks_lock.acquire() | |
187 url = self._tasks.pop() | |
188 self._tasks_lock.release() | |
189 | |
190 self._google_storage_accessor.UploadString( | |
191 json.dumps(trace_database.ToJsonDict(), indent=2), | |
192 traces_dir + 'trace_database.json') | |
193 | |
194 if len(self._failed_tasks) > 0: | |
195 print 'Uploading failing URLs' | |
196 self._google_storage_accessor.UploadString( | |
197 json.dumps(self._failed_tasks, indent=2), | |
198 failures_dir + 'failures.json') | |
199 | |
200 def _SetTaskList(self, http_body): | |
201 """Sets the list of tasks and starts processing them | |
202 | |
203 Args: | |
204 http_body: JSON dictionary. See README.md for a description of the format. | |
205 | |
206 Returns: | |
207 A string to be sent back to the client, describing the success status of | |
208 the request. | |
209 """ | |
210 if self._IsProcessingTasks(): | |
211 return 'Error: Already running\n' | |
212 | |
213 load_parameters = json.loads(http_body) | |
214 try: | |
215 tasks = load_parameters['urls'] | |
216 except KeyError: | |
217 return 'Error: invalid urls\n' | |
218 # Optional parameters. | |
219 try: | |
220 repeat_count = int(load_parameters.get('repeat_count', '1')) | |
221 except ValueError: | |
222 return 'Error: invalid repeat_count\n' | |
223 emulate_device = load_parameters.get('emulate_device', '') | |
224 emulate_network = load_parameters.get('emulate_network', '') | |
225 | |
226 if len(tasks) == 0: | |
227 return 'Error: Empty task list\n' | |
228 else: | |
229 self._initial_task_count = len(tasks) | |
230 self._start_time = time.time() | |
231 self._thread = threading.Thread( | |
232 target = self._ProcessTasks, | |
233 args = (tasks, repeat_count, emulate_device, emulate_network)) | |
234 self._thread.start() | |
235 return 'Starting generation of %s tasks\n' % str(self._initial_task_count) | |
236 | |
237 def __call__(self, environ, start_response): | |
238 path = environ['PATH_INFO'] | |
239 | |
240 if path == '/set_tasks': | |
241 # Get the tasks from the HTTP body. | |
242 try: | |
243 body_size = int(environ.get('CONTENT_LENGTH', 0)) | |
244 except (ValueError): | |
245 body_size = 0 | |
246 body = environ['wsgi.input'].read(body_size) | |
247 data = self._SetTaskList(body) | |
248 elif path == '/test': | |
249 data = 'hello\n' | |
250 elif path == '/status': | |
251 if not self._IsProcessingTasks(): | |
252 data = 'Idle\n' | |
253 else: | |
254 task_count = self._GetCurrentTaskCount() | |
255 if task_count == 0: | |
256 data = '%s tasks complete. Finalizing.\n' % self._initial_task_count | |
257 else: | |
258 data = 'Remaining tasks: %s / %s\n' % ( | |
259 task_count, self._initial_task_count) | |
260 elapsed = time.time() - self._start_time | |
261 data += 'Elapsed time: %s seconds\n' % str(elapsed) | |
262 self._tasks_lock.acquire() | |
263 failed_tasks = self._failed_tasks | |
264 self._tasks_lock.release() | |
265 data += '%s failed tasks:\n' % len(failed_tasks) | |
266 data += json.dumps(failed_tasks, indent=2) | |
267 else: | |
268 start_response('404 NOT FOUND', [('Content-Length', '0')]) | |
269 return iter(['']) | |
270 | |
271 response_headers = [ | |
272 ('Content-type','text/plain'), | |
273 ('Content-Length', str(len(data))) | |
274 ] | |
275 start_response('200 OK', response_headers) | |
276 return iter([data]) | |
277 | 216 |
278 | 217 |
279 def StartApp(configuration_file): | 218 if __name__ == '__main__': |
280 return ServerApp(configuration_file) | 219 parser = argparse.ArgumentParser( |
220 description='ComputeEngine Worker for Clovis') | |
221 parser.add_argument('--config', required=True, | |
222 help='Path to the configuration file.') | |
223 args = parser.parse_args() | |
224 print 'Reading configuration' | |
225 with open(args.config) as config_json: | |
226 worker = Worker(json.load(config_json)) | |
227 worker.Start() | |
228 | |
OLD | NEW |