| Index: tools/android/loading/cloud/backend/worker.py
|
| diff --git a/tools/android/loading/gce/main.py b/tools/android/loading/cloud/backend/worker.py
|
| similarity index 38%
|
| rename from tools/android/loading/gce/main.py
|
| rename to tools/android/loading/cloud/backend/worker.py
|
| index 0ef2caece10f9050009a609a37f9fc15eead2c28..b9900cec4748382f753c9a8628e0b202ca865a4d 100644
|
| --- a/tools/android/loading/gce/main.py
|
| +++ b/tools/android/loading/cloud/backend/worker.py
|
| @@ -2,77 +2,142 @@
|
| # Use of this source code is governed by a BSD-style license that can be
|
| # found in the LICENSE file.
|
|
|
| +import argparse
|
| import json
|
| +import logging
|
| import os
|
| import re
|
| -import threading
|
| -import time
|
| -import subprocess
|
| import sys
|
| +import time
|
| +
|
| +from googleapiclient import discovery
|
| +from oauth2client.client import GoogleCredentials
|
|
|
| # NOTE: The parent directory needs to be first in sys.path to avoid conflicts
|
| # with catapult modules that have colliding names, as catapult inserts itself
|
| # into the path as the second element. This is an ugly and fragile hack.
|
| sys.path.insert(0,
|
| - os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
|
| + os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir,
|
| + os.pardir))
|
| import controller
|
| +from cloud.common.clovis_task import ClovisTask
|
| from google_storage_accessor import GoogleStorageAccessor
|
| import loading_trace
|
| from loading_trace_database import LoadingTraceDatabase
|
| import options
|
|
|
|
|
| -class ServerApp(object):
|
| - """Simple web server application, collecting traces and writing them in
|
| - Google Cloud Storage.
|
| - """
|
| -
|
| - def __init__(self, configuration_file):
|
| - """|configuration_file| is a path to a file containing JSON as described in
|
| - README.md.
|
| - """
|
| - self._tasks = [] # List of remaining tasks, only modified by _thread.
|
| - self._failed_tasks = [] # Failed tasks, only modified by _thread.
|
| - self._thread = None
|
| - self._tasks_lock = threading.Lock() # Protects _tasks and _failed_tasks.
|
| - self._initial_task_count = -1
|
| - self._start_time = None
|
| - print 'Reading configuration'
|
| - with open(configuration_file) as config_json:
|
| - config = json.load(config_json)
|
| -
|
| - # Separate the cloud storage path into the bucket and the base path under
|
| - # the bucket.
|
| - storage_path_components = config['cloud_storage_path'].split('/')
|
| - self._bucket_name = storage_path_components[0]
|
| - self._base_path_in_bucket = ''
|
| - if len(storage_path_components) > 1:
|
| - self._base_path_in_bucket = '/'.join(storage_path_components[1:])
|
| - if not self._base_path_in_bucket.endswith('/'):
|
| - self._base_path_in_bucket += '/'
|
| -
|
| - self._src_path = config['src_path']
|
| - self._google_storage_accessor = GoogleStorageAccessor(
|
| - project_name=config['project_name'], bucket_name=self._bucket_name)
|
| +class Worker(object):
|
| + def __init__(self, config, logger):
|
| + """See README.md for the config format."""
|
| + self._project_name = config['project_name']
|
| + self._taskqueue_tag = config['taskqueue_tag']
|
| + self._credentials = GoogleCredentials.get_application_default()
|
| + self._logger = logger
|
| +
|
| + # Separate the cloud storage path into the bucket and the base path under
|
| + # the bucket.
|
| + storage_path_components = config['cloud_storage_path'].split('/')
|
| + self._bucket_name = storage_path_components[0]
|
| + self._base_path_in_bucket = ''
|
| + if len(storage_path_components) > 1:
|
| + self._base_path_in_bucket = '/'.join(storage_path_components[1:])
|
| + if not self._base_path_in_bucket.endswith('/'):
|
| + self._base_path_in_bucket += '/'
|
| +
|
| + # TODO: improve the trace database to support concurrent access.
|
| + self._traces_dir = self._base_path_in_bucket + 'traces/'
|
| + self._trace_database = LoadingTraceDatabase({})
|
| +
|
| + self._src_path = config['src_path']
|
| + self._google_storage_accessor = GoogleStorageAccessor(
|
| + credentials=self._credentials, project_name=self._project_name,
|
| + bucket_name=self._bucket_name)
|
|
|
| # Initialize the global options that will be used during trace generation.
|
| options.OPTIONS.ParseArgs([])
|
| options.OPTIONS.local_binary = config['chrome_path']
|
|
|
| - def _IsProcessingTasks(self):
|
| - """Returns True if the application is currently processing tasks."""
|
| - return self._thread is not None and self._thread.is_alive()
|
| + def Start(self):
|
| + """Main worker loop.
|
| +
|
| + Repeatedly pulls tasks from the task queue and processes them. Returns when
|
| + the queue is empty.
|
| + """
|
| + task_api = discovery.build('taskqueue', 'v1beta2',
|
| + credentials=self._credentials)
|
| + queue_name = 'clovis-queue'
|
| + # Workaround for
|
| + # https://code.google.com/p/googleappengine/issues/detail?id=10199
|
| + project = 's~' + self._project_name
|
| +
|
| + while True:
|
| + self._logger.debug('Fetching new task.')
|
| + (clovis_task, task_id) = self._FetchClovisTask(project, task_api,
|
| + queue_name)
|
| + if not clovis_task:
|
| + if self._trace_database.ToJsonDict():
|
| + self._logger.info('No remaining tasks in the queue.')
|
| + break
|
| + else:
|
| + delay_seconds = 60
|
| + self._logger.info(
|
| + 'Nothing in the queue, retrying in %i seconds.' % delay_seconds)
|
| + time.sleep(delay_seconds)
|
| + continue
|
| +
|
| + self._logger.info('Processing task %s' % task_id)
|
| + self._ProcessClovisTask(clovis_task)
|
| + self._logger.debug('Deleting task %s' % task_id)
|
| + task_api.tasks().delete(project=project, taskqueue=queue_name,
|
| + task=task_id).execute()
|
| + self._logger.info('Finished task %s' % task_id)
|
| + self._Finalize()
|
| +
|
| + def _FetchClovisTask(self, project_name, task_api, queue_name):
|
| + """Fetches a ClovisTask from the task queue.
|
| +
|
| + Args:
|
| + project_name(str): The name of the Google Cloud project.
|
| + task_api: The TaskQueue service.
|
| + queue_name(str): The name of the task queue.
|
| +
|
| + Returns:
|
| + (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None)
|
| + if no tasks are found.
|
| + """
|
| + response = task_api.tasks().lease(
|
| + project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180,
|
| + groupByTag=True, tag=self._taskqueue_tag).execute()
|
| + if (not response.get('items')) or (len(response['items']) < 1):
|
| + return (None, None)
|
| +
|
| + google_task = response['items'][0]
|
| + task_id = google_task['id']
|
| + clovis_task = ClovisTask.FromBase64(google_task['payloadBase64'])
|
| + return (clovis_task, task_id)
|
| +
|
| + def _Finalize(self):
|
| + """Called before exiting."""
|
| + self._logger.info('Uploading trace database')
|
| + self._google_storage_accessor.UploadString(
|
| + json.dumps(self._trace_database.ToJsonDict(), indent=2),
|
| + self._traces_dir + 'trace_database.json')
|
| + # TODO(droger): Implement automatic instance destruction.
|
| + self._logger.info('Done')
|
| +
|
|
|
| def _GenerateTrace(self, url, emulate_device, emulate_network, filename,
|
| log_filename):
|
| - """ Generates a trace on _thread.
|
| + """ Generates a trace.
|
|
|
| Args:
|
| url: URL as a string.
|
| emulate_device: Name of the device to emulate. Empty for no emulation.
|
| emulate_network: Type of network emulation. Empty for no emulation.
|
| filename: Name of the file where the trace is saved.
|
| - log_filename: Name of the file where standard output and errors are logged
|
| + log_filename: Name of the file where standard output and errors are
|
| + logged.
|
|
|
| Returns:
|
| A dictionary of metadata about the trace, including a 'succeeded' field
|
| @@ -123,158 +188,67 @@ class ServerApp(object):
|
|
|
| return trace_metadata
|
|
|
| - def _GetCurrentTaskCount(self):
|
| - """Returns the number of remaining tasks. Thread safe."""
|
| - self._tasks_lock.acquire()
|
| - task_count = len(self._tasks)
|
| - self._tasks_lock.release()
|
| - return task_count
|
| + def _ProcessClovisTask(self, clovis_task):
|
| + """Processes one clovis_task."""
|
| + if clovis_task.Action() != 'trace':
|
| + self._logger.error('Unsupported task action: %s' % clovis_task.Action())
|
| + return
|
|
|
| - def _ProcessTasks(self, tasks, repeat_count, emulate_device, emulate_network):
|
| - """Iterates over _task, generating a trace for each of them. Uploads the
|
| - resulting traces to Google Cloud Storage. Runs on _thread.
|
| + # Extract the task parameters.
|
| + params = clovis_task.Params()
|
| + urls = params['urls']
|
| + repeat_count = params.get('repeat_count', 1)
|
| + emulate_device = params.get('emulate_device')
|
| + emulate_network = params.get('emulate_network')
|
|
|
| - Args:
|
| - tasks: The list of URLs to process.
|
| - repeat_count: The number of traces generated for each URL.
|
| - emulate_device: Name of the device to emulate. Empty for no emulation.
|
| - emulate_network: Type of network emulation. Empty for no emulation.
|
| - """
|
| - # The main thread might be reading the task lists, take the lock to modify.
|
| - self._tasks_lock.acquire()
|
| - self._tasks = tasks
|
| - self._failed_tasks = []
|
| - self._tasks_lock.release()
|
| failures_dir = self._base_path_in_bucket + 'failures/'
|
| - traces_dir = self._base_path_in_bucket + 'traces/'
|
| -
|
| - trace_database = LoadingTraceDatabase({})
|
| -
|
| # TODO(blundell): Fix this up.
|
| logs_dir = self._base_path_in_bucket + 'analyze_logs/'
|
| log_filename = 'analyze.log'
|
| # Avoid special characters in storage object names
|
| pattern = re.compile(r"[#\?\[\]\*/]")
|
| - while len(self._tasks) > 0:
|
| - url = self._tasks[-1]
|
| +
|
| + while len(urls) > 0:
|
| + url = urls.pop()
|
| local_filename = pattern.sub('_', url)
|
| for repeat in range(repeat_count):
|
| - print 'Generating trace for URL: %s' % url
|
| + self._logger.debug('Generating trace for URL: %s' % url)
|
| remote_filename = local_filename + '/' + str(repeat)
|
| trace_metadata = self._GenerateTrace(
|
| url, emulate_device, emulate_network, local_filename, log_filename)
|
| if trace_metadata['succeeded']:
|
| - print 'Uploading: %s' % remote_filename
|
| - remote_trace_location = traces_dir + remote_filename
|
| + self._logger.debug('Uploading: %s' % remote_filename)
|
| + remote_trace_location = self._traces_dir + remote_filename
|
| self._google_storage_accessor.UploadFile(local_filename,
|
| - remote_trace_location)
|
| + remote_trace_location)
|
| full_cloud_storage_path = ('gs://' + self._bucket_name + '/' +
|
| remote_trace_location)
|
| - trace_database.AddTrace(full_cloud_storage_path, trace_metadata)
|
| + self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata)
|
| else:
|
| - print 'Trace generation failed for URL: %s' % url
|
| - self._tasks_lock.acquire()
|
| - self._failed_tasks.append({ "url": url, "repeat": repeat})
|
| - self._tasks_lock.release()
|
| + self._logger.warning('Trace generation failed for URL: %s' % url)
|
| + # TODO: upload the failure
|
| if os.path.isfile(local_filename):
|
| self._google_storage_accessor.UploadFile(local_filename,
|
| failures_dir + remote_filename)
|
| - print 'Uploading log'
|
| + self._logger.debug('Uploading log')
|
| self._google_storage_accessor.UploadFile(log_filename,
|
| - logs_dir + remote_filename)
|
| - # Pop once task is finished, for accurate status tracking.
|
| - self._tasks_lock.acquire()
|
| - url = self._tasks.pop()
|
| - self._tasks_lock.release()
|
| -
|
| - self._google_storage_accessor.UploadString(
|
| - json.dumps(trace_database.ToJsonDict(), indent=2),
|
| - traces_dir + 'trace_database.json')
|
| + logs_dir + remote_filename)
|
|
|
| - if len(self._failed_tasks) > 0:
|
| - print 'Uploading failing URLs'
|
| - self._google_storage_accessor.UploadString(
|
| - json.dumps(self._failed_tasks, indent=2),
|
| - failures_dir + 'failures.json')
|
|
|
| - def _SetTaskList(self, http_body):
|
| - """Sets the list of tasks and starts processing them
|
| +if __name__ == '__main__':
|
| + parser = argparse.ArgumentParser(
|
| + description='ComputeEngine Worker for Clovis')
|
| + parser.add_argument('--config', required=True,
|
| + help='Path to the configuration file.')
|
| + args = parser.parse_args()
|
|
|
| - Args:
|
| - http_body: JSON dictionary. See README.md for a description of the format.
|
| + # Configure logging.
|
| + logging.basicConfig(level=logging.WARNING)
|
| + worker_logger = logging.getLogger('worker')
|
| + worker_logger.setLevel(logging.INFO)
|
|
|
| - Returns:
|
| - A string to be sent back to the client, describing the success status of
|
| - the request.
|
| - """
|
| - if self._IsProcessingTasks():
|
| - return 'Error: Already running\n'
|
| + worker_logger.info('Reading configuration')
|
| + with open(args.config) as config_json:
|
| + worker = Worker(json.load(config_json), worker_logger)
|
| + worker.Start()
|
|
|
| - load_parameters = json.loads(http_body)
|
| - try:
|
| - tasks = load_parameters['urls']
|
| - except KeyError:
|
| - return 'Error: invalid urls\n'
|
| - # Optional parameters.
|
| - try:
|
| - repeat_count = int(load_parameters.get('repeat_count', '1'))
|
| - except ValueError:
|
| - return 'Error: invalid repeat_count\n'
|
| - emulate_device = load_parameters.get('emulate_device', '')
|
| - emulate_network = load_parameters.get('emulate_network', '')
|
| -
|
| - if len(tasks) == 0:
|
| - return 'Error: Empty task list\n'
|
| - else:
|
| - self._initial_task_count = len(tasks)
|
| - self._start_time = time.time()
|
| - self._thread = threading.Thread(
|
| - target = self._ProcessTasks,
|
| - args = (tasks, repeat_count, emulate_device, emulate_network))
|
| - self._thread.start()
|
| - return 'Starting generation of %s tasks\n' % str(self._initial_task_count)
|
| -
|
| - def __call__(self, environ, start_response):
|
| - path = environ['PATH_INFO']
|
| -
|
| - if path == '/set_tasks':
|
| - # Get the tasks from the HTTP body.
|
| - try:
|
| - body_size = int(environ.get('CONTENT_LENGTH', 0))
|
| - except (ValueError):
|
| - body_size = 0
|
| - body = environ['wsgi.input'].read(body_size)
|
| - data = self._SetTaskList(body)
|
| - elif path == '/test':
|
| - data = 'hello\n'
|
| - elif path == '/status':
|
| - if not self._IsProcessingTasks():
|
| - data = 'Idle\n'
|
| - else:
|
| - task_count = self._GetCurrentTaskCount()
|
| - if task_count == 0:
|
| - data = '%s tasks complete. Finalizing.\n' % self._initial_task_count
|
| - else:
|
| - data = 'Remaining tasks: %s / %s\n' % (
|
| - task_count, self._initial_task_count)
|
| - elapsed = time.time() - self._start_time
|
| - data += 'Elapsed time: %s seconds\n' % str(elapsed)
|
| - self._tasks_lock.acquire()
|
| - failed_tasks = self._failed_tasks
|
| - self._tasks_lock.release()
|
| - data += '%s failed tasks:\n' % len(failed_tasks)
|
| - data += json.dumps(failed_tasks, indent=2)
|
| - else:
|
| - start_response('404 NOT FOUND', [('Content-Length', '0')])
|
| - return iter([''])
|
| -
|
| - response_headers = [
|
| - ('Content-type','text/plain'),
|
| - ('Content-Length', str(len(data)))
|
| - ]
|
| - start_response('200 OK', response_headers)
|
| - return iter([data])
|
| -
|
| -
|
| -def StartApp(configuration_file):
|
| - return ServerApp(configuration_file)
|
|
|