Chromium Code Reviews

Unified Diff: tools/android/loading/cloud/backend/worker.py

Issue 1908483002: tools/android/loading Loading trace database improvements (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Minor cleanup Created 4 years, 8 months ago
 # Copyright 2016 The Chromium Authors. All rights reserved.
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.

 import argparse
 import json
 import logging
 import os
 import re
 import sys
(...skipping 14 matching lines...)
 import loading_trace
 from loading_trace_database import LoadingTraceDatabase
 import options


 class Worker(object):
   def __init__(self, config, logger):
     """See README.md for the config format."""
     self._project_name = config['project_name']
     self._taskqueue_tag = config['taskqueue_tag']
+    self._src_path = config['src_path']
     self._credentials = GoogleCredentials.get_application_default()
     self._logger = logger

     # Separate the cloud storage path into the bucket and the base path under
     # the bucket.
     storage_path_components = config['cloud_storage_path'].split('/')
     self._bucket_name = storage_path_components[0]
     self._base_path_in_bucket = ''
     if len(storage_path_components) > 1:
       self._base_path_in_bucket = '/'.join(storage_path_components[1:])
       if not self._base_path_in_bucket.endswith('/'):
         self._base_path_in_bucket += '/'

-    # TODO: improve the trace database to support concurrent access.
-    self._traces_dir = self._base_path_in_bucket + 'traces/'
-    self._trace_database = LoadingTraceDatabase({})
-
-    self._src_path = config['src_path']
     self._google_storage_accessor = GoogleStorageAccessor(
         credentials=self._credentials, project_name=self._project_name,
         bucket_name=self._bucket_name)

+    self._traces_dir = os.path.join(self._base_path_in_bucket, 'traces')
+    self._trace_database_path = os.path.join(
+        self._traces_dir,
+        config.get('trace_database_filename', 'trace_database.json'))
+
+    # Recover any existing trace database in case the worker died.
+    self._DownloadTraceDatabase()
+
     # Initialize the global options that will be used during trace generation.
     options.OPTIONS.ParseArgs([])
     options.OPTIONS.local_binary = config['chrome_path']

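Note: the constructor above reads its settings from the JSON file passed on the command line; the authoritative format is in README.md, per the docstring. Purely as an illustration, a configuration consistent with the keys this file reads could look like the following (all values here are made up, and trace_database_filename is optional, defaulting to 'trace_database.json'):

{
  "project_name": "my-cloud-project",
  "taskqueue_tag": "clovis-worker",
  "src_path": "/work/chromium/src",
  "cloud_storage_path": "my-bucket/clovis",
  "chrome_path": "/work/chromium/src/out/Release/chrome",
  "trace_database_filename": "trace_database.json"
}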
   def Start(self):
     """Main worker loop.

     Repeatedly pulls tasks from the task queue and processes them. Returns when
     the queue is empty.
     """
(...skipping 20 matching lines...)
         continue

       self._logger.info('Processing task %s' % task_id)
       self._ProcessClovisTask(clovis_task)
       self._logger.debug('Deleting task %s' % task_id)
       task_api.tasks().delete(project=project, taskqueue=queue_name,
                               task=task_id).execute()
       self._logger.info('Finished task %s' % task_id)
     self._Finalize()

+  def _DownloadTraceDatabase(self):
+    """Downloads the trace database from CloudStorage."""
+    self._logger.info('Downloading trace database')
+    trace_database_string = self._google_storage_accessor.DownloadAsString(
+        self._trace_database_path) or '{}'
+    trace_database_dict = json.loads(trace_database_string)
+    self._trace_database = LoadingTraceDatabase(trace_database_dict)
+
+  def _UploadTraceDatabase(self):
+    """Uploads the trace database to CloudStorage."""
+    self._logger.info('Uploading trace database')
+    self._google_storage_accessor.UploadString(
+        json.dumps(self._trace_database.ToJsonDict(), indent=2),
+        self._trace_database_path)
+
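Note: the two helpers added above make the database durable across worker restarts, with the `or '{}'` fallback covering the case where nothing has been uploaded yet. A minimal standalone sketch of that recovery round-trip, using plain dicts and lambdas as stand-ins for LoadingTraceDatabase and GoogleStorageAccessor (the stand-ins are hypothetical, not the real APIs):

import json

def recover_database(download_as_string):
  # Start from an empty database when nothing has been uploaded yet.
  return json.loads(download_as_string() or '{}')

def upload_database(database, upload_string):
  # Serialize with indent=2, as the worker does, to keep the JSON readable.
  upload_string(json.dumps(database, indent=2))

# Simulated storage: one string slot standing in for the Cloud Storage object.
storage = {'blob': None}
db = recover_database(lambda: storage['blob'])           # {} on first run
db['gs://my-bucket/clovis/traces/example.com/0'] = {'succeeded': True}
upload_database(db, lambda s: storage.update(blob=s))
db = recover_database(lambda: storage['blob'])           # survives a restart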
   def _FetchClovisTask(self, project_name, task_api, queue_name):
     """Fetches a ClovisTask from the task queue.

     Params:
       project_name(str): The name of the Google Cloud project.
       task_api: The TaskQueue service.
       queue_name(str): The name of the task queue.

     Returns:
       (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None)
       if no tasks are found.
     """
     response = task_api.tasks().lease(
         project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180,
         groupByTag=True, tag=self._taskqueue_tag).execute()
     if (not response.get('items')) or (len(response['items']) < 1):
       return (None, None)

     google_task = response['items'][0]
     task_id = google_task['id']
     clovis_task = ClovisTask.FromBase64(google_task['payloadBase64'])
     return (clovis_task, task_id)

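Note: the lease() call above hands back leased tasks under 'items', each carrying an 'id' and a base64-encoded 'payloadBase64'. A hedged sketch of the unpacking step, with a fabricated response and payload used only to show the base64 round-trip (the real wire format is whatever ClovisTask.FromBase64 expects):

import base64
import json

# Hypothetical lease() response, shaped only like the fields this method reads.
fake_payload = base64.b64encode(
    json.dumps({'urls': ['https://example.com'], 'repeat_count': 2}).encode())
response = {'items': [{'id': 'task-0001', 'payloadBase64': fake_payload}]}

google_task = response['items'][0]
task_id = google_task['id']
decoded = json.loads(base64.b64decode(google_task['payloadBase64']).decode())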
   def _Finalize(self):
     """Called before exiting."""
-    self._logger.info('Uploading trace database')
-    self._google_storage_accessor.UploadString(
-        json.dumps(self._trace_database.ToJsonDict(), indent=2),
-        self._traces_dir + 'trace_database.json')
     # TODO(droger): Implement automatic instance destruction.
     self._logger.info('Done')

-
   def _GenerateTrace(self, url, emulate_device, emulate_network, filename,
                      log_filename):
     """ Generates a trace.

     Args:
       url: URL as a string.
       emulate_device: Name of the device to emulate. Empty for no emulation.
       emulate_network: Type of network emulation. Empty for no emulation.
       filename: Name of the file where the trace is saved.
       log_filename: Name of the file where standard output and errors are
(...skipping 54 matching lines...)
       self._logger.error('Unsupported task action: %s' % clovis_task.Action())
       return

     # Extract the task parameters.
     params = clovis_task.Params()
     urls = params['urls']
     repeat_count = params.get('repeat_count', 1)
     emulate_device = params.get('emulate_device')
     emulate_network = params.get('emulate_network')

-    failures_dir = self._base_path_in_bucket + 'failures/'
+    failures_dir = os.path.join(self._base_path_in_bucket, 'failures')
     # TODO(blundell): Fix this up.
-    logs_dir = self._base_path_in_bucket + 'analyze_logs/'
+    logs_dir = os.path.join(self._base_path_in_bucket, 'analyze_logs')
     log_filename = 'analyze.log'
     # Avoid special characters in storage object names
     pattern = re.compile(r"[#\?\[\]\*/]")

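Note: the regular expression above replaces characters that are unsafe in Cloud Storage object names with underscores. A quick illustration with an arbitrary example URL:

import re

pattern = re.compile(r"[#\?\[\]\*/]")
print(pattern.sub('_', 'https://www.example.com/news?id=42#top'))
# -> https:__www.example.com_news_id=42_top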
     while len(urls) > 0:
       url = urls.pop()
       local_filename = pattern.sub('_', url)
       for repeat in range(repeat_count):
         self._logger.debug('Generating trace for URL: %s' % url)
-        remote_filename = local_filename + '/' + str(repeat)
+        remote_filename = os.path.join(local_filename, str(repeat))
         trace_metadata = self._GenerateTrace(
             url, emulate_device, emulate_network, local_filename, log_filename)
         if trace_metadata['succeeded']:
           self._logger.debug('Uploading: %s' % remote_filename)
-          remote_trace_location = self._traces_dir + remote_filename
+          remote_trace_location = os.path.join(self._traces_dir,
+                                               remote_filename)
           self._google_storage_accessor.UploadFile(local_filename,
                                                    remote_trace_location)
-          full_cloud_storage_path = ('gs://' + self._bucket_name + '/' +
-                                     remote_trace_location)
+          full_cloud_storage_path = os.path.join('gs://' + self._bucket_name,
+                                                 remote_trace_location)
-          self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata)
+          self._trace_database.SetTrace(full_cloud_storage_path, trace_metadata)
         else:
           self._logger.warning('Trace generation failed for URL: %s' % url)
-          # TODO: upload the failure
           if os.path.isfile(local_filename):
-            self._google_storage_accessor.UploadFile(local_filename,
-                                                     failures_dir + remote_filename)
+            self._google_storage_accessor.UploadFile(
+                local_filename, os.path.join(failures_dir, remote_filename))
-        self._logger.debug('Uploading log')
+        self._logger.debug('Uploading analyze log')
-        self._google_storage_accessor.UploadFile(log_filename,
-                                                 logs_dir + remote_filename)
+        self._google_storage_accessor.UploadFile(
+            log_filename, os.path.join(logs_dir, remote_filename))
-
+      self._UploadTraceDatabase()

 if __name__ == '__main__':
   parser = argparse.ArgumentParser(
       description='ComputeEngine Worker for Clovis')
   parser.add_argument('--config', required=True,
                       help='Path to the configuration file.')
   args = parser.parse_args()

   # Configure logging.
   logging.basicConfig(level=logging.WARNING)
   worker_logger = logging.getLogger('worker')
   worker_logger.setLevel(logging.INFO)

   worker_logger.info('Reading configuration')
   with open(args.config) as config_json:
     worker = Worker(json.load(config_json), worker_logger)
     worker.Start()

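Note: run standalone, the worker only needs the configuration file, for example (path hypothetical):

python worker.py --config /path/to/config.json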
