Chromium Code Reviews
Index: tools/android/loading/cloud/backend/worker.py
diff --git a/tools/android/loading/cloud/backend/worker.py b/tools/android/loading/cloud/backend/worker.py
index b9900cec4748382f753c9a8628e0b202ca865a4d..99f4d1ff093ed0e533eeebb6ea00f1c4776dd57c 100644
--- a/tools/android/loading/cloud/backend/worker.py
+++ b/tools/android/loading/cloud/backend/worker.py
@@ -32,6 +32,7 @@ class Worker(object):
     """See README.md for the config format."""
     self._project_name = config['project_name']
     self._taskqueue_tag = config['taskqueue_tag']
+    self._src_path = config['src_path']
     self._credentials = GoogleCredentials.get_application_default()
     self._logger = logger
@@ -45,15 +46,17 @@ class Worker(object):
     if not self._base_path_in_bucket.endswith('/'):
       self._base_path_in_bucket += '/'
-    # TODO: improve the trace database to support concurrent access.
-    self._traces_dir = self._base_path_in_bucket + 'traces/'
-    self._trace_database = LoadingTraceDatabase({})
-
-    self._src_path = config['src_path']
     self._google_storage_accessor = GoogleStorageAccessor(
         credentials=self._credentials, project_name=self._project_name,
         bucket_name=self._bucket_name)
+    self._traces_dir = self._base_path_in_bucket + 'traces/'
Benoit L
2016/04/20 15:02:22
Generally speaking, I prefer to manipulate paths with os.path rather than with string concatenation.
droger
2016/04/20 15:58:17
Done.
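The suggestion above amounts to building the storage object names with a path helper instead of '+'. A minimal sketch of that idea, assuming the object names always use '/' separators (posixpath keeps that true on any host; the helper name below is made up for illustration and is not part of this change):

    import posixpath

    def TraceDatabaseObjectName(base_path_in_bucket, config):
      """Illustration only: joins the object name instead of concatenating."""
      traces_dir = posixpath.join(base_path_in_bucket, 'traces')
      return posixpath.join(
          traces_dir,
          config.get('trace_database_filename', 'trace_database.json'))

On Linux workers os.path.join behaves the same way; posixpath simply removes any dependence on the host separator.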
+    self._trace_database_path = self._traces_dir + config.get(
+        'trace_database_filename', 'trace_database.json')
Benoit L
2016/04/20 15:02:22
Is the trace dir per worker?
If not, then the trace database could be overwritten when several workers upload it.
droger
2016/04/20 15:58:17
It is intended that all workers write to the same directory.
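Since the workers share the traces directory, the configurable 'trace_database_filename' added above is also what would let a deployment give each worker its own database file, should concurrent uploads of one shared object ever become a problem. A purely illustrative sketch, assuming a hypothetical CLOVIS_INSTANCE_NAME environment variable that is not part of this change:

    import json
    import os

    def MakeWorkerConfig(base_config_path):
      """Illustration only: derives a per-worker trace database filename."""
      with open(base_config_path) as f:
        config = json.load(f)
      instance = os.environ.get('CLOVIS_INSTANCE_NAME', 'worker-0')
      config['trace_database_filename'] = 'trace_database_%s.json' % instance
      return config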
+
+    # Recover any existing trace database in case the worker died.
+    self._DownloadTraceDatabase()
+
     # Initialize the global options that will be used during trace generation.
     options.OPTIONS.ParseArgs([])
     options.OPTIONS.local_binary = config['chrome_path']
@@ -94,6 +97,24 @@ class Worker(object):
       self._logger.info('Finished task %s' % task_id)
     self._Finalize()
+  def _DownloadTraceDatabase(self):
+    """Downloads the trace database from CloudStorage."""
+    trace_database_string = self._google_storage_accessor.DownloadAsString(
Benoit L
2016/04/20 15:02:22
What about:
trace_db_str = self._google_storage_accessor.DownloadAsString(...)
droger
2016/04/20 15:58:17
Done.
+        self._trace_database_path)
+    trace_database_dict = {}
+    if trace_database_string:
+      self._logger.warning(
+          'Importing existing trace database %s' % self._trace_database_path)
+      trace_database_dict = json.loads(trace_database_string)
+    self._trace_database = LoadingTraceDatabase(trace_database_dict)
+
+  def _UploadTraceDatabase(self):
+    """Uploads the trace database to CloudStorage."""
+    self._logger.info('Uploading trace database')
Benoit L
2016/04/20 15:02:22
tiny nit: Importing is "warning", uploading "info"
droger
2016/04/20 15:58:17
Done.
+    self._google_storage_accessor.UploadString(
+        json.dumps(self._trace_database.ToJsonDict(), indent=2),
+        self._trace_database_path)
+
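Taken together, _DownloadTraceDatabase and _UploadTraceDatabase give the worker a recover/update/persist cycle around a single JSON object. A self-contained sketch of that cycle, using an in-memory stand-in for the storage accessor (every name below is a stand-in, not the real class):

    import json

    class FakeStorage(object):
      """In-memory stand-in for GoogleStorageAccessor, illustration only."""
      def __init__(self):
        self._blobs = {}

      def DownloadAsString(self, path):
        # Returns None when the object does not exist, like a fresh bucket.
        return self._blobs.get(path)

      def UploadString(self, data, path):
        self._blobs[path] = data

    def RecoverDatabase(storage, path):
      """Loads the JSON database, or returns an empty one on the first run."""
      blob = storage.DownloadAsString(path)
      return json.loads(blob) if blob else {}

    def PersistDatabase(storage, path, database):
      storage.UploadString(json.dumps(database, indent=2), path)

    storage = FakeStorage()
    db = RecoverDatabase(storage, 'traces/trace_database.json')
    db['gs://bucket/traces/example'] = {'succeeded': True,
                                        'url': 'http://example.com'}
    PersistDatabase(storage, 'traces/trace_database.json', db)
    # A restarted worker recovers what was persisted before the crash.
    assert RecoverDatabase(storage, 'traces/trace_database.json') == db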
   def _FetchClovisTask(self, project_name, task_api, queue_name):
     """Fetches a ClovisTask from the task queue.
@@ -119,14 +140,9 @@ class Worker(object):
   def _Finalize(self):
     """Called before exiting."""
-    self._logger.info('Uploading trace database')
-    self._google_storage_accessor.UploadString(
-        json.dumps(self._trace_database.ToJsonDict(), indent=2),
-        self._traces_dir + 'trace_database.json')
     # TODO(droger): Implement automatic instance destruction.
     self._logger.info('Done')
-
   def _GenerateTrace(self, url, emulate_device, emulate_network, filename,
                      log_filename):
     """ Generates a trace.
@@ -151,7 +167,6 @@ class Worker(object):
     if not url.startswith('http') and not url.startswith('file'):
       url = 'http://' + url
-    old_stdout = sys.stdout
     old_stderr = sys.stderr
     trace_metadata = { 'succeeded' : False, 'url' : url }
@@ -183,7 +198,6 @@ class Worker(object):
     with open(filename, 'w') as f:
       json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2)
-    sys.stdout = old_stdout
     sys.stderr = old_stderr
     return trace_metadata
@@ -223,17 +237,16 @@ class Worker(object):
                                                   remote_trace_location)
         full_cloud_storage_path = ('gs://' + self._bucket_name + '/' +
                                    remote_trace_location)
-        self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata)
+        self._trace_database.SetTrace(full_cloud_storage_path, trace_metadata)
       else:
         self._logger.warning('Trace generation failed for URL: %s' % url)
-        # TODO: upload the failure
         if os.path.isfile(local_filename):
-          self._google_storage_accessor.UploadFile(local_filename,
-                                                   failures_dir + remote_filename)
-      self._logger.debug('Uploading log')
+          self._google_storage_accessor.UploadFile(
+              local_filename, failures_dir + remote_filename)
+      self._logger.debug('Uploading analyze log')
       self._google_storage_accessor.UploadFile(log_filename,
                                                logs_dir + remote_filename)
-
+      self._UploadTraceDatabase()
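With the database now persisted after every processed URL instead of only in _Finalize, a worker that dies mid-task loses at most the entry it was working on, and _DownloadTraceDatabase restores the rest at startup. The switch from AddTrace to SetTrace presumably keeps that retry path idempotent: re-processing a URL overwrites its earlier entry rather than appending a duplicate. A toy dict-based illustration of that property (not the real LoadingTraceDatabase):

    # Illustration only: a keyed, set-style update is idempotent, so redoing a
    # URL after crash recovery cannot leave duplicate entries behind.
    database = {}

    def SetTrace(path, metadata):
      database[path] = metadata  # Overwrites any entry from an earlier attempt.

    SetTrace('gs://bucket/traces/site', {'succeeded': False, 'url': 'http://site'})
    SetTrace('gs://bucket/traces/site', {'succeeded': True, 'url': 'http://site'})
    assert len(database) == 1
    assert database['gs://bucket/traces/site']['succeeded']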
 if __name__ == '__main__':
   parser = argparse.ArgumentParser(