Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import argparse | 5 import argparse |
| 6 import json | 6 import json |
| 7 import logging | 7 import logging |
| 8 import os | 8 import os |
| 9 import re | 9 import re |
| 10 import sys | 10 import sys |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 25 import loading_trace | 25 import loading_trace |
| 26 from loading_trace_database import LoadingTraceDatabase | 26 from loading_trace_database import LoadingTraceDatabase |
| 27 import options | 27 import options |
| 28 | 28 |
| 29 | 29 |
class Worker(object):
  def __init__(self, config, logger):
    """Initializes the worker from a configuration dictionary.

    See README.md for the config format.

    Args:
      config (dict): Worker configuration (project name, task queue tag,
          source path, cloud storage path, Chrome binary path, ...).
      logger: Logger used by the worker to report progress.
    """
    self._project_name = config['project_name']
    self._taskqueue_tag = config['taskqueue_tag']
    self._src_path = config['src_path']
    self._credentials = GoogleCredentials.get_application_default()
    self._logger = logger

    # Split the cloud storage path into the bucket and the base path under
    # the bucket, normalizing a non-empty base path to end with '/'.
    bucket, separator, base_path = config['cloud_storage_path'].partition('/')
    self._bucket_name = bucket
    self._base_path_in_bucket = base_path
    if separator and not base_path.endswith('/'):
      self._base_path_in_bucket = base_path + '/'

    self._google_storage_accessor = GoogleStorageAccessor(
        credentials=self._credentials, project_name=self._project_name,
        bucket_name=self._bucket_name)

    # Traces and the trace database live under a common directory in the
    # bucket. NOTE(review): the database file appears to be shared by all
    # workers writing to the same bucket — confirm concurrent access is safe.
    self._traces_dir = self._base_path_in_bucket + 'traces/'
    self._trace_database_path = self._traces_dir + config.get(
        'trace_database_filename', 'trace_database.json')

    # Recover any existing trace database in case the worker died.
    self._DownloadTraceDatabase()

    # Initialize the global options that will be used during trace generation.
    options.OPTIONS.ParseArgs([])
    options.OPTIONS.local_binary = config['chrome_path']
