Index: tools/android/loading/cloud/backend/worker.py |
diff --git a/tools/android/loading/cloud/backend/worker.py b/tools/android/loading/cloud/backend/worker.py |
index b9900cec4748382f753c9a8628e0b202ca865a4d..95181718241455d2899d3368d22e270909d1f17b 100644 |
--- a/tools/android/loading/cloud/backend/worker.py |
+++ b/tools/android/loading/cloud/backend/worker.py |
@@ -32,6 +32,7 @@ class Worker(object): |
"""See README.md for the config format.""" |
self._project_name = config['project_name'] |
self._taskqueue_tag = config['taskqueue_tag'] |
+ self._src_path = config['src_path'] |
self._credentials = GoogleCredentials.get_application_default() |
self._logger = logger |
@@ -45,15 +46,18 @@ class Worker(object): |
if not self._base_path_in_bucket.endswith('/'): |
self._base_path_in_bucket += '/' |
- # TODO: improve the trace database to support concurrent access. |
- self._traces_dir = self._base_path_in_bucket + 'traces/' |
- self._trace_database = LoadingTraceDatabase({}) |
- |
- self._src_path = config['src_path'] |
self._google_storage_accessor = GoogleStorageAccessor( |
credentials=self._credentials, project_name=self._project_name, |
bucket_name=self._bucket_name) |
+ self._traces_dir = os.path.join(self._base_path_in_bucket, 'traces') |
+ self._trace_database_path = os.path.join( |
+ self._traces_dir, |
+ config.get('trace_database_filename', 'trace_database.json')) |
+ |
+ # Recover any existing trace database in case the worker died. |
+ self._DownloadTraceDatabase() |
+ |
# Initialize the global options that will be used during trace generation. |
options.OPTIONS.ParseArgs([]) |
options.OPTIONS.local_binary = config['chrome_path'] |
@@ -94,6 +98,21 @@ class Worker(object): |
self._logger.info('Finished task %s' % task_id) |
self._Finalize() |
+ def _DownloadTraceDatabase(self): |
+   """Fetches the trace database from CloudStorage and loads it. |
+ |
+   The object at self._trace_database_path is downloaded as a string and |
+   parsed as JSON. A falsy download result is replaced by an empty JSON |
+   object. The parsed dictionary is wrapped in a LoadingTraceDatabase that |
+   is stored in self._trace_database. |
+   """ |
+   self._logger.info('Downloading trace database') |
+   accessor = self._google_storage_accessor |
+   serialized = accessor.DownloadAsString(self._trace_database_path) |
+   if not serialized: |
+     # Nothing usable was downloaded: fall back to an empty JSON object. |
+     serialized = '{}' |
+   self._trace_database = LoadingTraceDatabase(json.loads(serialized)) |
+ |
+ def _UploadTraceDatabase(self): |
+   """Serializes the trace database and writes it to CloudStorage. |
+ |
+   The database is converted with ToJsonDict(), dumped as JSON with an |
+   indent of 2, and uploaded as a string to self._trace_database_path. |
+   """ |
+   self._logger.info('Uploading trace database') |
+   database_dict = self._trace_database.ToJsonDict() |
+   serialized = json.dumps(database_dict, indent=2) |
+   self._google_storage_accessor.UploadString( |
+       serialized, self._trace_database_path) |
+ |
def _FetchClovisTask(self, project_name, task_api, queue_name): |
"""Fetches a ClovisTask from the task queue. |
@@ -119,14 +138,9 @@ class Worker(object): |
def _Finalize(self): |
"""Called before exiting.""" |
- self._logger.info('Uploading trace database') |
- self._google_storage_accessor.UploadString( |
- json.dumps(self._trace_database.ToJsonDict(), indent=2), |
- self._traces_dir + 'trace_database.json') |
# TODO(droger): Implement automatic instance destruction. |
self._logger.info('Done') |
- |
def _GenerateTrace(self, url, emulate_device, emulate_network, filename, |
log_filename): |
""" Generates a trace. |
@@ -201,9 +215,9 @@ class Worker(object): |
emulate_device = params.get('emulate_device') |
emulate_network = params.get('emulate_network') |
- failures_dir = self._base_path_in_bucket + 'failures/' |
+ failures_dir = os.path.join(self._base_path_in_bucket, 'failures') |
# TODO(blundell): Fix this up. |
- logs_dir = self._base_path_in_bucket + 'analyze_logs/' |
+ logs_dir = os.path.join(self._base_path_in_bucket, 'analyze_logs') |
log_filename = 'analyze.log' |
# Avoid special characters in storage object names |
pattern = re.compile(r"[#\?\[\]\*/]") |
@@ -213,27 +227,27 @@ class Worker(object): |
local_filename = pattern.sub('_', url) |
for repeat in range(repeat_count): |
self._logger.debug('Generating trace for URL: %s' % url) |
- remote_filename = local_filename + '/' + str(repeat) |
+ remote_filename = os.path.join(local_filename, str(repeat)) |
trace_metadata = self._GenerateTrace( |
url, emulate_device, emulate_network, local_filename, log_filename) |
if trace_metadata['succeeded']: |
self._logger.debug('Uploading: %s' % remote_filename) |
- remote_trace_location = self._traces_dir + remote_filename |
+ remote_trace_location = os.path.join(self._traces_dir, |
+ remote_filename) |
self._google_storage_accessor.UploadFile(local_filename, |
remote_trace_location) |
- full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + |
- remote_trace_location) |
- self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata) |
+ full_cloud_storage_path = os.path.join('gs://' + self._bucket_name, |
+ remote_trace_location) |
+ self._trace_database.SetTrace(full_cloud_storage_path, trace_metadata) |
else: |
self._logger.warning('Trace generation failed for URL: %s' % url) |
- # TODO: upload the failure |
if os.path.isfile(local_filename): |
- self._google_storage_accessor.UploadFile(local_filename, |
- failures_dir + remote_filename) |
- self._logger.debug('Uploading log') |
- self._google_storage_accessor.UploadFile(log_filename, |
- logs_dir + remote_filename) |
- |
+ self._google_storage_accessor.UploadFile( |
+ local_filename, os.path.join(failures_dir, remote_filename)) |
+ self._logger.debug('Uploading analyze log') |
+ self._google_storage_accessor.UploadFile( |
+ log_filename, os.path.join(logs_dir, remote_filename)) |
+ self._UploadTraceDatabase() |
if __name__ == '__main__': |
parser = argparse.ArgumentParser( |