Chromium Code Reviews| Index: tools/android/loading/gce/worker.py |
| diff --git a/tools/android/loading/gce/main.py b/tools/android/loading/gce/worker.py |
| similarity index 40% |
| rename from tools/android/loading/gce/main.py |
| rename to tools/android/loading/gce/worker.py |
| index 0ef2caece10f9050009a609a37f9fc15eead2c28..b9a0b88f2cbbf4dfd44f83da42a423fedcfd0f30 100644 |
| --- a/tools/android/loading/gce/main.py |
| +++ b/tools/android/loading/gce/worker.py |
| @@ -2,13 +2,15 @@ |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| +import argparse |
| import json |
| import os |
| import re |
| -import threading |
| -import time |
| -import subprocess |
| import sys |
| +import time |
| + |
| +from googleapiclient import discovery |
| +from oauth2client.client import GoogleCredentials |
| # NOTE: The parent directory needs to be first in sys.path to avoid conflicts |
| # with catapult modules that have colliding names, as catapult inserts itself |
| @@ -16,56 +18,99 @@ import sys |
| sys.path.insert(0, |
| os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)) |
| import controller |
| +from frontend.clovis_task import ClovisTask |
|
blundell
2016/04/19 11:31:22
I think that rather than have the backend access t
droger
2016/04/19 12:10:51
Unfortunately this is not possible. Any file neede
|
| from google_storage_accessor import GoogleStorageAccessor |
| import loading_trace |
| from loading_trace_database import LoadingTraceDatabase |
| import options |
| -class ServerApp(object): |
| - """Simple web server application, collecting traces and writing them in |
| - Google Cloud Storage. |
| - """ |
| +class Worker(object): |
| + def __init__(self, config): |
| + """See README.md for the config format.""" |
|
blundell
2016/04/19 11:31:22
nit: You're missing periods at the end of many of
droger
2016/04/19 12:10:51
Done, in the other CL as well.
|
| + self._credentials = GoogleCredentials.get_application_default() |
| - def __init__(self, configuration_file): |
| - """|configuration_file| is a path to a file containing JSON as described in |
| - README.md. |
| - """ |
| - self._tasks = [] # List of remaining tasks, only modified by _thread. |
| - self._failed_tasks = [] # Failed tasks, only modified by _thread. |
| - self._thread = None |
| - self._tasks_lock = threading.Lock() # Protects _tasks and _failed_tasks. |
| - self._initial_task_count = -1 |
| - self._start_time = None |
| - print 'Reading configuration' |
| - with open(configuration_file) as config_json: |
| - config = json.load(config_json) |
| - |
| - # Separate the cloud storage path into the bucket and the base path under |
| - # the bucket. |
| - storage_path_components = config['cloud_storage_path'].split('/') |
| - self._bucket_name = storage_path_components[0] |
| - self._base_path_in_bucket = '' |
| - if len(storage_path_components) > 1: |
| - self._base_path_in_bucket = '/'.join(storage_path_components[1:]) |
| - if not self._base_path_in_bucket.endswith('/'): |
| - self._base_path_in_bucket += '/' |
| - |
| - self._src_path = config['src_path'] |
| - self._google_storage_accessor = GoogleStorageAccessor( |
| - project_name=config['project_name'], bucket_name=self._bucket_name) |
| + # Separate the cloud storage path into the bucket and the base path under |
|
blundell
2016/04/19 11:31:22
Please move lines 33-36 to be directly above line
droger
2016/04/19 12:10:51
Done.
|
| + # the bucket. |
| + storage_path_components = config['cloud_storage_path'].split('/') |
| + self._bucket_name = storage_path_components[0] |
| + self._project_name = config['project_name'] |
| + self._taskqueue_tag = config['taskqueue_tag'] |
| + |
| + self._base_path_in_bucket = '' |
| + if len(storage_path_components) > 1: |
| + self._base_path_in_bucket = '/'.join(storage_path_components[1:]) |
| + if not self._base_path_in_bucket.endswith('/'): |
| + self._base_path_in_bucket += '/' |
| + |
| + # TODO: improve the trace database to support concurrent access. |
| + self._traces_dir = self._base_path_in_bucket + 'traces/' |
| + self._trace_database = LoadingTraceDatabase({}) |
| + |
| + self._src_path = config['src_path'] |
| + self._google_storage_accessor = GoogleStorageAccessor( |
| + credentials=self._credentials, project_name=self._project_name, |
| + bucket_name=self._bucket_name) |
| # Initialize the global options that will be used during trace generation. |
| options.OPTIONS.ParseArgs([]) |
| options.OPTIONS.local_binary = config['chrome_path'] |
| - def _IsProcessingTasks(self): |
| - """Returns True if the application is currently processing tasks.""" |
| - return self._thread is not None and self._thread.is_alive() |
| + def Start(self): |
| + """Main worker loop. |
| + |
| + Repeatedly pulls tasks from the task queue and processes them. Returns when |
| + the queue is empty. |
| + """ |
| + task_api = discovery.build('taskqueue', 'v1beta2', |
| + credentials=self._credentials) |
| + queue_name = 'clovis-queue' |
| + # Workaround for |
| + # https://code.google.com/p/googleappengine/issues/detail?id=10199 |
| + project = 's~' + self._project_name |
| + |
| + finished = False |
| + started = False |
| + while not finished: |
| + response = task_api.tasks().lease( |
|
blundell
2016/04/19 11:31:22
Could we put most of the code from lines 75 throug
droger
2016/04/19 12:10:51
Done.
|
| + project=project, taskqueue=queue_name, numTasks=1, leaseSecs=180, |
| + groupByTag=True, tag=self._taskqueue_tag).execute() |
| + if (not response.get('items')) or (len(response['items']) < 1): |
| + if started: |
| + finished = True |
| + else: |
| + # Do not stop before processing at least one task. |
| + delay_seconds = 30 |
| + print 'No tasks, retrying in %s seconds' % delay_seconds |
| + time.sleep(delay_seconds) |
| + continue |
| + |
| + started = True |
| + google_task = response['items'][0] |
| + task_id = google_task['id'] |
| + print 'Processing task %s' % task_id |
| + |
| + clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) |
| + self._ProcessClovisTask(clovis_task) |
| + |
| + task_api.tasks().delete(project=project, taskqueue=queue_name, |
| + task=task_id).execute() |
| + print 'Finished task %s' % task_id |
| + self._Finalize() |
| + |
| + def _Finalize(self): |
| + """Called before exiting.""" |
| + print 'Uploading trace database' |
| + self._google_storage_accessor.UploadString( |
| + json.dumps(self._trace_database.ToJsonDict(), indent=2), |
| + self._traces_dir + 'trace_database.json') |
| + # TODO(droger): Implement automatic instance destruction. |
| + print 'Done' |
| + |
| def _GenerateTrace(self, url, emulate_device, emulate_network, filename, |
| log_filename): |
| - """ Generates a trace on _thread. |
| + """ Generates a trace. |
| Args: |
| url: URL as a string. |
| @@ -123,40 +168,28 @@ class ServerApp(object): |
| return trace_metadata |
| - def _GetCurrentTaskCount(self): |
| - """Returns the number of remaining tasks. Thread safe.""" |
| - self._tasks_lock.acquire() |
| - task_count = len(self._tasks) |
| - self._tasks_lock.release() |
| - return task_count |
| + def _ProcessClovisTask(self, clovis_task): |
| + """Processes one clovis_task.""" |
| + if clovis_task.Action() != 'trace': |
| + print 'Unsupported task action: %s' % clovis_task.Action() |
| + return |
| - def _ProcessTasks(self, tasks, repeat_count, emulate_device, emulate_network): |
| - """Iterates over _task, generating a trace for each of them. Uploads the |
| - resulting traces to Google Cloud Storage. Runs on _thread. |
| + # Extract the task parameters. |
| + params = clovis_task.Params() |
| + urls = params['urls'] |
| + repeat_count = params.get('repeat_count', 1) |
| + emulate_device = params.get('emulate_device') |
| + emulate_network = params.get('emulate_network') |
| - Args: |
| - tasks: The list of URLs to process. |
| - repeat_count: The number of traces generated for each URL. |
| - emulate_device: Name of the device to emulate. Empty for no emulation. |
| - emulate_network: Type of network emulation. Empty for no emulation. |
| - """ |
| - # The main thread might be reading the task lists, take the lock to modify. |
| - self._tasks_lock.acquire() |
| - self._tasks = tasks |
| - self._failed_tasks = [] |
| - self._tasks_lock.release() |
| failures_dir = self._base_path_in_bucket + 'failures/' |
| - traces_dir = self._base_path_in_bucket + 'traces/' |
| - |
| - trace_database = LoadingTraceDatabase({}) |
| - |
| # TODO(blundell): Fix this up. |
| logs_dir = self._base_path_in_bucket + 'analyze_logs/' |
| log_filename = 'analyze.log' |
| # Avoid special characters in storage object names |
| pattern = re.compile(r"[#\?\[\]\*/]") |
| - while len(self._tasks) > 0: |
| - url = self._tasks[-1] |
| + |
| + while len(urls) > 0: |
| + url = urls.pop() |
| local_filename = pattern.sub('_', url) |
| for repeat in range(repeat_count): |
| print 'Generating trace for URL: %s' % url |
| @@ -165,116 +198,31 @@ class ServerApp(object): |
| url, emulate_device, emulate_network, local_filename, log_filename) |
| if trace_metadata['succeeded']: |
| print 'Uploading: %s' % remote_filename |
| - remote_trace_location = traces_dir + remote_filename |
| + remote_trace_location = self._traces_dir + remote_filename |
| self._google_storage_accessor.UploadFile(local_filename, |
| - remote_trace_location) |
| + remote_trace_location) |
| full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + |
| remote_trace_location) |
| - trace_database.AddTrace(full_cloud_storage_path, trace_metadata) |
| + self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata) |
| else: |
| print 'Trace generation failed for URL: %s' % url |
| - self._tasks_lock.acquire() |
| - self._failed_tasks.append({ "url": url, "repeat": repeat}) |
| - self._tasks_lock.release() |
| + # TODO: upload the failure |
| if os.path.isfile(local_filename): |
| self._google_storage_accessor.UploadFile(local_filename, |
| failures_dir + remote_filename) |
| print 'Uploading log' |
| self._google_storage_accessor.UploadFile(log_filename, |
| - logs_dir + remote_filename) |
| - # Pop once task is finished, for accurate status tracking. |
| - self._tasks_lock.acquire() |
| - url = self._tasks.pop() |
| - self._tasks_lock.release() |
| + logs_dir + remote_filename) |
| - self._google_storage_accessor.UploadString( |
| - json.dumps(trace_database.ToJsonDict(), indent=2), |
| - traces_dir + 'trace_database.json') |
| - |
| - if len(self._failed_tasks) > 0: |
| - print 'Uploading failing URLs' |
| - self._google_storage_accessor.UploadString( |
| - json.dumps(self._failed_tasks, indent=2), |
| - failures_dir + 'failures.json') |
| - def _SetTaskList(self, http_body): |
| - """Sets the list of tasks and starts processing them |
| - |
| - Args: |
| - http_body: JSON dictionary. See README.md for a description of the format. |
| +if __name__ == '__main__': |
| + parser = argparse.ArgumentParser( |
| + description='ComputeEngine Worker for Clovis') |
| + parser.add_argument('--config', required=True, |
| + help='Path to the configuration file.') |
| + args = parser.parse_args() |
| + print 'Reading configuration' |
| + with open(args.config) as config_json: |
| + worker = Worker(json.load(config_json)) |
| + worker.Start() |
| - Returns: |
| - A string to be sent back to the client, describing the success status of |
| - the request. |
| - """ |
| - if self._IsProcessingTasks(): |
| - return 'Error: Already running\n' |
| - |
| - load_parameters = json.loads(http_body) |
| - try: |
| - tasks = load_parameters['urls'] |
| - except KeyError: |
| - return 'Error: invalid urls\n' |
| - # Optional parameters. |
| - try: |
| - repeat_count = int(load_parameters.get('repeat_count', '1')) |
| - except ValueError: |
| - return 'Error: invalid repeat_count\n' |
| - emulate_device = load_parameters.get('emulate_device', '') |
| - emulate_network = load_parameters.get('emulate_network', '') |
| - |
| - if len(tasks) == 0: |
| - return 'Error: Empty task list\n' |
| - else: |
| - self._initial_task_count = len(tasks) |
| - self._start_time = time.time() |
| - self._thread = threading.Thread( |
| - target = self._ProcessTasks, |
| - args = (tasks, repeat_count, emulate_device, emulate_network)) |
| - self._thread.start() |
| - return 'Starting generation of %s tasks\n' % str(self._initial_task_count) |
| - |
| - def __call__(self, environ, start_response): |
| - path = environ['PATH_INFO'] |
| - |
| - if path == '/set_tasks': |
| - # Get the tasks from the HTTP body. |
| - try: |
| - body_size = int(environ.get('CONTENT_LENGTH', 0)) |
| - except (ValueError): |
| - body_size = 0 |
| - body = environ['wsgi.input'].read(body_size) |
| - data = self._SetTaskList(body) |
| - elif path == '/test': |
| - data = 'hello\n' |
| - elif path == '/status': |
| - if not self._IsProcessingTasks(): |
| - data = 'Idle\n' |
| - else: |
| - task_count = self._GetCurrentTaskCount() |
| - if task_count == 0: |
| - data = '%s tasks complete. Finalizing.\n' % self._initial_task_count |
| - else: |
| - data = 'Remaining tasks: %s / %s\n' % ( |
| - task_count, self._initial_task_count) |
| - elapsed = time.time() - self._start_time |
| - data += 'Elapsed time: %s seconds\n' % str(elapsed) |
| - self._tasks_lock.acquire() |
| - failed_tasks = self._failed_tasks |
| - self._tasks_lock.release() |
| - data += '%s failed tasks:\n' % len(failed_tasks) |
| - data += json.dumps(failed_tasks, indent=2) |
| - else: |
| - start_response('404 NOT FOUND', [('Content-Length', '0')]) |
| - return iter(['']) |
| - |
| - response_headers = [ |
| - ('Content-type','text/plain'), |
| - ('Content-Length', str(len(data))) |
| - ] |
| - start_response('200 OK', response_headers) |
| - return iter([data]) |
| - |
| - |
| -def StartApp(configuration_file): |
| - return ServerApp(configuration_file) |