Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(33)

Unified Diff: tools/android/loading/cloud/backend/worker.py

Issue 1895033002: tools/android/loading Switch the GCE worker to pull queues (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@appengine
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/android/loading/cloud/backend/worker.py
diff --git a/tools/android/loading/gce/main.py b/tools/android/loading/cloud/backend/worker.py
similarity index 38%
rename from tools/android/loading/gce/main.py
rename to tools/android/loading/cloud/backend/worker.py
index 0ef2caece10f9050009a609a37f9fc15eead2c28..b9900cec4748382f753c9a8628e0b202ca865a4d 100644
--- a/tools/android/loading/gce/main.py
+++ b/tools/android/loading/cloud/backend/worker.py
@@ -2,77 +2,142 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+import argparse
import json
+import logging
import os
import re
-import threading
-import time
-import subprocess
import sys
+import time
+
+from googleapiclient import discovery
+from oauth2client.client import GoogleCredentials
# NOTE: The parent directory needs to be first in sys.path to avoid conflicts
# with catapult modules that have colliding names, as catapult inserts itself
# into the path as the second element. This is an ugly and fragile hack.
sys.path.insert(0,
- os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
+ os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir,
+ os.pardir))
import controller
+from cloud.common.clovis_task import ClovisTask
from google_storage_accessor import GoogleStorageAccessor
import loading_trace
from loading_trace_database import LoadingTraceDatabase
import options
-class ServerApp(object):
- """Simple web server application, collecting traces and writing them in
- Google Cloud Storage.
- """
-
- def __init__(self, configuration_file):
- """|configuration_file| is a path to a file containing JSON as described in
- README.md.
- """
- self._tasks = [] # List of remaining tasks, only modified by _thread.
- self._failed_tasks = [] # Failed tasks, only modified by _thread.
- self._thread = None
- self._tasks_lock = threading.Lock() # Protects _tasks and _failed_tasks.
- self._initial_task_count = -1
- self._start_time = None
- print 'Reading configuration'
- with open(configuration_file) as config_json:
- config = json.load(config_json)
-
- # Separate the cloud storage path into the bucket and the base path under
- # the bucket.
- storage_path_components = config['cloud_storage_path'].split('/')
- self._bucket_name = storage_path_components[0]
- self._base_path_in_bucket = ''
- if len(storage_path_components) > 1:
- self._base_path_in_bucket = '/'.join(storage_path_components[1:])
- if not self._base_path_in_bucket.endswith('/'):
- self._base_path_in_bucket += '/'
-
- self._src_path = config['src_path']
- self._google_storage_accessor = GoogleStorageAccessor(
- project_name=config['project_name'], bucket_name=self._bucket_name)
+class Worker(object):
+ def __init__(self, config, logger):
+ """See README.md for the config format."""
+ self._project_name = config['project_name']
+ self._taskqueue_tag = config['taskqueue_tag']
+ self._credentials = GoogleCredentials.get_application_default()
+ self._logger = logger
+
+ # Separate the cloud storage path into the bucket and the base path under
+ # the bucket.
+ storage_path_components = config['cloud_storage_path'].split('/')
+ self._bucket_name = storage_path_components[0]
+ self._base_path_in_bucket = ''
+ if len(storage_path_components) > 1:
+ self._base_path_in_bucket = '/'.join(storage_path_components[1:])
+ if not self._base_path_in_bucket.endswith('/'):
+ self._base_path_in_bucket += '/'
+
+ # TODO: improve the trace database to support concurrent access.
+ self._traces_dir = self._base_path_in_bucket + 'traces/'
+ self._trace_database = LoadingTraceDatabase({})
+
+ self._src_path = config['src_path']
+ self._google_storage_accessor = GoogleStorageAccessor(
+ credentials=self._credentials, project_name=self._project_name,
+ bucket_name=self._bucket_name)
# Initialize the global options that will be used during trace generation.
options.OPTIONS.ParseArgs([])
options.OPTIONS.local_binary = config['chrome_path']
- def _IsProcessingTasks(self):
- """Returns True if the application is currently processing tasks."""
- return self._thread is not None and self._thread.is_alive()
+ def Start(self):
+ """Main worker loop.
+
+    Repeatedly pulls tasks from the task queue and processes them. Returns
+    when the queue is empty and at least one task has been processed.
+ """
+ task_api = discovery.build('taskqueue', 'v1beta2',
+ credentials=self._credentials)
+ queue_name = 'clovis-queue'
+ # Workaround for
+ # https://code.google.com/p/googleappengine/issues/detail?id=10199
+ project = 's~' + self._project_name
+
+ while True:
+ self._logger.debug('Fetching new task.')
+ (clovis_task, task_id) = self._FetchClovisTask(project, task_api,
+ queue_name)
+ if not clovis_task:
+ if self._trace_database.ToJsonDict():
+ self._logger.info('No remaining tasks in the queue.')
+ break
+ else:
+ delay_seconds = 60
+ self._logger.info(
+ 'Nothing in the queue, retrying in %i seconds.' % delay_seconds)
+ time.sleep(delay_seconds)
+ continue
+
+ self._logger.info('Processing task %s' % task_id)
+ self._ProcessClovisTask(clovis_task)
+ self._logger.debug('Deleting task %s' % task_id)
+ task_api.tasks().delete(project=project, taskqueue=queue_name,
+ task=task_id).execute()
+ self._logger.info('Finished task %s' % task_id)
+ self._Finalize()
+
+ def _FetchClovisTask(self, project_name, task_api, queue_name):
+ """Fetches a ClovisTask from the task queue.
+
+    Args:
+ project_name(str): The name of the Google Cloud project.
+ task_api: The TaskQueue service.
+ queue_name(str): The name of the task queue.
+
+ Returns:
+ (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None)
+ if no tasks are found.
+ """
+ response = task_api.tasks().lease(
+ project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180,
+ groupByTag=True, tag=self._taskqueue_tag).execute()
+ if (not response.get('items')) or (len(response['items']) < 1):
+ return (None, None)
+
+ google_task = response['items'][0]
+ task_id = google_task['id']
+ clovis_task = ClovisTask.FromBase64(google_task['payloadBase64'])
+ return (clovis_task, task_id)
+
+ def _Finalize(self):
+ """Called before exiting."""
+ self._logger.info('Uploading trace database')
+ self._google_storage_accessor.UploadString(
+ json.dumps(self._trace_database.ToJsonDict(), indent=2),
+ self._traces_dir + 'trace_database.json')
+ # TODO(droger): Implement automatic instance destruction.
+ self._logger.info('Done')
+
def _GenerateTrace(self, url, emulate_device, emulate_network, filename,
log_filename):
- """ Generates a trace on _thread.
+    """Generates a trace.
Args:
url: URL as a string.
emulate_device: Name of the device to emulate. Empty for no emulation.
emulate_network: Type of network emulation. Empty for no emulation.
filename: Name of the file where the trace is saved.
- log_filename: Name of the file where standard output and errors are logged
+ log_filename: Name of the file where standard output and errors are
+ logged.
Returns:
A dictionary of metadata about the trace, including a 'succeeded' field
@@ -123,158 +188,67 @@ class ServerApp(object):
return trace_metadata
- def _GetCurrentTaskCount(self):
- """Returns the number of remaining tasks. Thread safe."""
- self._tasks_lock.acquire()
- task_count = len(self._tasks)
- self._tasks_lock.release()
- return task_count
+ def _ProcessClovisTask(self, clovis_task):
+ """Processes one clovis_task."""
+ if clovis_task.Action() != 'trace':
+ self._logger.error('Unsupported task action: %s' % clovis_task.Action())
+ return
- def _ProcessTasks(self, tasks, repeat_count, emulate_device, emulate_network):
- """Iterates over _task, generating a trace for each of them. Uploads the
- resulting traces to Google Cloud Storage. Runs on _thread.
+ # Extract the task parameters.
+ params = clovis_task.Params()
+ urls = params['urls']
+ repeat_count = params.get('repeat_count', 1)
+ emulate_device = params.get('emulate_device')
+ emulate_network = params.get('emulate_network')
- Args:
- tasks: The list of URLs to process.
- repeat_count: The number of traces generated for each URL.
- emulate_device: Name of the device to emulate. Empty for no emulation.
- emulate_network: Type of network emulation. Empty for no emulation.
- """
- # The main thread might be reading the task lists, take the lock to modify.
- self._tasks_lock.acquire()
- self._tasks = tasks
- self._failed_tasks = []
- self._tasks_lock.release()
failures_dir = self._base_path_in_bucket + 'failures/'
- traces_dir = self._base_path_in_bucket + 'traces/'
-
- trace_database = LoadingTraceDatabase({})
-
# TODO(blundell): Fix this up.
logs_dir = self._base_path_in_bucket + 'analyze_logs/'
log_filename = 'analyze.log'
# Avoid special characters in storage object names
pattern = re.compile(r"[#\?\[\]\*/]")
- while len(self._tasks) > 0:
- url = self._tasks[-1]
+
+ while len(urls) > 0:
+ url = urls.pop()
local_filename = pattern.sub('_', url)
for repeat in range(repeat_count):
- print 'Generating trace for URL: %s' % url
+ self._logger.debug('Generating trace for URL: %s' % url)
remote_filename = local_filename + '/' + str(repeat)
trace_metadata = self._GenerateTrace(
url, emulate_device, emulate_network, local_filename, log_filename)
if trace_metadata['succeeded']:
- print 'Uploading: %s' % remote_filename
- remote_trace_location = traces_dir + remote_filename
+ self._logger.debug('Uploading: %s' % remote_filename)
+ remote_trace_location = self._traces_dir + remote_filename
self._google_storage_accessor.UploadFile(local_filename,
- remote_trace_location)
+ remote_trace_location)
full_cloud_storage_path = ('gs://' + self._bucket_name + '/' +
remote_trace_location)
- trace_database.AddTrace(full_cloud_storage_path, trace_metadata)
+ self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata)
else:
- print 'Trace generation failed for URL: %s' % url
- self._tasks_lock.acquire()
- self._failed_tasks.append({ "url": url, "repeat": repeat})
- self._tasks_lock.release()
+ self._logger.warning('Trace generation failed for URL: %s' % url)
+ # TODO: upload the failure
if os.path.isfile(local_filename):
self._google_storage_accessor.UploadFile(local_filename,
failures_dir + remote_filename)
- print 'Uploading log'
+ self._logger.debug('Uploading log')
self._google_storage_accessor.UploadFile(log_filename,
- logs_dir + remote_filename)
- # Pop once task is finished, for accurate status tracking.
- self._tasks_lock.acquire()
- url = self._tasks.pop()
- self._tasks_lock.release()
-
- self._google_storage_accessor.UploadString(
- json.dumps(trace_database.ToJsonDict(), indent=2),
- traces_dir + 'trace_database.json')
+ logs_dir + remote_filename)
- if len(self._failed_tasks) > 0:
- print 'Uploading failing URLs'
- self._google_storage_accessor.UploadString(
- json.dumps(self._failed_tasks, indent=2),
- failures_dir + 'failures.json')
- def _SetTaskList(self, http_body):
- """Sets the list of tasks and starts processing them
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description='ComputeEngine Worker for Clovis')
+ parser.add_argument('--config', required=True,
+ help='Path to the configuration file.')
+ args = parser.parse_args()
- Args:
- http_body: JSON dictionary. See README.md for a description of the format.
+ # Configure logging.
+ logging.basicConfig(level=logging.WARNING)
+ worker_logger = logging.getLogger('worker')
+ worker_logger.setLevel(logging.INFO)
- Returns:
- A string to be sent back to the client, describing the success status of
- the request.
- """
- if self._IsProcessingTasks():
- return 'Error: Already running\n'
+ worker_logger.info('Reading configuration')
+ with open(args.config) as config_json:
+ worker = Worker(json.load(config_json), worker_logger)
+ worker.Start()
- load_parameters = json.loads(http_body)
- try:
- tasks = load_parameters['urls']
- except KeyError:
- return 'Error: invalid urls\n'
- # Optional parameters.
- try:
- repeat_count = int(load_parameters.get('repeat_count', '1'))
- except ValueError:
- return 'Error: invalid repeat_count\n'
- emulate_device = load_parameters.get('emulate_device', '')
- emulate_network = load_parameters.get('emulate_network', '')
-
- if len(tasks) == 0:
- return 'Error: Empty task list\n'
- else:
- self._initial_task_count = len(tasks)
- self._start_time = time.time()
- self._thread = threading.Thread(
- target = self._ProcessTasks,
- args = (tasks, repeat_count, emulate_device, emulate_network))
- self._thread.start()
- return 'Starting generation of %s tasks\n' % str(self._initial_task_count)
-
- def __call__(self, environ, start_response):
- path = environ['PATH_INFO']
-
- if path == '/set_tasks':
- # Get the tasks from the HTTP body.
- try:
- body_size = int(environ.get('CONTENT_LENGTH', 0))
- except (ValueError):
- body_size = 0
- body = environ['wsgi.input'].read(body_size)
- data = self._SetTaskList(body)
- elif path == '/test':
- data = 'hello\n'
- elif path == '/status':
- if not self._IsProcessingTasks():
- data = 'Idle\n'
- else:
- task_count = self._GetCurrentTaskCount()
- if task_count == 0:
- data = '%s tasks complete. Finalizing.\n' % self._initial_task_count
- else:
- data = 'Remaining tasks: %s / %s\n' % (
- task_count, self._initial_task_count)
- elapsed = time.time() - self._start_time
- data += 'Elapsed time: %s seconds\n' % str(elapsed)
- self._tasks_lock.acquire()
- failed_tasks = self._failed_tasks
- self._tasks_lock.release()
- data += '%s failed tasks:\n' % len(failed_tasks)
- data += json.dumps(failed_tasks, indent=2)
- else:
- start_response('404 NOT FOUND', [('Content-Length', '0')])
- return iter([''])
-
- response_headers = [
- ('Content-type','text/plain'),
- ('Content-Length', str(len(data)))
- ]
- start_response('200 OK', response_headers)
- return iter([data])
-
-
-def StartApp(configuration_file):
- return ServerApp(configuration_file)
« no previous file with comments | « tools/android/loading/cloud/backend/startup-script.sh ('k') | tools/android/loading/cloud/frontend/lib/common » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698