| OLD | NEW |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import argparse | 5 import argparse |
| 6 import json | 6 import json |
| 7 import logging | 7 import logging |
| 8 import os | 8 import os |
| 9 import re | 9 import re |
| 10 import sys | 10 import sys |
| (...skipping 14 matching lines...) |
| 25 import loading_trace | 25 import loading_trace |
| 26 from loading_trace_database import LoadingTraceDatabase | 26 from loading_trace_database import LoadingTraceDatabase |
| 27 import options | 27 import options |
| 28 | 28 |
| 29 | 29 |
| 30 class Worker(object): | 30 class Worker(object): |
| 31 def __init__(self, config, logger): | 31 def __init__(self, config, logger): |
| 32 """See README.md for the config format.""" | 32 """See README.md for the config format.""" |
| 33 self._project_name = config['project_name'] | 33 self._project_name = config['project_name'] |
| 34 self._taskqueue_tag = config['taskqueue_tag'] | 34 self._taskqueue_tag = config['taskqueue_tag'] |
| | 35 self._src_path = config['src_path'] |
| 35 self._credentials = GoogleCredentials.get_application_default() | 36 self._credentials = GoogleCredentials.get_application_default() |
| 36 self._logger = logger | 37 self._logger = logger |
| 37 | 38 |
| 38 # Separate the cloud storage path into the bucket and the base path under | 39 # Separate the cloud storage path into the bucket and the base path under |
| 39 # the bucket. | 40 # the bucket. |
| 40 storage_path_components = config['cloud_storage_path'].split('/') | 41 storage_path_components = config['cloud_storage_path'].split('/') |
| 41 self._bucket_name = storage_path_components[0] | 42 self._bucket_name = storage_path_components[0] |
| 42 self._base_path_in_bucket = '' | 43 self._base_path_in_bucket = '' |
| 43 if len(storage_path_components) > 1: | 44 if len(storage_path_components) > 1: |
| 44 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) | 45 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) |
| 45 if not self._base_path_in_bucket.endswith('/'): | 46 if not self._base_path_in_bucket.endswith('/'): |
| 46 self._base_path_in_bucket += '/' | 47 self._base_path_in_bucket += '/' |
| 47 | 48 |
| 48 # TODO: improve the trace database to support concurrent access. | |
| 49 self._traces_dir = self._base_path_in_bucket + 'traces/' | |
| 50 self._trace_database = LoadingTraceDatabase({}) | |
| 51 | |
| 52 self._src_path = config['src_path'] | |
| 53 self._google_storage_accessor = GoogleStorageAccessor( | 49 self._google_storage_accessor = GoogleStorageAccessor( |
| 54 credentials=self._credentials, project_name=self._project_name, | 50 credentials=self._credentials, project_name=self._project_name, |
| 55 bucket_name=self._bucket_name) | 51 bucket_name=self._bucket_name) |
| 56 | 52 |
| | 53 self._traces_dir = os.path.join(self._base_path_in_bucket, 'traces') |
| | 54 self._trace_database_path = os.path.join( |
| | 55 self._traces_dir, |
| | 56 config.get('trace_database_filename', 'trace_database.json')) |
| | 57 |
| | 58 # Recover any existing trace database in case the worker died. |
| | 59 self._DownloadTraceDatabase() |
| | 60 |
| 57 # Initialize the global options that will be used during trace generation. | 61 # Initialize the global options that will be used during trace generation. |
| 58 options.OPTIONS.ParseArgs([]) | 62 options.OPTIONS.ParseArgs([]) |
| 59 options.OPTIONS.local_binary = config['chrome_path'] | 63 options.OPTIONS.local_binary = config['chrome_path'] |
| 60 | 64 |
| 61 def Start(self): | 65 def Start(self): |
| 62 """Main worker loop. | 66 """Main worker loop. |
| 63 | 67 |
| 64 Repeatedly pulls tasks from the task queue and processes them. Returns when | 68 Repeatedly pulls tasks from the task queue and processes them. Returns when |
| 65 the queue is empty. | 69 the queue is empty. |
| 66 """ | 70 """ |
| (...skipping 20 matching lines...) |
| 87 continue | 91 continue |
| 88 | 92 |
| 89 self._logger.info('Processing task %s' % task_id) | 93 self._logger.info('Processing task %s' % task_id) |
| 90 self._ProcessClovisTask(clovis_task) | 94 self._ProcessClovisTask(clovis_task) |
| 91 self._logger.debug('Deleting task %s' % task_id) | 95 self._logger.debug('Deleting task %s' % task_id) |
| 92 task_api.tasks().delete(project=project, taskqueue=queue_name, | 96 task_api.tasks().delete(project=project, taskqueue=queue_name, |
| 93 task=task_id).execute() | 97 task=task_id).execute() |
| 94 self._logger.info('Finished task %s' % task_id) | 98 self._logger.info('Finished task %s' % task_id) |
| 95 self._Finalize() | 99 self._Finalize() |
| 96 | 100 |
| | 101 def _DownloadTraceDatabase(self): |
| | 102 """Downloads the trace database from CloudStorage.""" |
| | 103 self._logger.info('Downloading trace database') |
| | 104 trace_database_string = self._google_storage_accessor.DownloadAsString( |
| | 105 self._trace_database_path) or '{}' |
| | 106 trace_database_dict = json.loads(trace_database_string) |
| | 107 self._trace_database = LoadingTraceDatabase(trace_database_dict) |
| | 108 |
| | 109 def _UploadTraceDatabase(self): |
| | 110 """Uploads the trace database to CloudStorage.""" |
| | 111 self._logger.info('Uploading trace database') |
| | 112 self._google_storage_accessor.UploadString( |
| | 113 json.dumps(self._trace_database.ToJsonDict(), indent=2), |
| | 114 self._trace_database_path) |
| | 115 |
| 97 def _FetchClovisTask(self, project_name, task_api, queue_name): | 116 def _FetchClovisTask(self, project_name, task_api, queue_name): |
| 98 """Fetches a ClovisTask from the task queue. | 117 """Fetches a ClovisTask from the task queue. |
| 99 | 118 |
| 100 Params: | 119 Params: |
| 101 project_name(str): The name of the Google Cloud project. | 120 project_name(str): The name of the Google Cloud project. |
| 102 task_api: The TaskQueue service. | 121 task_api: The TaskQueue service. |
| 103 queue_name(str): The name of the task queue. | 122 queue_name(str): The name of the task queue. |
| 104 | 123 |
| 105 Returns: | 124 Returns: |
| 106 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None) | 125 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None) |
| 107 if no tasks are found. | 126 if no tasks are found. |
| 108 """ | 127 """ |
| 109 response = task_api.tasks().lease( | 128 response = task_api.tasks().lease( |
| 110 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180, | 129 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180, |
| 111 groupByTag=True, tag=self._taskqueue_tag).execute() | 130 groupByTag=True, tag=self._taskqueue_tag).execute() |
| 112 if (not response.get('items')) or (len(response['items']) < 1): | 131 if (not response.get('items')) or (len(response['items']) < 1): |
| 113 return (None, None) | 132 return (None, None) |
| 114 | 133 |
| 115 google_task = response['items'][0] | 134 google_task = response['items'][0] |
| 116 task_id = google_task['id'] | 135 task_id = google_task['id'] |
| 117 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) | 136 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) |
| 118 return (clovis_task, task_id) | 137 return (clovis_task, task_id) |
| 119 | 138 |
| 120 def _Finalize(self): | 139 def _Finalize(self): |
| 121 """Called before exiting.""" | 140 """Called before exiting.""" |
| 122 self._logger.info('Uploading trace database') | |
| 123 self._google_storage_accessor.UploadString( | |
| 124 json.dumps(self._trace_database.ToJsonDict(), indent=2), | |
| 125 self._traces_dir + 'trace_database.json') | |
| 126 # TODO(droger): Implement automatic instance destruction. | 141 # TODO(droger): Implement automatic instance destruction. |
| 127 self._logger.info('Done') | 142 self._logger.info('Done') |
| 128 | 143 |
| 129 | |
| 130 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, | 144 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, |
| 131 log_filename): | 145 log_filename): |
| 132 """ Generates a trace. | 146 """ Generates a trace. |
| 133 | 147 |
| 134 Args: | 148 Args: |
| 135 url: URL as a string. | 149 url: URL as a string. |
| 136 emulate_device: Name of the device to emulate. Empty for no emulation. | 150 emulate_device: Name of the device to emulate. Empty for no emulation. |
| 137 emulate_network: Type of network emulation. Empty for no emulation. | 151 emulate_network: Type of network emulation. Empty for no emulation. |
| 138 filename: Name of the file where the trace is saved. | 152 filename: Name of the file where the trace is saved. |
| 139 log_filename: Name of the file where standard output and errors are | 153 log_filename: Name of the file where standard output and errors are |
| (...skipping 54 matching lines...) |
| 194 self._logger.error('Unsupported task action: %s' % clovis_task.Action()) | 208 self._logger.error('Unsupported task action: %s' % clovis_task.Action()) |
| 195 return | 209 return |
| 196 | 210 |
| 197 # Extract the task parameters. | 211 # Extract the task parameters. |
| 198 params = clovis_task.Params() | 212 params = clovis_task.Params() |
| 199 urls = params['urls'] | 213 urls = params['urls'] |
| 200 repeat_count = params.get('repeat_count', 1) | 214 repeat_count = params.get('repeat_count', 1) |
| 201 emulate_device = params.get('emulate_device') | 215 emulate_device = params.get('emulate_device') |
| 202 emulate_network = params.get('emulate_network') | 216 emulate_network = params.get('emulate_network') |
| 203 | 217 |
| 204 failures_dir = self._base_path_in_bucket + 'failures/' | 218 failures_dir = os.path.join(self._base_path_in_bucket, 'failures') |
| 205 # TODO(blundell): Fix this up. | 219 # TODO(blundell): Fix this up. |
| 206 logs_dir = self._base_path_in_bucket + 'analyze_logs/' | 220 logs_dir = os.path.join(self._base_path_in_bucket, 'analyze_logs') |
| 207 log_filename = 'analyze.log' | 221 log_filename = 'analyze.log' |
| 208 # Avoid special characters in storage object names | 222 # Avoid special characters in storage object names |
| 209 pattern = re.compile(r"[#\?\[\]\*/]") | 223 pattern = re.compile(r"[#\?\[\]\*/]") |
| 210 | 224 |
| 211 while len(urls) > 0: | 225 while len(urls) > 0: |
| 212 url = urls.pop() | 226 url = urls.pop() |
| 213 local_filename = pattern.sub('_', url) | 227 local_filename = pattern.sub('_', url) |
| 214 for repeat in range(repeat_count): | 228 for repeat in range(repeat_count): |
| 215 self._logger.debug('Generating trace for URL: %s' % url) | 229 self._logger.debug('Generating trace for URL: %s' % url) |
| 216 remote_filename = local_filename + '/' + str(repeat) | 230 remote_filename = os.path.join(local_filename, str(repeat)) |
| 217 trace_metadata = self._GenerateTrace( | 231 trace_metadata = self._GenerateTrace( |
| 218 url, emulate_device, emulate_network, local_filename, log_filename) | 232 url, emulate_device, emulate_network, local_filename, log_filename) |
| 219 if trace_metadata['succeeded']: | 233 if trace_metadata['succeeded']: |
| 220 self._logger.debug('Uploading: %s' % remote_filename) | 234 self._logger.debug('Uploading: %s' % remote_filename) |
| 221 remote_trace_location = self._traces_dir + remote_filename | 235 remote_trace_location = os.path.join(self._traces_dir, |
| | 236 remote_filename) |
| 222 self._google_storage_accessor.UploadFile(local_filename, | 237 self._google_storage_accessor.UploadFile(local_filename, |
| 223 remote_trace_location) | 238 remote_trace_location) |
| 224 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + | 239 full_cloud_storage_path = os.path.join('gs://' + self._bucket_name, |
| 225 remote_trace_location) | 240 remote_trace_location) |
| 226 self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata) | 241 self._trace_database.SetTrace(full_cloud_storage_path, trace_metadata) |
| 227 else: | 242 else: |
| 228 self._logger.warning('Trace generation failed for URL: %s' % url) | 243 self._logger.warning('Trace generation failed for URL: %s' % url) |
| 229 # TODO: upload the failure | |
| 230 if os.path.isfile(local_filename): | 244 if os.path.isfile(local_filename): |
| 231 self._google_storage_accessor.UploadFile(local_filename, | 245 self._google_storage_accessor.UploadFile( |
| 232 failures_dir + remote_filename) | 246 local_filename, os.path.join(failures_dir, remote_filename)) |
| 233 self._logger.debug('Uploading log') | 247 self._logger.debug('Uploading analyze log') |
| 234 self._google_storage_accessor.UploadFile(log_filename, | 248 self._google_storage_accessor.UploadFile( |
| 235 logs_dir + remote_filename) | 249 log_filename, os.path.join(logs_dir, remote_filename)) |
| 236 | 250 self._UploadTraceDatabase() |
| 237 | 251 |
| 238 if __name__ == '__main__': | 252 if __name__ == '__main__': |
| 239 parser = argparse.ArgumentParser( | 253 parser = argparse.ArgumentParser( |
| 240 description='ComputeEngine Worker for Clovis') | 254 description='ComputeEngine Worker for Clovis') |
| 241 parser.add_argument('--config', required=True, | 255 parser.add_argument('--config', required=True, |
| 242 help='Path to the configuration file.') | 256 help='Path to the configuration file.') |
| 243 args = parser.parse_args() | 257 args = parser.parse_args() |
| 244 | 258 |
| 245 # Configure logging. | 259 # Configure logging. |
| 246 logging.basicConfig(level=logging.WARNING) | 260 logging.basicConfig(level=logging.WARNING) |
| 247 worker_logger = logging.getLogger('worker') | 261 worker_logger = logging.getLogger('worker') |
| 248 worker_logger.setLevel(logging.INFO) | 262 worker_logger.setLevel(logging.INFO) |
| 249 | 263 |
| 250 worker_logger.info('Reading configuration') | 264 worker_logger.info('Reading configuration') |
| 251 with open(args.config) as config_json: | 265 with open(args.config) as config_json: |
| 252 worker = Worker(json.load(config_json), worker_logger) | 266 worker = Worker(json.load(config_json), worker_logger) |
| 253 worker.Start() | 267 worker.Start() |
| 254 | 268 |
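
For reference, the `cloud_storage_path` handling at the top of `__init__` splits a config value such as `bucket/base/path` into a bucket name and a base path that always ends with a slash. A standalone restatement of that logic, with an illustrative input value (the function name and example path are made up, not from the change):

```python
def SplitStoragePath(cloud_storage_path):
  """Splits 'bucket/base/path' into ('bucket', 'base/path/')."""
  components = cloud_storage_path.split('/')
  bucket_name = components[0]
  base_path_in_bucket = ''
  if len(components) > 1:
    base_path_in_bucket = '/'.join(components[1:])
    if not base_path_in_bucket.endswith('/'):
      base_path_in_bucket += '/'
  return bucket_name, base_path_in_bucket


# Prints ('clovis-bucket', 'runs/2016-04/') -- example value only.
print(SplitStoragePath('clovis-bucket/runs/2016-04'))
```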
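
The `Start()` docstring summarizes the main loop: repeatedly lease one task from the pull queue, process it, delete it, and return once the queue is empty. A condensed sketch of that flow is below. Only the `lease`/`delete` calls and their parameters come from the code above; the service construction via the discovery client and the `'taskqueue', 'v1beta2'` names are assumptions, since the real setup lives in the skipped lines, and the payload is handed to the caller undecoded where the worker itself uses `ClovisTask.FromBase64()`.

```python
import httplib2
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials


def RunWorkerLoop(project_name, queue_name, taskqueue_tag, process_payload):
  """Leases pull-queue tasks one at a time, processes and deletes them.

  Returns when no more tasks carry the given tag.
  """
  # Assumption: the TaskQueue service is built through the discovery API.
  http = GoogleCredentials.get_application_default().authorize(httplib2.Http())
  task_api = discovery.build('taskqueue', 'v1beta2', http=http)
  while True:
    response = task_api.tasks().lease(
        project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180,
        groupByTag=True, tag=taskqueue_tag).execute()
    items = response.get('items')
    if not items:
      return  # Queue is empty for this tag.
    task = items[0]
    # The real worker decodes this with ClovisTask.FromBase64(); here the
    # base64 payload is passed through untouched.
    process_payload(task['payloadBase64'])
    task_api.tasks().delete(
        project=project_name, taskqueue=queue_name, task=task['id']).execute()
```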
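
The main behavioral change is that the trace database is no longer held only in memory and written once in `_Finalize()`: it now lives at a fixed object path under `traces/`, is re-downloaded in `__init__` so a restarted worker recovers earlier results, and is re-uploaded after every processed task. A minimal sketch of that recover-and-checkpoint pattern, with a hypothetical `storage_accessor` standing in for `GoogleStorageAccessor` (only the `DownloadAsString`/`UploadString` method names mirror the calls above; the dict-keyed-by-path representation is an assumption about `LoadingTraceDatabase`, not taken from it):

```python
import json


class TraceDatabaseCheckpoint(object):
  """Keeps a dict of trace metadata in sync with a Cloud Storage object."""

  def __init__(self, storage_accessor, database_path):
    self._storage = storage_accessor
    self._path = database_path
    # Recover whatever a previous (possibly crashed) worker already uploaded;
    # fall back to an empty database when the object does not exist yet.
    self._db = json.loads(self._storage.DownloadAsString(self._path) or '{}')

  def SetTrace(self, full_cloud_storage_path, trace_metadata):
    # Keyed by the gs:// path of the uploaded trace, as in _ProcessClovisTask.
    self._db[full_cloud_storage_path] = trace_metadata

  def Upload(self):
    # Checkpoint after every task so progress survives a worker restart.
    self._storage.UploadString(json.dumps(self._db, indent=2), self._path)
```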