| 61 def Start(self): | 64 def Start(self): |
| 62 """Main worker loop. | 65 """Main worker loop. |
| 63 | 66 |
| 64 Repeatedly pulls tasks from the task queue and processes them. Returns when | 67 Repeatedly pulls tasks from the task queue and processes them. Returns when |
| 65 the queue is empty. | 68 the queue is empty. |
| 66 """ | 69 """ |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 87 continue | 90 continue |
| 88 | 91 |
| 89 self._logger.info('Processing task %s' % task_id) | 92 self._logger.info('Processing task %s' % task_id) |
| 90 self._ProcessClovisTask(clovis_task) | 93 self._ProcessClovisTask(clovis_task) |
| 91 self._logger.debug('Deleting task %s' % task_id) | 94 self._logger.debug('Deleting task %s' % task_id) |
| 92 task_api.tasks().delete(project=project, taskqueue=queue_name, | 95 task_api.tasks().delete(project=project, taskqueue=queue_name, |
| 93 task=task_id).execute() | 96 task=task_id).execute() |
| 94 self._logger.info('Finished task %s' % task_id) | 97 self._logger.info('Finished task %s' % task_id) |
| 95 self._Finalize() | 98 self._Finalize() |
| 96 | 99 |
| 100 def _DownloadTraceDatabase(self): | |
| 101 """Downloads the trace database from CloudStorage.""" | |
| 102 trace_database_string = self._google_storage_accessor.DownloadAsString( | |
|
Benoit L
2016/04/20 15:02:22
What about:
trace_db_str = self._google_storage_a
droger
2016/04/20 15:58:17
Done.
| |
| 103 self._trace_database_path) | |
| 104 trace_database_dict = {} | |
| 105 if trace_database_string: | |
| 106 self._logger.warning( | |
| 107 'Importing existing trace database %s' % self._trace_database_path) | |
| 108 trace_database_dict = json.loads(trace_database_string) | |
| 109 self._trace_database = LoadingTraceDatabase(trace_database_dict) | |
| 110 | |
| 111 def _UploadTraceDatabase(self): | |
| 112 """Uploads the trace database to CloudStorage.""" | |
| 113 self._logger.info('Uploading trace database') | |
|
Benoit L
2016/04/20 15:02:22
tiny nit: Importing is "warning", uploading "info"
droger
2016/04/20 15:58:17
Done.
| |
| 114 self._google_storage_accessor.UploadString( | |
| 115 json.dumps(self._trace_database.ToJsonDict(), indent=2), | |
| 116 self._trace_database_path) | |
| 117 | |
| 97 def _FetchClovisTask(self, project_name, task_api, queue_name): | 118 def _FetchClovisTask(self, project_name, task_api, queue_name): |
| 98 """Fetches a ClovisTask from the task queue. | 119 """Fetches a ClovisTask from the task queue. |
| 99 | 120 |
| 100 Params: | 121 Params: |
| 101 project_name(str): The name of the Google Cloud project. | 122 project_name(str): The name of the Google Cloud project. |
| 102 task_api: The TaskQueue service. | 123 task_api: The TaskQueue service. |
| 103 queue_name(str): The name of the task queue. | 124 queue_name(str): The name of the task queue. |
| 104 | 125 |
| 105 Returns: | 126 Returns: |
| 106 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None) | 127 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None) |
| 107 if no tasks are found. | 128 if no tasks are found. |
| 108 """ | 129 """ |
| 109 response = task_api.tasks().lease( | 130 response = task_api.tasks().lease( |
| 110 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180, | 131 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180, |
| 111 groupByTag=True, tag=self._taskqueue_tag).execute() | 132 groupByTag=True, tag=self._taskqueue_tag).execute() |
| 112 if (not response.get('items')) or (len(response['items']) < 1): | 133 if (not response.get('items')) or (len(response['items']) < 1): |
| 113 return (None, None) | 134 return (None, None) |
| 114 | 135 |
| 115 google_task = response['items'][0] | 136 google_task = response['items'][0] |
| 116 task_id = google_task['id'] | 137 task_id = google_task['id'] |
| 117 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) | 138 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) |
| 118 return (clovis_task, task_id) | 139 return (clovis_task, task_id) |
| 119 | 140 |
| 120 def _Finalize(self): | 141 def _Finalize(self): |
| 121 """Called before exiting.""" | 142 """Called before exiting.""" |
| 122 self._logger.info('Uploading trace database') | |
| 123 self._google_storage_accessor.UploadString( | |
| 124 json.dumps(self._trace_database.ToJsonDict(), indent=2), | |
| 125 self._traces_dir + 'trace_database.json') | |
| 126 # TODO(droger): Implement automatic instance destruction. | 143 # TODO(droger): Implement automatic instance destruction. |
| 127 self._logger.info('Done') | 144 self._logger.info('Done') |
| 128 | 145 |
| 129 | |
| 130 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, | 146 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, |
| 131 log_filename): | 147 log_filename): |
| 132 """ Generates a trace. | 148 """ Generates a trace. |
| 133 | 149 |
| 134 Args: | 150 Args: |
| 135 url: URL as a string. | 151 url: URL as a string. |
| 136 emulate_device: Name of the device to emulate. Empty for no emulation. | 152 emulate_device: Name of the device to emulate. Empty for no emulation. |
| 137 emulate_network: Type of network emulation. Empty for no emulation. | 153 emulate_network: Type of network emulation. Empty for no emulation. |
| 138 filename: Name of the file where the trace is saved. | 154 filename: Name of the file where the trace is saved. |
| 139 log_filename: Name of the file where standard output and errors are | 155 log_filename: Name of the file where standard output and errors are |
| 140 logged. | 156 logged. |
| 141 | 157 |
| 142 Returns: | 158 Returns: |
| 143 A dictionary of metadata about the trace, including a 'succeeded' field | 159 A dictionary of metadata about the trace, including a 'succeeded' field |
| 144 indicating whether the trace was successfully generated. | 160 indicating whether the trace was successfully generated. |
| 145 """ | 161 """ |
| 146 try: | 162 try: |
| 147 os.remove(filename) # Remove any existing trace for this URL. | 163 os.remove(filename) # Remove any existing trace for this URL. |
| 148 except OSError: | 164 except OSError: |
| 149 pass # Nothing to remove. | 165 pass # Nothing to remove. |
| 150 | 166 |
| 151 if not url.startswith('http') and not url.startswith('file'): | 167 if not url.startswith('http') and not url.startswith('file'): |
| 152 url = 'http://' + url | 168 url = 'http://' + url |
| 153 | 169 |
| 154 old_stdout = sys.stdout | |
| 155 old_stderr = sys.stderr | 170 old_stderr = sys.stderr |
| 156 | 171 |
| 157 trace_metadata = { 'succeeded' : False, 'url' : url } | 172 trace_metadata = { 'succeeded' : False, 'url' : url } |
| 158 trace = None | 173 trace = None |
| 159 with open(log_filename, 'w') as sys.stdout: | 174 with open(log_filename, 'w') as sys.stdout: |
| 160 try: | 175 try: |
| 161 sys.stderr = sys.stdout | 176 sys.stderr = sys.stdout |
| 162 | 177 |
| 163 # Set up the controller. | 178 # Set up the controller. |
| 164 chrome_ctl = controller.LocalChromeController() | 179 chrome_ctl = controller.LocalChromeController() |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 176 url, connection, chrome_ctl.ChromeMetadata()) | 191 url, connection, chrome_ctl.ChromeMetadata()) |
| 177 trace_metadata['succeeded'] = True | 192 trace_metadata['succeeded'] = True |
| 178 trace_metadata.update(trace.ToJsonDict()[trace._METADATA_KEY]) | 193 trace_metadata.update(trace.ToJsonDict()[trace._METADATA_KEY]) |
| 179 except Exception as e: | 194 except Exception as e: |
| 180 sys.stderr.write(str(e)) | 195 sys.stderr.write(str(e)) |
| 181 | 196 |
| 182 if trace: | 197 if trace: |
| 183 with open(filename, 'w') as f: | 198 with open(filename, 'w') as f: |
| 184 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) | 199 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) |
| 185 | 200 |
| 186 sys.stdout = old_stdout | |
| 187 sys.stderr = old_stderr | 201 sys.stderr = old_stderr |
| 188 | 202 |
| 189 return trace_metadata | 203 return trace_metadata |
| 190 | 204 |
| 191 def _ProcessClovisTask(self, clovis_task): | 205 def _ProcessClovisTask(self, clovis_task): |
| 192 """Processes one clovis_task.""" | 206 """Processes one clovis_task.""" |
| 193 if clovis_task.Action() != 'trace': | 207 if clovis_task.Action() != 'trace': |
| 194 self._logger.error('Unsupported task action: %s' % clovis_task.Action()) | 208 self._logger.error('Unsupported task action: %s' % clovis_task.Action()) |
| 195 return | 209 return |
| 196 | 210 |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 216 remote_filename = local_filename + '/' + str(repeat) | 230 remote_filename = local_filename + '/' + str(repeat) |
| 217 trace_metadata = self._GenerateTrace( | 231 trace_metadata = self._GenerateTrace( |
| 218 url, emulate_device, emulate_network, local_filename, log_filename) | 232 url, emulate_device, emulate_network, local_filename, log_filename) |
| 219 if trace_metadata['succeeded']: | 233 if trace_metadata['succeeded']: |
| 220 self._logger.debug('Uploading: %s' % remote_filename) | 234 self._logger.debug('Uploading: %s' % remote_filename) |
| 221 remote_trace_location = self._traces_dir + remote_filename | 235 remote_trace_location = self._traces_dir + remote_filename |
| 222 self._google_storage_accessor.UploadFile(local_filename, | 236 self._google_storage_accessor.UploadFile(local_filename, |
| 223 remote_trace_location) | 237 remote_trace_location) |
| 224 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + | 238 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + |
| 225 remote_trace_location) | 239 remote_trace_location) |
| 226 self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata) | 240 self._trace_database.SetTrace(full_cloud_storage_path, trace_metadata) |
| 227 else: | 241 else: |
| 228 self._logger.warning('Trace generation failed for URL: %s' % url) | 242 self._logger.warning('Trace generation failed for URL: %s' % url) |
| 229 # TODO: upload the failure | |
| 230 if os.path.isfile(local_filename): | 243 if os.path.isfile(local_filename): |
| 231 self._google_storage_accessor.UploadFile(local_filename, | 244 self._google_storage_accessor.UploadFile( |
| 232 failures_dir + remote_filename) | 245 local_filename, failures_dir + remote_filename) |
| 233 self._logger.debug('Uploading log') | 246 self._logger.debug('Uploading analyze log') |
| 234 self._google_storage_accessor.UploadFile(log_filename, | 247 self._google_storage_accessor.UploadFile(log_filename, |
| 235 logs_dir + remote_filename) | 248 logs_dir + remote_filename) |
| 236 | 249 self._UploadTraceDatabase() |
| 237 | 250 |
if __name__ == '__main__':
  parser = argparse.ArgumentParser(
      description='ComputeEngine Worker for Clovis')
  parser.add_argument('--config', required=True,
                      help='Path to the configuration file.')
  args = parser.parse_args()

  # Configure logging.
  logging.basicConfig(level=logging.WARNING)
  worker_logger = logging.getLogger('worker')
  worker_logger.setLevel(logging.INFO)

  worker_logger.info('Reading configuration')
  with open(args.config) as config_json:
    config = json.load(config_json)

  # Run the worker outside of the 'with' block so the configuration file is
  # closed instead of being held open for the whole (potentially long) run.
  worker = Worker(config, worker_logger)
  worker.Start()
| 254 | 267 |
| OLD | NEW |