Index: tools/android/loading/cloud/backend/worker.py |
diff --git a/tools/android/loading/cloud/backend/worker.py b/tools/android/loading/cloud/backend/worker.py |
index b9900cec4748382f753c9a8628e0b202ca865a4d..99f4d1ff093ed0e533eeebb6ea00f1c4776dd57c 100644 |
--- a/tools/android/loading/cloud/backend/worker.py |
+++ b/tools/android/loading/cloud/backend/worker.py |
@@ -32,6 +32,7 @@ class Worker(object): |
"""See README.md for the config format.""" |
self._project_name = config['project_name'] |
self._taskqueue_tag = config['taskqueue_tag'] |
+ self._src_path = config['src_path'] |
self._credentials = GoogleCredentials.get_application_default() |
self._logger = logger |
@@ -45,15 +46,17 @@ class Worker(object): |
if not self._base_path_in_bucket.endswith('/'): |
self._base_path_in_bucket += '/' |
- # TODO: improve the trace database to support concurrent access. |
- self._traces_dir = self._base_path_in_bucket + 'traces/' |
- self._trace_database = LoadingTraceDatabase({}) |
- |
- self._src_path = config['src_path'] |
self._google_storage_accessor = GoogleStorageAccessor( |
credentials=self._credentials, project_name=self._project_name, |
bucket_name=self._bucket_name) |
+ self._traces_dir = self._base_path_in_bucket + 'traces/' |
Benoit L
2016/04/20 15:02:22
Generally speaking, I prefer to manipulate paths w
droger
2016/04/20 15:58:17
Done.
|
+ self._trace_database_path = self._traces_dir + config.get( |
+ 'trace_database_filename', 'trace_database.json') |
Benoit L
2016/04/20 15:02:22
Is the trace dir per worker?
If not, then the trac
droger
2016/04/20 15:58:17
It is intended that all worker write to the same d
|
+ |
+ # Recover any existing trace database in case the worker died. |
+ self._DownloadTraceDatabase() |
+ |
# Initialize the global options that will be used during trace generation. |
options.OPTIONS.ParseArgs([]) |
options.OPTIONS.local_binary = config['chrome_path'] |
@@ -94,6 +97,24 @@ class Worker(object): |
self._logger.info('Finished task %s' % task_id) |
self._Finalize() |
+ def _DownloadTraceDatabase(self): |
+ """Downloads the trace database from CloudStorage.""" |
+ trace_database_string = self._google_storage_accessor.DownloadAsString( |
Benoit L
2016/04/20 15:02:22
What about:
trace_db_str = self._google_storage_a
droger
2016/04/20 15:58:17
Done.
|
+ self._trace_database_path) |
+ trace_database_dict = {} |
+ if trace_database_string: |
+ self._logger.warning( |
+ 'Importing existing trace database %s' % self._trace_database_path) |
+ trace_database_dict = json.loads(trace_database_string) |
+ self._trace_database = LoadingTraceDatabase(trace_database_dict) |
+ |
+ def _UploadTraceDatabase(self): |
+ """Uploads the trace database to CloudStorage.""" |
+ self._logger.info('Uploading trace database') |
Benoit L
2016/04/20 15:02:22
tiny nit: Importing is "warning", uploading "info"
droger
2016/04/20 15:58:17
Done.
|
+ self._google_storage_accessor.UploadString( |
+ json.dumps(self._trace_database.ToJsonDict(), indent=2), |
+ self._trace_database_path) |
+ |
def _FetchClovisTask(self, project_name, task_api, queue_name): |
"""Fetches a ClovisTask from the task queue. |
@@ -119,14 +140,9 @@ class Worker(object): |
def _Finalize(self): |
"""Called before exiting.""" |
- self._logger.info('Uploading trace database') |
- self._google_storage_accessor.UploadString( |
- json.dumps(self._trace_database.ToJsonDict(), indent=2), |
- self._traces_dir + 'trace_database.json') |
# TODO(droger): Implement automatic instance destruction. |
self._logger.info('Done') |
- |
def _GenerateTrace(self, url, emulate_device, emulate_network, filename, |
log_filename): |
""" Generates a trace. |
@@ -151,7 +167,6 @@ class Worker(object): |
if not url.startswith('http') and not url.startswith('file'): |
url = 'http://' + url |
- old_stdout = sys.stdout |
old_stderr = sys.stderr |
trace_metadata = { 'succeeded' : False, 'url' : url } |
@@ -183,7 +198,6 @@ class Worker(object): |
with open(filename, 'w') as f: |
json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) |
- sys.stdout = old_stdout |
sys.stderr = old_stderr |
return trace_metadata |
@@ -223,17 +237,16 @@ class Worker(object): |
remote_trace_location) |
full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + |
remote_trace_location) |
- self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata) |
+ self._trace_database.SetTrace(full_cloud_storage_path, trace_metadata) |
else: |
self._logger.warning('Trace generation failed for URL: %s' % url) |
- # TODO: upload the failure |
if os.path.isfile(local_filename): |
- self._google_storage_accessor.UploadFile(local_filename, |
- failures_dir + remote_filename) |
- self._logger.debug('Uploading log') |
+ self._google_storage_accessor.UploadFile( |
+ local_filename, failures_dir + remote_filename) |
+ self._logger.debug('Uploading analyze log') |
self._google_storage_accessor.UploadFile(log_filename, |
logs_dir + remote_filename) |
- |
+ self._UploadTraceDatabase() |
if __name__ == '__main__': |
parser = argparse.ArgumentParser( |