| OLD | NEW |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import argparse |
| 5 import json | 6 import json |
| 7 import logging |
| 6 import os | 8 import os |
| 7 import re | 9 import re |
| 8 import threading | 10 import sys |
| 9 import time | 11 import time |
| 10 import subprocess | 12 |
| 11 import sys | 13 from googleapiclient import discovery |
| 14 from oauth2client.client import GoogleCredentials |
| 12 | 15 |
| 13 # NOTE: The parent directory needs to be first in sys.path to avoid conflicts | 16 # NOTE: The parent directory needs to be first in sys.path to avoid conflicts |
| 14 # with catapult modules that have colliding names, as catapult inserts itself | 17 # with catapult modules that have colliding names, as catapult inserts itself |
| 15 # into the path as the second element. This is an ugly and fragile hack. | 18 # into the path as the second element. This is an ugly and fragile hack. |
| 16 sys.path.insert(0, | 19 sys.path.insert(0, |
| 17 os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)) | 20 os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, |
| 21 os.pardir)) |
| 18 import controller | 22 import controller |
| 23 from cloud.common.clovis_task import ClovisTask |
| 19 from google_storage_accessor import GoogleStorageAccessor | 24 from google_storage_accessor import GoogleStorageAccessor |
| 20 import loading_trace | 25 import loading_trace |
| 21 from loading_trace_database import LoadingTraceDatabase | 26 from loading_trace_database import LoadingTraceDatabase |
| 22 import options | 27 import options |
| 23 | 28 |
| 24 | 29 |
| 25 class ServerApp(object): | 30 class Worker(object): |
| 26 """Simple web server application, collecting traces and writing them in | 31 def __init__(self, config, logger): |
| 27 Google Cloud Storage. | 32 """See README.md for the config format.""" |
| 28 """ | 33 self._project_name = config['project_name'] |
| 34 self._taskqueue_tag = config['taskqueue_tag'] |
| 35 self._credentials = GoogleCredentials.get_application_default() |
| 36 self._logger = logger |
| 29 | 37 |
| 30 def __init__(self, configuration_file): | 38 # Separate the cloud storage path into the bucket and the base path under |
| 31 """|configuration_file| is a path to a file containing JSON as described in | 39 # the bucket. |
| 32 README.md. | 40 storage_path_components = config['cloud_storage_path'].split('/') |
| 33 """ | 41 self._bucket_name = storage_path_components[0] |
| 34 self._tasks = [] # List of remaining tasks, only modified by _thread. | 42 self._base_path_in_bucket = '' |
| 35 self._failed_tasks = [] # Failed tasks, only modified by _thread. | 43 if len(storage_path_components) > 1: |
| 36 self._thread = None | 44 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) |
| 37 self._tasks_lock = threading.Lock() # Protects _tasks and _failed_tasks. | 45 if not self._base_path_in_bucket.endswith('/'): |
| 38 self._initial_task_count = -1 | 46 self._base_path_in_bucket += '/' |
| 39 self._start_time = None | |
| 40 print 'Reading configuration' | |
| 41 with open(configuration_file) as config_json: | |
| 42 config = json.load(config_json) | |
| 43 | 47 |
| 44 # Separate the cloud storage path into the bucket and the base path under | 48 # TODO: improve the trace database to support concurrent access. |
| 45 # the bucket. | 49 self._traces_dir = self._base_path_in_bucket + 'traces/' |
| 46 storage_path_components = config['cloud_storage_path'].split('/') | 50 self._trace_database = LoadingTraceDatabase({}) |
| 47 self._bucket_name = storage_path_components[0] | |
| 48 self._base_path_in_bucket = '' | |
| 49 if len(storage_path_components) > 1: | |
| 50 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) | |
| 51 if not self._base_path_in_bucket.endswith('/'): | |
| 52 self._base_path_in_bucket += '/' | |
| 53 | 51 |
| 54 self._src_path = config['src_path'] | 52 self._src_path = config['src_path'] |
| 55 self._google_storage_accessor = GoogleStorageAccessor( | 53 self._google_storage_accessor = GoogleStorageAccessor( |
| 56 project_name=config['project_name'], bucket_name=self._bucket_name) | 54 credentials=self._credentials, project_name=self._project_name, |
| 55 bucket_name=self._bucket_name) |
| 57 | 56 |
| 58 # Initialize the global options that will be used during trace generation. | 57 # Initialize the global options that will be used during trace generation. |
| 59 options.OPTIONS.ParseArgs([]) | 58 options.OPTIONS.ParseArgs([]) |
| 60 options.OPTIONS.local_binary = config['chrome_path'] | 59 options.OPTIONS.local_binary = config['chrome_path'] |
| 61 | 60 |
| 62 def _IsProcessingTasks(self): | 61 def Start(self): |
| 63 """Returns True if the application is currently processing tasks.""" | 62 """Main worker loop. |
| 64 return self._thread is not None and self._thread.is_alive() | 63 |
| 64 Repeatedly pulls tasks from the task queue and processes them. Returns when |
| 65 the queue is empty. |
| 66 """ |
| 67 task_api = discovery.build('taskqueue', 'v1beta2', |
| 68 credentials=self._credentials) |
| 69 queue_name = 'clovis-queue' |
| 70 # Workaround for |
| 71 # https://code.google.com/p/googleappengine/issues/detail?id=10199 |
| 72 project = 's~' + self._project_name |
| 73 |
| 74 while True: |
| 75 self._logger.debug('Fetching new task.') |
| 76 (clovis_task, task_id) = self._FetchClovisTask(project, task_api, |
| 77 queue_name) |
| 78 if not clovis_task: |
| 79 if self._trace_database.ToJsonDict(): |
| 80 self._logger.info('No remaining tasks in the queue.') |
| 81 break |
| 82 else: |
| 83 delay_seconds = 60 |
| 84 self._logger.info( |
| 85 'Nothing in the queue, retrying in %i seconds.' % delay_seconds) |
| 86 time.sleep(delay_seconds) |
| 87 continue |
| 88 |
| 89 self._logger.info('Processing task %s' % task_id) |
| 90 self._ProcessClovisTask(clovis_task) |
| 91 self._logger.debug('Deleting task %s' % task_id) |
| 92 task_api.tasks().delete(project=project, taskqueue=queue_name, |
| 93 task=task_id).execute() |
| 94 self._logger.info('Finished task %s' % task_id) |
| 95 self._Finalize() |
| 96 |
| 97 def _FetchClovisTask(self, project_name, task_api, queue_name): |
| 98 """Fetches a ClovisTask from the task queue. |
| 99 |
| 100 Params: |
| 101 project_name(str): The name of the Google Cloud project. |
| 102 task_api: The TaskQueue service. |
| 103 queue_name(str): The name of the task queue. |
| 104 |
| 105 Returns: |
| 106 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None) |
| 107 if no tasks are found. |
| 108 """ |
| 109 response = task_api.tasks().lease( |
| 110 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180, |
| 111 groupByTag=True, tag=self._taskqueue_tag).execute() |
| 112 if (not response.get('items')) or (len(response['items']) < 1): |
| 113 return (None, None) |
| 114 |
| 115 google_task = response['items'][0] |
| 116 task_id = google_task['id'] |
| 117 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) |
| 118 return (clovis_task, task_id) |
| 119 |
| 120 def _Finalize(self): |
| 121 """Called before exiting.""" |
| 122 self._logger.info('Uploading trace database') |
| 123 self._google_storage_accessor.UploadString( |
| 124 json.dumps(self._trace_database.ToJsonDict(), indent=2), |
| 125 self._traces_dir + 'trace_database.json') |
| 126 # TODO(droger): Implement automatic instance destruction. |
| 127 self._logger.info('Done') |
| 128 |
| 65 | 129 |
| 66 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, | 130 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, |
| 67 log_filename): | 131 log_filename): |
| 68 """ Generates a trace on _thread. | 132 """ Generates a trace. |
| 69 | 133 |
| 70 Args: | 134 Args: |
| 71 url: URL as a string. | 135 url: URL as a string. |
| 72 emulate_device: Name of the device to emulate. Empty for no emulation. | 136 emulate_device: Name of the device to emulate. Empty for no emulation. |
| 73 emulate_network: Type of network emulation. Empty for no emulation. | 137 emulate_network: Type of network emulation. Empty for no emulation. |
| 74 filename: Name of the file where the trace is saved. | 138 filename: Name of the file where the trace is saved. |
| 75 log_filename: Name of the file where standard output and errors are logged | 139 log_filename: Name of the file where standard output and errors are |
| 140 logged. |
| 76 | 141 |
| 77 Returns: | 142 Returns: |
| 78 A dictionary of metadata about the trace, including a 'succeeded' field | 143 A dictionary of metadata about the trace, including a 'succeeded' field |
| 79 indicating whether the trace was successfully generated. | 144 indicating whether the trace was successfully generated. |
| 80 """ | 145 """ |
| 81 try: | 146 try: |
| 82 os.remove(filename) # Remove any existing trace for this URL. | 147 os.remove(filename) # Remove any existing trace for this URL. |
| 83 except OSError: | 148 except OSError: |
| 84 pass # Nothing to remove. | 149 pass # Nothing to remove. |
| 85 | 150 |
| (...skipping 30 matching lines...) Expand all Loading... |
| 116 | 181 |
| 117 if trace: | 182 if trace: |
| 118 with open(filename, 'w') as f: | 183 with open(filename, 'w') as f: |
| 119 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) | 184 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) |
| 120 | 185 |
| 121 sys.stdout = old_stdout | 186 sys.stdout = old_stdout |
| 122 sys.stderr = old_stderr | 187 sys.stderr = old_stderr |
| 123 | 188 |
| 124 return trace_metadata | 189 return trace_metadata |
| 125 | 190 |
| 126 def _GetCurrentTaskCount(self): | 191 def _ProcessClovisTask(self, clovis_task): |
| 127 """Returns the number of remaining tasks. Thread safe.""" | 192 """Processes one clovis_task.""" |
| 128 self._tasks_lock.acquire() | 193 if clovis_task.Action() != 'trace': |
| 129 task_count = len(self._tasks) | 194 self._logger.error('Unsupported task action: %s' % clovis_task.Action()) |
| 130 self._tasks_lock.release() | 195 return |
| 131 return task_count | |
| 132 | 196 |
| 133 def _ProcessTasks(self, tasks, repeat_count, emulate_device, emulate_network): | 197 # Extract the task parameters. |
| 134 """Iterates over _task, generating a trace for each of them. Uploads the | 198 params = clovis_task.Params() |
| 135 resulting traces to Google Cloud Storage. Runs on _thread. | 199 urls = params['urls'] |
| 200 repeat_count = params.get('repeat_count', 1) |
| 201 emulate_device = params.get('emulate_device') |
| 202 emulate_network = params.get('emulate_network') |
| 136 | 203 |
| 137 Args: | |
| 138 tasks: The list of URLs to process. | |
| 139 repeat_count: The number of traces generated for each URL. | |
| 140 emulate_device: Name of the device to emulate. Empty for no emulation. | |
| 141 emulate_network: Type of network emulation. Empty for no emulation. | |
| 142 """ | |
| 143 # The main thread might be reading the task lists, take the lock to modify. | |
| 144 self._tasks_lock.acquire() | |
| 145 self._tasks = tasks | |
| 146 self._failed_tasks = [] | |
| 147 self._tasks_lock.release() | |
| 148 failures_dir = self._base_path_in_bucket + 'failures/' | 204 failures_dir = self._base_path_in_bucket + 'failures/' |
| 149 traces_dir = self._base_path_in_bucket + 'traces/' | |
| 150 | |
| 151 trace_database = LoadingTraceDatabase({}) | |
| 152 | |
| 153 # TODO(blundell): Fix this up. | 205 # TODO(blundell): Fix this up. |
| 154 logs_dir = self._base_path_in_bucket + 'analyze_logs/' | 206 logs_dir = self._base_path_in_bucket + 'analyze_logs/' |
| 155 log_filename = 'analyze.log' | 207 log_filename = 'analyze.log' |
| 156 # Avoid special characters in storage object names | 208 # Avoid special characters in storage object names |
| 157 pattern = re.compile(r"[#\?\[\]\*/]") | 209 pattern = re.compile(r"[#\?\[\]\*/]") |
| 158 while len(self._tasks) > 0: | 210 |
| 159 url = self._tasks[-1] | 211 while len(urls) > 0: |
| 212 url = urls.pop() |
| 160 local_filename = pattern.sub('_', url) | 213 local_filename = pattern.sub('_', url) |
| 161 for repeat in range(repeat_count): | 214 for repeat in range(repeat_count): |
| 162 print 'Generating trace for URL: %s' % url | 215 self._logger.debug('Generating trace for URL: %s' % url) |
| 163 remote_filename = local_filename + '/' + str(repeat) | 216 remote_filename = local_filename + '/' + str(repeat) |
| 164 trace_metadata = self._GenerateTrace( | 217 trace_metadata = self._GenerateTrace( |
| 165 url, emulate_device, emulate_network, local_filename, log_filename) | 218 url, emulate_device, emulate_network, local_filename, log_filename) |
| 166 if trace_metadata['succeeded']: | 219 if trace_metadata['succeeded']: |
| 167 print 'Uploading: %s' % remote_filename | 220 self._logger.debug('Uploading: %s' % remote_filename) |
| 168 remote_trace_location = traces_dir + remote_filename | 221 remote_trace_location = self._traces_dir + remote_filename |
| 169 self._google_storage_accessor.UploadFile(local_filename, | 222 self._google_storage_accessor.UploadFile(local_filename, |
| 170 remote_trace_location) | 223 remote_trace_location) |
| 171 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + | 224 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + |
| 172 remote_trace_location) | 225 remote_trace_location) |
| 173 trace_database.AddTrace(full_cloud_storage_path, trace_metadata) | 226 self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata) |
| 174 else: | 227 else: |
| 175 print 'Trace generation failed for URL: %s' % url | 228 self._logger.warning('Trace generation failed for URL: %s' % url) |
| 176 self._tasks_lock.acquire() | 229 # TODO: upload the failure |
| 177 self._failed_tasks.append({ "url": url, "repeat": repeat}) | |
| 178 self._tasks_lock.release() | |
| 179 if os.path.isfile(local_filename): | 230 if os.path.isfile(local_filename): |
| 180 self._google_storage_accessor.UploadFile(local_filename, | 231 self._google_storage_accessor.UploadFile(local_filename, |
| 181 failures_dir + remote_filename) | 232 failures_dir + remote_filename) |
| 182 print 'Uploading log' | 233 self._logger.debug('Uploading log') |
| 183 self._google_storage_accessor.UploadFile(log_filename, | 234 self._google_storage_accessor.UploadFile(log_filename, |
| 184 logs_dir + remote_filename) | 235 logs_dir + remote_filename) |
| 185 # Pop once task is finished, for accurate status tracking. | |
| 186 self._tasks_lock.acquire() | |
| 187 url = self._tasks.pop() | |
| 188 self._tasks_lock.release() | |
| 189 | |
| 190 self._google_storage_accessor.UploadString( | |
| 191 json.dumps(trace_database.ToJsonDict(), indent=2), | |
| 192 traces_dir + 'trace_database.json') | |
| 193 | |
| 194 if len(self._failed_tasks) > 0: | |
| 195 print 'Uploading failing URLs' | |
| 196 self._google_storage_accessor.UploadString( | |
| 197 json.dumps(self._failed_tasks, indent=2), | |
| 198 failures_dir + 'failures.json') | |
| 199 | |
| 200 def _SetTaskList(self, http_body): | |
| 201 """Sets the list of tasks and starts processing them | |
| 202 | |
| 203 Args: | |
| 204 http_body: JSON dictionary. See README.md for a description of the format. | |
| 205 | |
| 206 Returns: | |
| 207 A string to be sent back to the client, describing the success status of | |
| 208 the request. | |
| 209 """ | |
| 210 if self._IsProcessingTasks(): | |
| 211 return 'Error: Already running\n' | |
| 212 | |
| 213 load_parameters = json.loads(http_body) | |
| 214 try: | |
| 215 tasks = load_parameters['urls'] | |
| 216 except KeyError: | |
| 217 return 'Error: invalid urls\n' | |
| 218 # Optional parameters. | |
| 219 try: | |
| 220 repeat_count = int(load_parameters.get('repeat_count', '1')) | |
| 221 except ValueError: | |
| 222 return 'Error: invalid repeat_count\n' | |
| 223 emulate_device = load_parameters.get('emulate_device', '') | |
| 224 emulate_network = load_parameters.get('emulate_network', '') | |
| 225 | |
| 226 if len(tasks) == 0: | |
| 227 return 'Error: Empty task list\n' | |
| 228 else: | |
| 229 self._initial_task_count = len(tasks) | |
| 230 self._start_time = time.time() | |
| 231 self._thread = threading.Thread( | |
| 232 target = self._ProcessTasks, | |
| 233 args = (tasks, repeat_count, emulate_device, emulate_network)) | |
| 234 self._thread.start() | |
| 235 return 'Starting generation of %s tasks\n' % str(self._initial_task_count) | |
| 236 | |
| 237 def __call__(self, environ, start_response): | |
| 238 path = environ['PATH_INFO'] | |
| 239 | |
| 240 if path == '/set_tasks': | |
| 241 # Get the tasks from the HTTP body. | |
| 242 try: | |
| 243 body_size = int(environ.get('CONTENT_LENGTH', 0)) | |
| 244 except (ValueError): | |
| 245 body_size = 0 | |
| 246 body = environ['wsgi.input'].read(body_size) | |
| 247 data = self._SetTaskList(body) | |
| 248 elif path == '/test': | |
| 249 data = 'hello\n' | |
| 250 elif path == '/status': | |
| 251 if not self._IsProcessingTasks(): | |
| 252 data = 'Idle\n' | |
| 253 else: | |
| 254 task_count = self._GetCurrentTaskCount() | |
| 255 if task_count == 0: | |
| 256 data = '%s tasks complete. Finalizing.\n' % self._initial_task_count | |
| 257 else: | |
| 258 data = 'Remaining tasks: %s / %s\n' % ( | |
| 259 task_count, self._initial_task_count) | |
| 260 elapsed = time.time() - self._start_time | |
| 261 data += 'Elapsed time: %s seconds\n' % str(elapsed) | |
| 262 self._tasks_lock.acquire() | |
| 263 failed_tasks = self._failed_tasks | |
| 264 self._tasks_lock.release() | |
| 265 data += '%s failed tasks:\n' % len(failed_tasks) | |
| 266 data += json.dumps(failed_tasks, indent=2) | |
| 267 else: | |
| 268 start_response('404 NOT FOUND', [('Content-Length', '0')]) | |
| 269 return iter(['']) | |
| 270 | |
| 271 response_headers = [ | |
| 272 ('Content-type','text/plain'), | |
| 273 ('Content-Length', str(len(data))) | |
| 274 ] | |
| 275 start_response('200 OK', response_headers) | |
| 276 return iter([data]) | |
| 277 | 236 |
| 278 | 237 |
| 279 def StartApp(configuration_file): | 238 if __name__ == '__main__': |
| 280 return ServerApp(configuration_file) | 239 parser = argparse.ArgumentParser( |
| 240 description='ComputeEngine Worker for Clovis') |
| 241 parser.add_argument('--config', required=True, |
| 242 help='Path to the configuration file.') |
| 243 args = parser.parse_args() |
| 244 |
| 245 # Configure logging. |
| 246 logging.basicConfig(level=logging.WARNING) |
| 247 worker_logger = logging.getLogger('worker') |
| 248 worker_logger.setLevel(logging.INFO) |
| 249 |
| 250 worker_logger.info('Reading configuration') |
| 251 with open(args.config) as config_json: |
| 252 worker = Worker(json.load(config_json), worker_logger) |
| 253 worker.Start() |
| 254 |
| OLD | NEW |