Index: tools/android/loading/cloud/backend/worker.py
diff --git a/tools/android/loading/gce/main.py b/tools/android/loading/cloud/backend/worker.py
similarity index 38%
rename from tools/android/loading/gce/main.py
rename to tools/android/loading/cloud/backend/worker.py
index 0ef2caece10f9050009a609a37f9fc15eead2c28..b9900cec4748382f753c9a8628e0b202ca865a4d 100644
--- a/tools/android/loading/gce/main.py
+++ b/tools/android/loading/cloud/backend/worker.py
@@ -2,77 +2,142 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+import argparse
 import json
+import logging
 import os
 import re
-import threading
-import time
-import subprocess
 import sys
+import time
+
+from googleapiclient import discovery
+from oauth2client.client import GoogleCredentials
 
 # NOTE: The parent directory needs to be first in sys.path to avoid conflicts
 # with catapult modules that have colliding names, as catapult inserts itself
 # into the path as the second element. This is an ugly and fragile hack.
 sys.path.insert(0,
-    os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
+    os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir,
+                 os.pardir))
 
 import controller
+from cloud.common.clovis_task import ClovisTask
 from google_storage_accessor import GoogleStorageAccessor
 import loading_trace
from loading_trace_database import LoadingTraceDatabase
 import options
 
 
-class ServerApp(object):
-  """Simple web server application, collecting traces and writing them in
-  Google Cloud Storage.
-  """
-
-  def __init__(self, configuration_file):
-    """|configuration_file| is a path to a file containing JSON as described in
-    README.md.
-    """
-    self._tasks = []  # List of remaining tasks, only modified by _thread.
-    self._failed_tasks = []  # Failed tasks, only modified by _thread.
-    self._thread = None
-    self._tasks_lock = threading.Lock()  # Protects _tasks and _failed_tasks.
-    self._initial_task_count = -1
-    self._start_time = None
-    print 'Reading configuration'
-    with open(configuration_file) as config_json:
-      config = json.load(config_json)
-
-    # Separate the cloud storage path into the bucket and the base path under
-    # the bucket.
-    storage_path_components = config['cloud_storage_path'].split('/')
-    self._bucket_name = storage_path_components[0]
-    self._base_path_in_bucket = ''
-    if len(storage_path_components) > 1:
-      self._base_path_in_bucket = '/'.join(storage_path_components[1:])
-      if not self._base_path_in_bucket.endswith('/'):
-        self._base_path_in_bucket += '/'
-
-    self._src_path = config['src_path']
-    self._google_storage_accessor = GoogleStorageAccessor(
-        project_name=config['project_name'], bucket_name=self._bucket_name)
+class Worker(object):
+  def __init__(self, config, logger):
+    """See README.md for the config format."""
+    self._project_name = config['project_name']
+    self._taskqueue_tag = config['taskqueue_tag']
+    self._credentials = GoogleCredentials.get_application_default()
+    self._logger = logger
+
+    # Separate the cloud storage path into the bucket and the base path under
+    # the bucket.
+    storage_path_components = config['cloud_storage_path'].split('/')
+    self._bucket_name = storage_path_components[0]
+    self._base_path_in_bucket = ''
+    if len(storage_path_components) > 1:
+      self._base_path_in_bucket = '/'.join(storage_path_components[1:])
+      if not self._base_path_in_bucket.endswith('/'):
+        self._base_path_in_bucket += '/'
+
+    # TODO: improve the trace database to support concurrent access.
+    self._traces_dir = self._base_path_in_bucket + 'traces/'
+    self._trace_database = LoadingTraceDatabase({})
+
+    self._src_path = config['src_path']
+    self._google_storage_accessor = GoogleStorageAccessor(
+        credentials=self._credentials, project_name=self._project_name,
+        bucket_name=self._bucket_name)
 
     # Initialize the global options that will be used during trace generation.
     options.OPTIONS.ParseArgs([])
     options.OPTIONS.local_binary = config['chrome_path']
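+    # For illustration only: a config carrying the keys read above might look
+    # like the following (values are made-up examples; README.md is the
+    # authoritative reference for this format):
+    #   {
+    #     "project_name": "my-project",
+    #     "taskqueue_tag": "my-tag",
+    #     "cloud_storage_path": "my-bucket/base/path",
+    #     "src_path": "/path/to/chromium/src",
+    #     "chrome_path": "/path/to/chrome"
+    #   }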
 
-  def _IsProcessingTasks(self):
-    """Returns True if the application is currently processing tasks."""
-    return self._thread is not None and self._thread.is_alive()
+  def Start(self):
+    """Main worker loop.
+
+    Repeatedly pulls tasks from the task queue and processes them. Returns
+    once the queue is empty, after at least one task has been processed.
+    """
+    task_api = discovery.build('taskqueue', 'v1beta2',
+                               credentials=self._credentials)
+    queue_name = 'clovis-queue'
+    # Workaround for
+    # https://code.google.com/p/googleappengine/issues/detail?id=10199
+    project = 's~' + self._project_name
+
+    while True:
+      self._logger.debug('Fetching new task.')
+      (clovis_task, task_id) = self._FetchClovisTask(project, task_api,
+                                                     queue_name)
+      if not clovis_task:
+        if self._trace_database.ToJsonDict():
+          self._logger.info('No remaining tasks in the queue.')
+          break
+        else:
+          delay_seconds = 60
+          self._logger.info(
+              'Nothing in the queue, retrying in %i seconds.' % delay_seconds)
+          time.sleep(delay_seconds)
+          continue
+
+      self._logger.info('Processing task %s' % task_id)
+      self._ProcessClovisTask(clovis_task)
+      self._logger.debug('Deleting task %s' % task_id)
+      task_api.tasks().delete(project=project, taskqueue=queue_name,
+                              task=task_id).execute()
+      self._logger.info('Finished task %s' % task_id)
+    self._Finalize()
+
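+  # Minimal usage sketch, mirroring the __main__ block at the bottom of this
+  # file (assumes a config dict in the README.md format):
+  #   worker = Worker(config, logging.getLogger('worker'))
+  #   worker.Start()  # Blocks until the queue is drained.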
+  def _FetchClovisTask(self, project_name, task_api, queue_name):
+    """Fetches a ClovisTask from the task queue.
+
+    Args:
+      project_name(str): The name of the Google Cloud project.
+      task_api: The TaskQueue service.
+      queue_name(str): The name of the task queue.
+
+    Returns:
+      (ClovisTask, str): The fetched ClovisTask and its task ID, or
+          (None, None) if no tasks are found.
+    """
+    response = task_api.tasks().lease(
+        project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180,
+        groupByTag=True, tag=self._taskqueue_tag).execute()
+    if not response.get('items'):
+      return (None, None)
+
+    google_task = response['items'][0]
+    task_id = google_task['id']
+    clovis_task = ClovisTask.FromBase64(google_task['payloadBase64'])
+    return (clovis_task, task_id)
+
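+  # For reference, a successful lease response carries the fields read above,
+  # roughly shaped as follows (values are illustrative, not real API output):
+  #   {'items': [{'id': '42', 'payloadBase64': '<base64-encoded ClovisTask>'}]}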
+  def _Finalize(self):
+    """Called before exiting."""
+    self._logger.info('Uploading trace database')
+    self._google_storage_accessor.UploadString(
+        json.dumps(self._trace_database.ToJsonDict(), indent=2),
+        self._traces_dir + 'trace_database.json')
+    # TODO(droger): Implement automatic instance destruction.
+    self._logger.info('Done')
+
   def _GenerateTrace(self, url, emulate_device, emulate_network, filename,
                      log_filename):
-    """ Generates a trace on _thread.
+    """Generates a trace.
 
     Args:
       url: URL as a string.
       emulate_device: Name of the device to emulate. Empty for no emulation.
       emulate_network: Type of network emulation. Empty for no emulation.
       filename: Name of the file where the trace is saved.
-      log_filename: Name of the file where standard output and errors are logged
+      log_filename: Name of the file where standard output and errors are
+        logged.
 
     Returns:
       A dictionary of metadata about the trace, including a 'succeeded' field
@@ -123,158 +188,67 @@ class ServerApp(object):
     return trace_metadata
 
-  def _GetCurrentTaskCount(self):
-    """Returns the number of remaining tasks. Thread safe."""
-    self._tasks_lock.acquire()
-    task_count = len(self._tasks)
-    self._tasks_lock.release()
-    return task_count
+  def _ProcessClovisTask(self, clovis_task):
+    """Processes one clovis_task."""
+    if clovis_task.Action() != 'trace':
+      self._logger.error('Unsupported task action: %s' % clovis_task.Action())
+      return
 
-  def _ProcessTasks(self, tasks, repeat_count, emulate_device, emulate_network):
-    """Iterates over _task, generating a trace for each of them. Uploads the
-    resulting traces to Google Cloud Storage. Runs on _thread.
+    # Extract the task parameters.
+    params = clovis_task.Params()
+    urls = params['urls']
+    repeat_count = params.get('repeat_count', 1)
+    emulate_device = params.get('emulate_device')
+    emulate_network = params.get('emulate_network')
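+    # Illustrative params payload (the keys are the ones read above; the
+    # values, including the device name, are hypothetical):
+    #   {'urls': ['http://example.com'], 'repeat_count': 2,
+    #    'emulate_device': 'Nexus 4', 'emulate_network': ''}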
-    Args:
-      tasks: The list of URLs to process.
-      repeat_count: The number of traces generated for each URL.
-      emulate_device: Name of the device to emulate. Empty for no emulation.
-      emulate_network: Type of network emulation. Empty for no emulation.
-    """
-    # The main thread might be reading the task lists, take the lock to modify.
-    self._tasks_lock.acquire()
-    self._tasks = tasks
-    self._failed_tasks = []
-    self._tasks_lock.release()
     failures_dir = self._base_path_in_bucket + 'failures/'
-    traces_dir = self._base_path_in_bucket + 'traces/'
-
-    trace_database = LoadingTraceDatabase({})
-
     # TODO(blundell): Fix this up.
     logs_dir = self._base_path_in_bucket + 'analyze_logs/'
     log_filename = 'analyze.log'
     # Avoid special characters in storage object names
     pattern = re.compile(r"[#\?\[\]\*/]")
-    while len(self._tasks) > 0:
-      url = self._tasks[-1]
+
+    while len(urls) > 0:
+      url = urls.pop()
       local_filename = pattern.sub('_', url)
       for repeat in range(repeat_count):
-        print 'Generating trace for URL: %s' % url
+        self._logger.debug('Generating trace for URL: %s' % url)
         remote_filename = local_filename + '/' + str(repeat)
         trace_metadata = self._GenerateTrace(
             url, emulate_device, emulate_network, local_filename, log_filename)
         if trace_metadata['succeeded']:
-          print 'Uploading: %s' % remote_filename
-          remote_trace_location = traces_dir + remote_filename
+          self._logger.debug('Uploading: %s' % remote_filename)
+          remote_trace_location = self._traces_dir + remote_filename
           self._google_storage_accessor.UploadFile(local_filename,
-                                                     remote_trace_location)
+                                                    remote_trace_location)
           full_cloud_storage_path = ('gs://' + self._bucket_name + '/' +
                                      remote_trace_location)
-          trace_database.AddTrace(full_cloud_storage_path, trace_metadata)
+          self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata)
         else:
-          print 'Trace generation failed for URL: %s' % url
-          self._tasks_lock.acquire()
-          self._failed_tasks.append({ "url": url, "repeat": repeat})
-          self._tasks_lock.release()
+          self._logger.warning('Trace generation failed for URL: %s' % url)
+          # TODO: upload the failure
          if os.path.isfile(local_filename):
            self._google_storage_accessor.UploadFile(local_filename,
                                                     failures_dir + remote_filename)
-        print 'Uploading log'
+        self._logger.debug('Uploading log')
         self._google_storage_accessor.UploadFile(log_filename,
-                                                  logs_dir + remote_filename)
-      # Pop once task is finished, for accurate status tracking.
-      self._tasks_lock.acquire()
-      url = self._tasks.pop()
-      self._tasks_lock.release()
-
-    self._google_storage_accessor.UploadString(
-        json.dumps(trace_database.ToJsonDict(), indent=2),
-        traces_dir + 'trace_database.json')
+                                                 logs_dir + remote_filename)
 
-    if len(self._failed_tasks) > 0:
-      print 'Uploading failing URLs'
-      self._google_storage_accessor.UploadString(
-          json.dumps(self._failed_tasks, indent=2),
-          failures_dir + 'failures.json')
 
-  def _SetTaskList(self, http_body):
-    """Sets the list of tasks and starts processing them
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(
+      description='Compute Engine Worker for Clovis')
+  parser.add_argument('--config', required=True,
+                      help='Path to the configuration file.')
+  args = parser.parse_args()
 
-    Args:
-      http_body: JSON dictionary. See README.md for a description of the format.
+  # Configure logging.
+  logging.basicConfig(level=logging.WARNING)
+  worker_logger = logging.getLogger('worker')
+  worker_logger.setLevel(logging.INFO)
 
-    Returns:
-      A string to be sent back to the client, describing the success status of
-      the request.
-    """
-    if self._IsProcessingTasks():
-      return 'Error: Already running\n'
+  worker_logger.info('Reading configuration')
+  with open(args.config) as config_json:
+    worker = Worker(json.load(config_json), worker_logger)
+    worker.Start()
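+  # Example invocation (the configuration path below is illustrative):
+  #   python worker.py --config /path/to/config.json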
-    load_parameters = json.loads(http_body)
-    try:
-      tasks = load_parameters['urls']
-    except KeyError:
-      return 'Error: invalid urls\n'
-    # Optional parameters.
-    try:
-      repeat_count = int(load_parameters.get('repeat_count', '1'))
-    except ValueError:
-      return 'Error: invalid repeat_count\n'
-    emulate_device = load_parameters.get('emulate_device', '')
-    emulate_network = load_parameters.get('emulate_network', '')
-
-    if len(tasks) == 0:
-      return 'Error: Empty task list\n'
-    else:
-      self._initial_task_count = len(tasks)
-      self._start_time = time.time()
-      self._thread = threading.Thread(
-          target = self._ProcessTasks,
-          args = (tasks, repeat_count, emulate_device, emulate_network))
-      self._thread.start()
-      return 'Starting generation of %s tasks\n' % str(self._initial_task_count)
-
-  def __call__(self, environ, start_response):
-    path = environ['PATH_INFO']
-
-    if path == '/set_tasks':
-      # Get the tasks from the HTTP body.
-      try:
-        body_size = int(environ.get('CONTENT_LENGTH', 0))
-      except (ValueError):
-        body_size = 0
-      body = environ['wsgi.input'].read(body_size)
-      data = self._SetTaskList(body)
-    elif path == '/test':
-      data = 'hello\n'
-    elif path == '/status':
-      if not self._IsProcessingTasks():
-        data = 'Idle\n'
-      else:
-        task_count = self._GetCurrentTaskCount()
-        if task_count == 0:
-          data = '%s tasks complete. Finalizing.\n' % self._initial_task_count
-        else:
-          data = 'Remaining tasks: %s / %s\n' % (
-              task_count, self._initial_task_count)
-        elapsed = time.time() - self._start_time
-        data += 'Elapsed time: %s seconds\n' % str(elapsed)
-        self._tasks_lock.acquire()
-        failed_tasks = self._failed_tasks
-        self._tasks_lock.release()
-        data += '%s failed tasks:\n' % len(failed_tasks)
-        data += json.dumps(failed_tasks, indent=2)
-    else:
-      start_response('404 NOT FOUND', [('Content-Length', '0')])
-      return iter([''])
-
-    response_headers = [
-        ('Content-type','text/plain'),
-        ('Content-Length', str(len(data)))
-    ]
-    start_response('200 OK', response_headers)
-    return iter([data])
-
-
-def StartApp(configuration_file):
-  return ServerApp(configuration_file)