OLD | NEW |
1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
import argparse
import json
import logging
import os
import posixpath
import re
import sys
(...skipping 14 matching lines...) Expand all Loading... |
25 import loading_trace | 25 import loading_trace |
26 from loading_trace_database import LoadingTraceDatabase | 26 from loading_trace_database import LoadingTraceDatabase |
27 import options | 27 import options |
28 | 28 |
29 | 29 |
30 class Worker(object): | 30 class Worker(object): |
31 def __init__(self, config, logger): | 31 def __init__(self, config, logger): |
32 """See README.md for the config format.""" | 32 """See README.md for the config format.""" |
33 self._project_name = config['project_name'] | 33 self._project_name = config['project_name'] |
34 self._taskqueue_tag = config['taskqueue_tag'] | 34 self._taskqueue_tag = config['taskqueue_tag'] |
| 35 self._src_path = config['src_path'] |
35 self._credentials = GoogleCredentials.get_application_default() | 36 self._credentials = GoogleCredentials.get_application_default() |
36 self._logger = logger | 37 self._logger = logger |
37 | 38 |
38 # Separate the cloud storage path into the bucket and the base path under | 39 # Separate the cloud storage path into the bucket and the base path under |
39 # the bucket. | 40 # the bucket. |
40 storage_path_components = config['cloud_storage_path'].split('/') | 41 storage_path_components = config['cloud_storage_path'].split('/') |
41 self._bucket_name = storage_path_components[0] | 42 self._bucket_name = storage_path_components[0] |
42 self._base_path_in_bucket = '' | 43 self._base_path_in_bucket = '' |
43 if len(storage_path_components) > 1: | 44 if len(storage_path_components) > 1: |
44 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) | 45 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) |
45 if not self._base_path_in_bucket.endswith('/'): | 46 if not self._base_path_in_bucket.endswith('/'): |
46 self._base_path_in_bucket += '/' | 47 self._base_path_in_bucket += '/' |
47 | 48 |
48 # TODO: improve the trace database to support concurrent access. | |
49 self._traces_dir = self._base_path_in_bucket + 'traces/' | |
50 self._trace_database = LoadingTraceDatabase({}) | |
51 | |
52 self._src_path = config['src_path'] | |
53 self._google_storage_accessor = GoogleStorageAccessor( | 49 self._google_storage_accessor = GoogleStorageAccessor( |
54 credentials=self._credentials, project_name=self._project_name, | 50 credentials=self._credentials, project_name=self._project_name, |
55 bucket_name=self._bucket_name) | 51 bucket_name=self._bucket_name) |
56 | 52 |
| 53 self._traces_dir = os.path.join(self._base_path_in_bucket, 'traces') |
| 54 self._trace_database_path = os.path.join( |
| 55 self._traces_dir, |
| 56 config.get('trace_database_filename', 'trace_database.json')) |
| 57 |
| 58 # Recover any existing trace database in case the worker died. |
| 59 self._DownloadTraceDatabase() |
| 60 |
57 # Initialize the global options that will be used during trace generation. | 61 # Initialize the global options that will be used during trace generation. |
58 options.OPTIONS.ParseArgs([]) | 62 options.OPTIONS.ParseArgs([]) |
59 options.OPTIONS.local_binary = config['chrome_path'] | 63 options.OPTIONS.local_binary = config['chrome_path'] |
60 | 64 |
61 def Start(self): | 65 def Start(self): |
62 """Main worker loop. | 66 """Main worker loop. |
63 | 67 |
64 Repeatedly pulls tasks from the task queue and processes them. Returns when | 68 Repeatedly pulls tasks from the task queue and processes them. Returns when |
65 the queue is empty. | 69 the queue is empty. |
66 """ | 70 """ |
(...skipping 20 matching lines...) Expand all Loading... |
87 continue | 91 continue |
88 | 92 |
89 self._logger.info('Processing task %s' % task_id) | 93 self._logger.info('Processing task %s' % task_id) |
90 self._ProcessClovisTask(clovis_task) | 94 self._ProcessClovisTask(clovis_task) |
91 self._logger.debug('Deleting task %s' % task_id) | 95 self._logger.debug('Deleting task %s' % task_id) |
92 task_api.tasks().delete(project=project, taskqueue=queue_name, | 96 task_api.tasks().delete(project=project, taskqueue=queue_name, |
93 task=task_id).execute() | 97 task=task_id).execute() |
94 self._logger.info('Finished task %s' % task_id) | 98 self._logger.info('Finished task %s' % task_id) |
95 self._Finalize() | 99 self._Finalize() |
96 | 100 |
| 101 def _DownloadTraceDatabase(self): |
| 102 """Downloads the trace database from CloudStorage.""" |
| 103 self._logger.info('Downloading trace database') |
| 104 trace_database_string = self._google_storage_accessor.DownloadAsString( |
| 105 self._trace_database_path) or '{}' |
| 106 trace_database_dict = json.loads(trace_database_string) |
| 107 self._trace_database = LoadingTraceDatabase(trace_database_dict) |
| 108 |
| 109 def _UploadTraceDatabase(self): |
| 110 """Uploads the trace database to CloudStorage.""" |
| 111 self._logger.info('Uploading trace database') |
| 112 self._google_storage_accessor.UploadString( |
| 113 json.dumps(self._trace_database.ToJsonDict(), indent=2), |
| 114 self._trace_database_path) |
| 115 |
97 def _FetchClovisTask(self, project_name, task_api, queue_name): | 116 def _FetchClovisTask(self, project_name, task_api, queue_name): |
98 """Fetches a ClovisTask from the task queue. | 117 """Fetches a ClovisTask from the task queue. |
99 | 118 |
100 Params: | 119 Params: |
101 project_name(str): The name of the Google Cloud project. | 120 project_name(str): The name of the Google Cloud project. |
102 task_api: The TaskQueue service. | 121 task_api: The TaskQueue service. |
103 queue_name(str): The name of the task queue. | 122 queue_name(str): The name of the task queue. |
104 | 123 |
105 Returns: | 124 Returns: |
106 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None) | 125 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None) |
107 if no tasks are found. | 126 if no tasks are found. |
108 """ | 127 """ |
109 response = task_api.tasks().lease( | 128 response = task_api.tasks().lease( |
110 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180, | 129 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180, |
111 groupByTag=True, tag=self._taskqueue_tag).execute() | 130 groupByTag=True, tag=self._taskqueue_tag).execute() |
112 if (not response.get('items')) or (len(response['items']) < 1): | 131 if (not response.get('items')) or (len(response['items']) < 1): |
113 return (None, None) | 132 return (None, None) |
114 | 133 |
115 google_task = response['items'][0] | 134 google_task = response['items'][0] |
116 task_id = google_task['id'] | 135 task_id = google_task['id'] |
117 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) | 136 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) |
118 return (clovis_task, task_id) | 137 return (clovis_task, task_id) |
119 | 138 |
120 def _Finalize(self): | 139 def _Finalize(self): |
121 """Called before exiting.""" | 140 """Called before exiting.""" |
122 self._logger.info('Uploading trace database') | |
123 self._google_storage_accessor.UploadString( | |
124 json.dumps(self._trace_database.ToJsonDict(), indent=2), | |
125 self._traces_dir + 'trace_database.json') | |
126 # TODO(droger): Implement automatic instance destruction. | 141 # TODO(droger): Implement automatic instance destruction. |
127 self._logger.info('Done') | 142 self._logger.info('Done') |
128 | 143 |
129 | |
130 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, | 144 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, |
131 log_filename): | 145 log_filename): |
132 """ Generates a trace. | 146 """ Generates a trace. |
133 | 147 |
134 Args: | 148 Args: |
135 url: URL as a string. | 149 url: URL as a string. |
136 emulate_device: Name of the device to emulate. Empty for no emulation. | 150 emulate_device: Name of the device to emulate. Empty for no emulation. |
137 emulate_network: Type of network emulation. Empty for no emulation. | 151 emulate_network: Type of network emulation. Empty for no emulation. |
138 filename: Name of the file where the trace is saved. | 152 filename: Name of the file where the trace is saved. |
139 log_filename: Name of the file where standard output and errors are | 153 log_filename: Name of the file where standard output and errors are |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
194 self._logger.error('Unsupported task action: %s' % clovis_task.Action()) | 208 self._logger.error('Unsupported task action: %s' % clovis_task.Action()) |
195 return | 209 return |
196 | 210 |
197 # Extract the task parameters. | 211 # Extract the task parameters. |
198 params = clovis_task.Params() | 212 params = clovis_task.Params() |
199 urls = params['urls'] | 213 urls = params['urls'] |
200 repeat_count = params.get('repeat_count', 1) | 214 repeat_count = params.get('repeat_count', 1) |
201 emulate_device = params.get('emulate_device') | 215 emulate_device = params.get('emulate_device') |
202 emulate_network = params.get('emulate_network') | 216 emulate_network = params.get('emulate_network') |
203 | 217 |
204 failures_dir = self._base_path_in_bucket + 'failures/' | 218 failures_dir = os.path.join(self._base_path_in_bucket, 'failures') |
205 # TODO(blundell): Fix this up. | 219 # TODO(blundell): Fix this up. |
206 logs_dir = self._base_path_in_bucket + 'analyze_logs/' | 220 logs_dir = os.path.join(self._base_path_in_bucket, 'analyze_logs') |
207 log_filename = 'analyze.log' | 221 log_filename = 'analyze.log' |
208 # Avoid special characters in storage object names | 222 # Avoid special characters in storage object names |
209 pattern = re.compile(r"[#\?\[\]\*/]") | 223 pattern = re.compile(r"[#\?\[\]\*/]") |
210 | 224 |
211 while len(urls) > 0: | 225 while len(urls) > 0: |
212 url = urls.pop() | 226 url = urls.pop() |
213 local_filename = pattern.sub('_', url) | 227 local_filename = pattern.sub('_', url) |
214 for repeat in range(repeat_count): | 228 for repeat in range(repeat_count): |
215 self._logger.debug('Generating trace for URL: %s' % url) | 229 self._logger.debug('Generating trace for URL: %s' % url) |
216 remote_filename = local_filename + '/' + str(repeat) | 230 remote_filename = os.path.join(local_filename, str(repeat)) |
217 trace_metadata = self._GenerateTrace( | 231 trace_metadata = self._GenerateTrace( |
218 url, emulate_device, emulate_network, local_filename, log_filename) | 232 url, emulate_device, emulate_network, local_filename, log_filename) |
219 if trace_metadata['succeeded']: | 233 if trace_metadata['succeeded']: |
220 self._logger.debug('Uploading: %s' % remote_filename) | 234 self._logger.debug('Uploading: %s' % remote_filename) |
221 remote_trace_location = self._traces_dir + remote_filename | 235 remote_trace_location = os.path.join(self._traces_dir, |
| 236 remote_filename) |
222 self._google_storage_accessor.UploadFile(local_filename, | 237 self._google_storage_accessor.UploadFile(local_filename, |
223 remote_trace_location) | 238 remote_trace_location) |
224 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + | 239 full_cloud_storage_path = os.path.join('gs://' + self._bucket_name, |
225 remote_trace_location) | 240 remote_trace_location) |
226 self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata) | 241 self._trace_database.SetTrace(full_cloud_storage_path, trace_metadata) |
227 else: | 242 else: |
228 self._logger.warning('Trace generation failed for URL: %s' % url) | 243 self._logger.warning('Trace generation failed for URL: %s' % url) |
229 # TODO: upload the failure | |
230 if os.path.isfile(local_filename): | 244 if os.path.isfile(local_filename): |
231 self._google_storage_accessor.UploadFile(local_filename, | 245 self._google_storage_accessor.UploadFile( |
232 failures_dir + remote_filename) | 246 local_filename, os.path.join(failures_dir, remote_filename)) |
233 self._logger.debug('Uploading log') | 247 self._logger.debug('Uploading analyze log') |
234 self._google_storage_accessor.UploadFile(log_filename, | 248 self._google_storage_accessor.UploadFile( |
235 logs_dir + remote_filename) | 249 log_filename, os.path.join(logs_dir, remote_filename)) |
236 | 250 self._UploadTraceDatabase() |
237 | 251 |
if __name__ == '__main__':
  argument_parser = argparse.ArgumentParser(
      description='ComputeEngine Worker for Clovis')
  argument_parser.add_argument('--config', required=True,
                               help='Path to the configuration file.')
  parsed_args = argument_parser.parse_args()

  # Configure logging.
  logging.basicConfig(level=logging.WARNING)
  worker_logger = logging.getLogger('worker')
  worker_logger.setLevel(logging.INFO)

  worker_logger.info('Reading configuration')
  with open(parsed_args.config) as config_json:
    worker = Worker(json.load(config_json), worker_logger)
    worker.Start()
254 | 268 |
OLD | NEW |