OLD | NEW |
---|---|
1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import argparse | 5 import argparse |
6 import json | 6 import json |
7 import logging | 7 import logging |
8 import os | 8 import os |
9 import re | 9 import re |
10 import sys | 10 import sys |
(...skipping 14 matching lines...) Expand all Loading... | |
25 import loading_trace | 25 import loading_trace |
26 from loading_trace_database import LoadingTraceDatabase | 26 from loading_trace_database import LoadingTraceDatabase |
27 import options | 27 import options |
28 | 28 |
29 | 29 |
30 class Worker(object): | 30 class Worker(object): |
31 def __init__(self, config, logger): | 31 def __init__(self, config, logger): |
32 """See README.md for the config format.""" | 32 """See README.md for the config format.""" |
33 self._project_name = config['project_name'] | 33 self._project_name = config['project_name'] |
34 self._taskqueue_tag = config['taskqueue_tag'] | 34 self._taskqueue_tag = config['taskqueue_tag'] |
35 self._src_path = config['src_path'] | |
35 self._credentials = GoogleCredentials.get_application_default() | 36 self._credentials = GoogleCredentials.get_application_default() |
36 self._logger = logger | 37 self._logger = logger |
37 | 38 |
38 # Separate the cloud storage path into the bucket and the base path under | 39 # Separate the cloud storage path into the bucket and the base path under |
39 # the bucket. | 40 # the bucket. |
40 storage_path_components = config['cloud_storage_path'].split('/') | 41 storage_path_components = config['cloud_storage_path'].split('/') |
41 self._bucket_name = storage_path_components[0] | 42 self._bucket_name = storage_path_components[0] |
42 self._base_path_in_bucket = '' | 43 self._base_path_in_bucket = '' |
43 if len(storage_path_components) > 1: | 44 if len(storage_path_components) > 1: |
44 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) | 45 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) |
45 if not self._base_path_in_bucket.endswith('/'): | 46 if not self._base_path_in_bucket.endswith('/'): |
46 self._base_path_in_bucket += '/' | 47 self._base_path_in_bucket += '/' |
47 | 48 |
48 # TODO: improve the trace database to support concurrent access. | |
49 self._traces_dir = self._base_path_in_bucket + 'traces/' | |
50 self._trace_database = LoadingTraceDatabase({}) | |
51 | |
52 self._src_path = config['src_path'] | |
53 self._google_storage_accessor = GoogleStorageAccessor( | 49 self._google_storage_accessor = GoogleStorageAccessor( |
54 credentials=self._credentials, project_name=self._project_name, | 50 credentials=self._credentials, project_name=self._project_name, |
55 bucket_name=self._bucket_name) | 51 bucket_name=self._bucket_name) |
56 | 52 |
53 self._traces_dir = self._base_path_in_bucket + 'traces/' | |
Benoit L
2016/04/20 15:02:22
Generally speaking, I prefer to manipulate paths w
droger
2016/04/20 15:58:17
Done.
| |
54 self._trace_database_path = self._traces_dir + config.get( | |
55 'trace_database_filename', 'trace_database.json') | |
Benoit L
2016/04/20 15:02:22
Is the trace dir per worker?
If not, then the trac
droger
2016/04/20 15:58:17
It is intended that all worker write to the same d
| |
56 | |
57 # Recover any existing trace database in case the worker died. | |
58 self._DownloadTraceDatabase() | |
59 | |
57 # Initialize the global options that will be used during trace generation. | 60 # Initialize the global options that will be used during trace generation. |
58 options.OPTIONS.ParseArgs([]) | 61 options.OPTIONS.ParseArgs([]) |
59 options.OPTIONS.local_binary = config['chrome_path'] | 62 options.OPTIONS.local_binary = config['chrome_path'] |
60 | 63 |
61 def Start(self): | 64 def Start(self): |
62 """Main worker loop. | 65 """Main worker loop. |
63 | 66 |
64 Repeatedly pulls tasks from the task queue and processes them. Returns when | 67 Repeatedly pulls tasks from the task queue and processes them. Returns when |
65 the queue is empty. | 68 the queue is empty. |
66 """ | 69 """ |
(...skipping 20 matching lines...) Expand all Loading... | |
87 continue | 90 continue |
88 | 91 |
89 self._logger.info('Processing task %s' % task_id) | 92 self._logger.info('Processing task %s' % task_id) |
90 self._ProcessClovisTask(clovis_task) | 93 self._ProcessClovisTask(clovis_task) |
91 self._logger.debug('Deleting task %s' % task_id) | 94 self._logger.debug('Deleting task %s' % task_id) |
92 task_api.tasks().delete(project=project, taskqueue=queue_name, | 95 task_api.tasks().delete(project=project, taskqueue=queue_name, |
93 task=task_id).execute() | 96 task=task_id).execute() |
94 self._logger.info('Finished task %s' % task_id) | 97 self._logger.info('Finished task %s' % task_id) |
95 self._Finalize() | 98 self._Finalize() |
96 | 99 |
100 def _DownloadTraceDatabase(self): | |
101 """Downloads the trace database from CloudStorage.""" | |
102 trace_database_string = self._google_storage_accessor.DownloadAsString( | |
Benoit L
2016/04/20 15:02:22
What about:
trace_db_str = self._google_storage_a
droger
2016/04/20 15:58:17
Done.
| |
103 self._trace_database_path) | |
104 trace_database_dict = {} | |
105 if trace_database_string: | |
106 self._logger.warning( | |
107 'Importing existing trace database %s' % self._trace_database_path) | |
108 trace_database_dict = json.loads(trace_database_string) | |
109 self._trace_database = LoadingTraceDatabase(trace_database_dict) | |
110 | |
111 def _UploadTraceDatabase(self): | |
112 """Uploads the trace database to CloudStorage.""" | |
113 self._logger.info('Uploading trace database') | |
Benoit L
2016/04/20 15:02:22
tiny nit: Importing is "warning", uploading "info"
droger
2016/04/20 15:58:17
Done.
| |
114 self._google_storage_accessor.UploadString( | |
115 json.dumps(self._trace_database.ToJsonDict(), indent=2), | |
116 self._trace_database_path) | |
117 | |
97 def _FetchClovisTask(self, project_name, task_api, queue_name): | 118 def _FetchClovisTask(self, project_name, task_api, queue_name): |
98 """Fetches a ClovisTask from the task queue. | 119 """Fetches a ClovisTask from the task queue. |
99 | 120 |
100 Params: | 121 Params: |
101 project_name(str): The name of the Google Cloud project. | 122 project_name(str): The name of the Google Cloud project. |
102 task_api: The TaskQueue service. | 123 task_api: The TaskQueue service. |
103 queue_name(str): The name of the task queue. | 124 queue_name(str): The name of the task queue. |
104 | 125 |
105 Returns: | 126 Returns: |
106 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None) | 127 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None) |
107 if no tasks are found. | 128 if no tasks are found. |
108 """ | 129 """ |
109 response = task_api.tasks().lease( | 130 response = task_api.tasks().lease( |
110 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180, | 131 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180, |
111 groupByTag=True, tag=self._taskqueue_tag).execute() | 132 groupByTag=True, tag=self._taskqueue_tag).execute() |
112 if (not response.get('items')) or (len(response['items']) < 1): | 133 if (not response.get('items')) or (len(response['items']) < 1): |
113 return (None, None) | 134 return (None, None) |
114 | 135 |
115 google_task = response['items'][0] | 136 google_task = response['items'][0] |
116 task_id = google_task['id'] | 137 task_id = google_task['id'] |
117 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) | 138 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) |
118 return (clovis_task, task_id) | 139 return (clovis_task, task_id) |
119 | 140 |
120 def _Finalize(self): | 141 def _Finalize(self): |
121 """Called before exiting.""" | 142 """Called before exiting.""" |
122 self._logger.info('Uploading trace database') | |
123 self._google_storage_accessor.UploadString( | |
124 json.dumps(self._trace_database.ToJsonDict(), indent=2), | |
125 self._traces_dir + 'trace_database.json') | |
126 # TODO(droger): Implement automatic instance destruction. | 143 # TODO(droger): Implement automatic instance destruction. |
127 self._logger.info('Done') | 144 self._logger.info('Done') |
128 | 145 |
129 | |
130 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, | 146 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, |
131 log_filename): | 147 log_filename): |
132 """ Generates a trace. | 148 """ Generates a trace. |
133 | 149 |
134 Args: | 150 Args: |
135 url: URL as a string. | 151 url: URL as a string. |
136 emulate_device: Name of the device to emulate. Empty for no emulation. | 152 emulate_device: Name of the device to emulate. Empty for no emulation. |
137 emulate_network: Type of network emulation. Empty for no emulation. | 153 emulate_network: Type of network emulation. Empty for no emulation. |
138 filename: Name of the file where the trace is saved. | 154 filename: Name of the file where the trace is saved. |
139 log_filename: Name of the file where standard output and errors are | 155 log_filename: Name of the file where standard output and errors are |
140 logged. | 156 logged. |
141 | 157 |
142 Returns: | 158 Returns: |
143 A dictionary of metadata about the trace, including a 'succeeded' field | 159 A dictionary of metadata about the trace, including a 'succeeded' field |
144 indicating whether the trace was successfully generated. | 160 indicating whether the trace was successfully generated. |
145 """ | 161 """ |
146 try: | 162 try: |
147 os.remove(filename) # Remove any existing trace for this URL. | 163 os.remove(filename) # Remove any existing trace for this URL. |
148 except OSError: | 164 except OSError: |
149 pass # Nothing to remove. | 165 pass # Nothing to remove. |
150 | 166 |
151 if not url.startswith('http') and not url.startswith('file'): | 167 if not url.startswith('http') and not url.startswith('file'): |
152 url = 'http://' + url | 168 url = 'http://' + url |
153 | 169 |
154 old_stdout = sys.stdout | |
155 old_stderr = sys.stderr | 170 old_stderr = sys.stderr |
156 | 171 |
157 trace_metadata = { 'succeeded' : False, 'url' : url } | 172 trace_metadata = { 'succeeded' : False, 'url' : url } |
158 trace = None | 173 trace = None |
159 with open(log_filename, 'w') as sys.stdout: | 174 with open(log_filename, 'w') as sys.stdout: |
160 try: | 175 try: |
161 sys.stderr = sys.stdout | 176 sys.stderr = sys.stdout |
162 | 177 |
163 # Set up the controller. | 178 # Set up the controller. |
164 chrome_ctl = controller.LocalChromeController() | 179 chrome_ctl = controller.LocalChromeController() |
(...skipping 11 matching lines...) Expand all Loading... | |
176 url, connection, chrome_ctl.ChromeMetadata()) | 191 url, connection, chrome_ctl.ChromeMetadata()) |
177 trace_metadata['succeeded'] = True | 192 trace_metadata['succeeded'] = True |
178 trace_metadata.update(trace.ToJsonDict()[trace._METADATA_KEY]) | 193 trace_metadata.update(trace.ToJsonDict()[trace._METADATA_KEY]) |
179 except Exception as e: | 194 except Exception as e: |
180 sys.stderr.write(str(e)) | 195 sys.stderr.write(str(e)) |
181 | 196 |
182 if trace: | 197 if trace: |
183 with open(filename, 'w') as f: | 198 with open(filename, 'w') as f: |
184 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) | 199 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) |
185 | 200 |
186 sys.stdout = old_stdout | |
187 sys.stderr = old_stderr | 201 sys.stderr = old_stderr |
188 | 202 |
189 return trace_metadata | 203 return trace_metadata |
190 | 204 |
191 def _ProcessClovisTask(self, clovis_task): | 205 def _ProcessClovisTask(self, clovis_task): |
192 """Processes one clovis_task.""" | 206 """Processes one clovis_task.""" |
193 if clovis_task.Action() != 'trace': | 207 if clovis_task.Action() != 'trace': |
194 self._logger.error('Unsupported task action: %s' % clovis_task.Action()) | 208 self._logger.error('Unsupported task action: %s' % clovis_task.Action()) |
195 return | 209 return |
196 | 210 |
(...skipping 19 matching lines...) Expand all Loading... | |
216 remote_filename = local_filename + '/' + str(repeat) | 230 remote_filename = local_filename + '/' + str(repeat) |
217 trace_metadata = self._GenerateTrace( | 231 trace_metadata = self._GenerateTrace( |
218 url, emulate_device, emulate_network, local_filename, log_filename) | 232 url, emulate_device, emulate_network, local_filename, log_filename) |
219 if trace_metadata['succeeded']: | 233 if trace_metadata['succeeded']: |
220 self._logger.debug('Uploading: %s' % remote_filename) | 234 self._logger.debug('Uploading: %s' % remote_filename) |
221 remote_trace_location = self._traces_dir + remote_filename | 235 remote_trace_location = self._traces_dir + remote_filename |
222 self._google_storage_accessor.UploadFile(local_filename, | 236 self._google_storage_accessor.UploadFile(local_filename, |
223 remote_trace_location) | 237 remote_trace_location) |
224 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + | 238 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + |
225 remote_trace_location) | 239 remote_trace_location) |
226 self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata) | 240 self._trace_database.SetTrace(full_cloud_storage_path, trace_metadata) |
227 else: | 241 else: |
228 self._logger.warning('Trace generation failed for URL: %s' % url) | 242 self._logger.warning('Trace generation failed for URL: %s' % url) |
229 # TODO: upload the failure | |
230 if os.path.isfile(local_filename): | 243 if os.path.isfile(local_filename): |
231 self._google_storage_accessor.UploadFile(local_filename, | 244 self._google_storage_accessor.UploadFile( |
232 failures_dir + remote_filename) | 245 local_filename, failures_dir + remote_filename) |
233 self._logger.debug('Uploading log') | 246 self._logger.debug('Uploading analyze log') |
234 self._google_storage_accessor.UploadFile(log_filename, | 247 self._google_storage_accessor.UploadFile(log_filename, |
235 logs_dir + remote_filename) | 248 logs_dir + remote_filename) |
236 | 249 self._UploadTraceDatabase() |
237 | 250 |
238 if __name__ == '__main__': | 251 if __name__ == '__main__': |
239 parser = argparse.ArgumentParser( | 252 parser = argparse.ArgumentParser( |
240 description='ComputeEngine Worker for Clovis') | 253 description='ComputeEngine Worker for Clovis') |
241 parser.add_argument('--config', required=True, | 254 parser.add_argument('--config', required=True, |
242 help='Path to the configuration file.') | 255 help='Path to the configuration file.') |
243 args = parser.parse_args() | 256 args = parser.parse_args() |
244 | 257 |
245 # Configure logging. | 258 # Configure logging. |
246 logging.basicConfig(level=logging.WARNING) | 259 logging.basicConfig(level=logging.WARNING) |
247 worker_logger = logging.getLogger('worker') | 260 worker_logger = logging.getLogger('worker') |
248 worker_logger.setLevel(logging.INFO) | 261 worker_logger.setLevel(logging.INFO) |
249 | 262 |
250 worker_logger.info('Reading configuration') | 263 worker_logger.info('Reading configuration') |
251 with open(args.config) as config_json: | 264 with open(args.config) as config_json: |
252 worker = Worker(json.load(config_json), worker_logger) | 265 worker = Worker(json.load(config_json), worker_logger) |
253 worker.Start() | 266 worker.Start() |
254 | 267 |
OLD | NEW |