Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: tools/android/loading/cloud/backend/worker.py

Issue 1908483002: tools/android/loading Loading trace database improvements (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import argparse 5 import argparse
6 import json 6 import json
7 import logging 7 import logging
8 import os 8 import os
9 import re 9 import re
10 import sys 10 import sys
(...skipping 14 matching lines...) Expand all
25 import loading_trace 25 import loading_trace
26 from loading_trace_database import LoadingTraceDatabase 26 from loading_trace_database import LoadingTraceDatabase
27 import options 27 import options
28 28
29 29
30 class Worker(object): 30 class Worker(object):
31 def __init__(self, config, logger): 31 def __init__(self, config, logger):
32 """See README.md for the config format.""" 32 """See README.md for the config format."""
33 self._project_name = config['project_name'] 33 self._project_name = config['project_name']
34 self._taskqueue_tag = config['taskqueue_tag'] 34 self._taskqueue_tag = config['taskqueue_tag']
35 self._src_path = config['src_path']
35 self._credentials = GoogleCredentials.get_application_default() 36 self._credentials = GoogleCredentials.get_application_default()
36 self._logger = logger 37 self._logger = logger
37 38
38 # Separate the cloud storage path into the bucket and the base path under 39 # Separate the cloud storage path into the bucket and the base path under
39 # the bucket. 40 # the bucket.
40 storage_path_components = config['cloud_storage_path'].split('/') 41 storage_path_components = config['cloud_storage_path'].split('/')
41 self._bucket_name = storage_path_components[0] 42 self._bucket_name = storage_path_components[0]
42 self._base_path_in_bucket = '' 43 self._base_path_in_bucket = ''
43 if len(storage_path_components) > 1: 44 if len(storage_path_components) > 1:
44 self._base_path_in_bucket = '/'.join(storage_path_components[1:]) 45 self._base_path_in_bucket = '/'.join(storage_path_components[1:])
45 if not self._base_path_in_bucket.endswith('/'): 46 if not self._base_path_in_bucket.endswith('/'):
46 self._base_path_in_bucket += '/' 47 self._base_path_in_bucket += '/'
47 48
48 # TODO: improve the trace database to support concurrent access.
49 self._traces_dir = self._base_path_in_bucket + 'traces/'
50 self._trace_database = LoadingTraceDatabase({})
51
52 self._src_path = config['src_path']
53 self._google_storage_accessor = GoogleStorageAccessor( 49 self._google_storage_accessor = GoogleStorageAccessor(
54 credentials=self._credentials, project_name=self._project_name, 50 credentials=self._credentials, project_name=self._project_name,
55 bucket_name=self._bucket_name) 51 bucket_name=self._bucket_name)
56 52
53 self._traces_dir = self._base_path_in_bucket + 'traces/'
Benoit L 2016/04/20 15:02:22 Generally speaking, I prefer to manipulate paths w
droger 2016/04/20 15:58:17 Done.
54 self._trace_database_path = self._traces_dir + config.get(
55 'trace_database_filename', 'trace_database.json')
Benoit L 2016/04/20 15:02:22 Is the trace dir per worker? If not, then the trac
droger 2016/04/20 15:58:17 It is intended that all worker write to the same d
56
57 # Recover any existing trace database in case the worker died.
58 self._DownloadTraceDatabase()
59
57 # Initialize the global options that will be used during trace generation. 60 # Initialize the global options that will be used during trace generation.
58 options.OPTIONS.ParseArgs([]) 61 options.OPTIONS.ParseArgs([])
59 options.OPTIONS.local_binary = config['chrome_path'] 62 options.OPTIONS.local_binary = config['chrome_path']
60 63
61 def Start(self): 64 def Start(self):
62 """Main worker loop. 65 """Main worker loop.
63 66
64 Repeatedly pulls tasks from the task queue and processes them. Returns when 67 Repeatedly pulls tasks from the task queue and processes them. Returns when
65 the queue is empty. 68 the queue is empty.
66 """ 69 """
(...skipping 20 matching lines...) Expand all
87 continue 90 continue
88 91
89 self._logger.info('Processing task %s' % task_id) 92 self._logger.info('Processing task %s' % task_id)
90 self._ProcessClovisTask(clovis_task) 93 self._ProcessClovisTask(clovis_task)
91 self._logger.debug('Deleting task %s' % task_id) 94 self._logger.debug('Deleting task %s' % task_id)
92 task_api.tasks().delete(project=project, taskqueue=queue_name, 95 task_api.tasks().delete(project=project, taskqueue=queue_name,
93 task=task_id).execute() 96 task=task_id).execute()
94 self._logger.info('Finished task %s' % task_id) 97 self._logger.info('Finished task %s' % task_id)
95 self._Finalize() 98 self._Finalize()
96 99
100 def _DownloadTraceDatabase(self):
101 """Downloads the trace database from CloudStorage."""
102 trace_database_string = self._google_storage_accessor.DownloadAsString(
Benoit L 2016/04/20 15:02:22 What about: trace_db_str = self._google_storage_a
droger 2016/04/20 15:58:17 Done.
103 self._trace_database_path)
104 trace_database_dict = {}
105 if trace_database_string:
106 self._logger.warning(
107 'Importing existing trace database %s' % self._trace_database_path)
108 trace_database_dict = json.loads(trace_database_string)
109 self._trace_database = LoadingTraceDatabase(trace_database_dict)
110
111 def _UploadTraceDatabase(self):
112 """Uploads the trace database to CloudStorage."""
113 self._logger.info('Uploading trace database')
Benoit L 2016/04/20 15:02:22 tiny nit: Importing is "warning", uploading "info"
droger 2016/04/20 15:58:17 Done.
114 self._google_storage_accessor.UploadString(
115 json.dumps(self._trace_database.ToJsonDict(), indent=2),
116 self._trace_database_path)
117
97 def _FetchClovisTask(self, project_name, task_api, queue_name): 118 def _FetchClovisTask(self, project_name, task_api, queue_name):
98 """Fetches a ClovisTask from the task queue. 119 """Fetches a ClovisTask from the task queue.
99 120
100 Params: 121 Params:
101 project_name(str): The name of the Google Cloud project. 122 project_name(str): The name of the Google Cloud project.
102 task_api: The TaskQueue service. 123 task_api: The TaskQueue service.
103 queue_name(str): The name of the task queue. 124 queue_name(str): The name of the task queue.
104 125
105 Returns: 126 Returns:
106 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None) 127 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None)
107 if no tasks are found. 128 if no tasks are found.
108 """ 129 """
109 response = task_api.tasks().lease( 130 response = task_api.tasks().lease(
110 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180, 131 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180,
111 groupByTag=True, tag=self._taskqueue_tag).execute() 132 groupByTag=True, tag=self._taskqueue_tag).execute()
112 if (not response.get('items')) or (len(response['items']) < 1): 133 if (not response.get('items')) or (len(response['items']) < 1):
113 return (None, None) 134 return (None, None)
114 135
115 google_task = response['items'][0] 136 google_task = response['items'][0]
116 task_id = google_task['id'] 137 task_id = google_task['id']
117 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64']) 138 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64'])
118 return (clovis_task, task_id) 139 return (clovis_task, task_id)
119 140
120 def _Finalize(self): 141 def _Finalize(self):
121 """Called before exiting.""" 142 """Called before exiting."""
122 self._logger.info('Uploading trace database')
123 self._google_storage_accessor.UploadString(
124 json.dumps(self._trace_database.ToJsonDict(), indent=2),
125 self._traces_dir + 'trace_database.json')
126 # TODO(droger): Implement automatic instance destruction. 143 # TODO(droger): Implement automatic instance destruction.
127 self._logger.info('Done') 144 self._logger.info('Done')
128 145
129
130 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, 146 def _GenerateTrace(self, url, emulate_device, emulate_network, filename,
131 log_filename): 147 log_filename):
132 """ Generates a trace. 148 """ Generates a trace.
133 149
134 Args: 150 Args:
135 url: URL as a string. 151 url: URL as a string.
136 emulate_device: Name of the device to emulate. Empty for no emulation. 152 emulate_device: Name of the device to emulate. Empty for no emulation.
137 emulate_network: Type of network emulation. Empty for no emulation. 153 emulate_network: Type of network emulation. Empty for no emulation.
138 filename: Name of the file where the trace is saved. 154 filename: Name of the file where the trace is saved.
139 log_filename: Name of the file where standard output and errors are 155 log_filename: Name of the file where standard output and errors are
140 logged. 156 logged.
141 157
142 Returns: 158 Returns:
143 A dictionary of metadata about the trace, including a 'succeeded' field 159 A dictionary of metadata about the trace, including a 'succeeded' field
144 indicating whether the trace was successfully generated. 160 indicating whether the trace was successfully generated.
145 """ 161 """
146 try: 162 try:
147 os.remove(filename) # Remove any existing trace for this URL. 163 os.remove(filename) # Remove any existing trace for this URL.
148 except OSError: 164 except OSError:
149 pass # Nothing to remove. 165 pass # Nothing to remove.
150 166
151 if not url.startswith('http') and not url.startswith('file'): 167 if not url.startswith('http') and not url.startswith('file'):
152 url = 'http://' + url 168 url = 'http://' + url
153 169
154 old_stdout = sys.stdout
155 old_stderr = sys.stderr 170 old_stderr = sys.stderr
156 171
157 trace_metadata = { 'succeeded' : False, 'url' : url } 172 trace_metadata = { 'succeeded' : False, 'url' : url }
158 trace = None 173 trace = None
159 with open(log_filename, 'w') as sys.stdout: 174 with open(log_filename, 'w') as sys.stdout:
160 try: 175 try:
161 sys.stderr = sys.stdout 176 sys.stderr = sys.stdout
162 177
163 # Set up the controller. 178 # Set up the controller.
164 chrome_ctl = controller.LocalChromeController() 179 chrome_ctl = controller.LocalChromeController()
(...skipping 11 matching lines...) Expand all
176 url, connection, chrome_ctl.ChromeMetadata()) 191 url, connection, chrome_ctl.ChromeMetadata())
177 trace_metadata['succeeded'] = True 192 trace_metadata['succeeded'] = True
178 trace_metadata.update(trace.ToJsonDict()[trace._METADATA_KEY]) 193 trace_metadata.update(trace.ToJsonDict()[trace._METADATA_KEY])
179 except Exception as e: 194 except Exception as e:
180 sys.stderr.write(str(e)) 195 sys.stderr.write(str(e))
181 196
182 if trace: 197 if trace:
183 with open(filename, 'w') as f: 198 with open(filename, 'w') as f:
184 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) 199 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2)
185 200
186 sys.stdout = old_stdout
187 sys.stderr = old_stderr 201 sys.stderr = old_stderr
188 202
189 return trace_metadata 203 return trace_metadata
190 204
191 def _ProcessClovisTask(self, clovis_task): 205 def _ProcessClovisTask(self, clovis_task):
192 """Processes one clovis_task.""" 206 """Processes one clovis_task."""
193 if clovis_task.Action() != 'trace': 207 if clovis_task.Action() != 'trace':
194 self._logger.error('Unsupported task action: %s' % clovis_task.Action()) 208 self._logger.error('Unsupported task action: %s' % clovis_task.Action())
195 return 209 return
196 210
(...skipping 19 matching lines...) Expand all
216 remote_filename = local_filename + '/' + str(repeat) 230 remote_filename = local_filename + '/' + str(repeat)
217 trace_metadata = self._GenerateTrace( 231 trace_metadata = self._GenerateTrace(
218 url, emulate_device, emulate_network, local_filename, log_filename) 232 url, emulate_device, emulate_network, local_filename, log_filename)
219 if trace_metadata['succeeded']: 233 if trace_metadata['succeeded']:
220 self._logger.debug('Uploading: %s' % remote_filename) 234 self._logger.debug('Uploading: %s' % remote_filename)
221 remote_trace_location = self._traces_dir + remote_filename 235 remote_trace_location = self._traces_dir + remote_filename
222 self._google_storage_accessor.UploadFile(local_filename, 236 self._google_storage_accessor.UploadFile(local_filename,
223 remote_trace_location) 237 remote_trace_location)
224 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + 238 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' +
225 remote_trace_location) 239 remote_trace_location)
226 self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata) 240 self._trace_database.SetTrace(full_cloud_storage_path, trace_metadata)
227 else: 241 else:
228 self._logger.warning('Trace generation failed for URL: %s' % url) 242 self._logger.warning('Trace generation failed for URL: %s' % url)
229 # TODO: upload the failure
230 if os.path.isfile(local_filename): 243 if os.path.isfile(local_filename):
231 self._google_storage_accessor.UploadFile(local_filename, 244 self._google_storage_accessor.UploadFile(
232 failures_dir + remote_filename) 245 local_filename, failures_dir + remote_filename)
233 self._logger.debug('Uploading log') 246 self._logger.debug('Uploading analyze log')
234 self._google_storage_accessor.UploadFile(log_filename, 247 self._google_storage_accessor.UploadFile(log_filename,
235 logs_dir + remote_filename) 248 logs_dir + remote_filename)
236 249 self._UploadTraceDatabase()
237 250
238 if __name__ == '__main__': 251 if __name__ == '__main__':
239 parser = argparse.ArgumentParser( 252 parser = argparse.ArgumentParser(
240 description='ComputeEngine Worker for Clovis') 253 description='ComputeEngine Worker for Clovis')
241 parser.add_argument('--config', required=True, 254 parser.add_argument('--config', required=True,
242 help='Path to the configuration file.') 255 help='Path to the configuration file.')
243 args = parser.parse_args() 256 args = parser.parse_args()
244 257
245 # Configure logging. 258 # Configure logging.
246 logging.basicConfig(level=logging.WARNING) 259 logging.basicConfig(level=logging.WARNING)
247 worker_logger = logging.getLogger('worker') 260 worker_logger = logging.getLogger('worker')
248 worker_logger.setLevel(logging.INFO) 261 worker_logger.setLevel(logging.INFO)
249 262
250 worker_logger.info('Reading configuration') 263 worker_logger.info('Reading configuration')
251 with open(args.config) as config_json: 264 with open(args.config) as config_json:
252 worker = Worker(json.load(config_json), worker_logger) 265 worker = Worker(json.load(config_json), worker_logger)
253 worker.Start() 266 worker.Start()
254 267
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698