Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(70)

Side by Side Diff: tools/android/loading/cloud/frontend/clovis_frontend.py

Issue 1962283002: tools/android/loading Support "report" tasks in the Clovis frontend (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@report
Patch Set: lizeb comments Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import logging 5 import logging
6 import os 6 import os
7 import sys 7 import sys
8 import time 8 import time
9 9
10 import cloudstorage
10 import flask 11 import flask
11 from google.appengine.api import (app_identity, taskqueue) 12 from google.appengine.api import (app_identity, taskqueue)
12 from google.appengine.ext import deferred 13 from google.appengine.ext import deferred
13 from oauth2client.client import GoogleCredentials 14 from oauth2client.client import GoogleCredentials
14 15
15 from common.clovis_task import ClovisTask 16 from common.clovis_task import ClovisTask
16 import common.google_instance_helper 17 import common.google_instance_helper
18 from common.loading_trace_database import LoadingTraceDatabase
17 import email_helper 19 import email_helper
18 from memory_logs import MemoryLogs 20 from memory_logs import MemoryLogs
19 21
20 22
21 # Global variables. 23 # Global variables.
22 clovis_logger = logging.getLogger('clovis_frontend') 24 clovis_logger = logging.getLogger('clovis_frontend')
23 clovis_logger.setLevel(logging.DEBUG) 25 clovis_logger.setLevel(logging.DEBUG)
24 project_name = app_identity.get_application_id() 26 project_name = app_identity.get_application_id()
25 instance_helper = common.google_instance_helper.GoogleInstanceHelper( 27 instance_helper = common.google_instance_helper.GoogleInstanceHelper(
26 credentials=GoogleCredentials.get_application_default(), 28 credentials=GoogleCredentials.get_application_default(),
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after
142 clovis_logger.info('Instance template destruction for tag: ' + tag) 144 clovis_logger.info('Instance template destruction for tag: ' + tag)
143 if not instance_helper.DeleteTemplate(tag): 145 if not instance_helper.DeleteTemplate(tag):
144 clovis_logger.info('Instance template destruction failed for: ' + tag) 146 clovis_logger.info('Instance template destruction failed for: ' + tag)
145 if try_count <= 5: 147 if try_count <= 5:
146 deferred.defer(DeleteInstanceTemplate, tag, try_count + 1, _countdown=60) 148 deferred.defer(DeleteInstanceTemplate, tag, try_count + 1, _countdown=60)
147 return 149 return
148 clovis_logger.error('Giving up template destruction for: ' + tag) 150 clovis_logger.error('Giving up template destruction for: ' + tag)
149 clovis_logger.info('Cleanup complete for tag: ' + tag) 151 clovis_logger.info('Cleanup complete for tag: ' + tag)
150 152
151 153
154 def SplitClovisTask(task):
155 """Splits a ClovisTask in smaller ClovisTasks.
156
157 Args:
158 task: (ClovisTask) The task to split.
159
160 Returns:
161 list: The list of ClovisTasks.
162 """
163 # For report task, need to find the traces first.
164 if task.Action() == 'report':
165 bucket = task.BackendParams().get('storage_bucket')
166 if not bucket:
167 clovis_logger.error('Missing storage bucket for report task.')
168 return None
169 traces = GetTracePaths(bucket)
170 if not traces:
171 clovis_logger.error('No traces found in bucket: ' + bucket)
172 return None
173 task.ActionParams()['traces'] = traces
174
175 # Compute the split key.
176 split_params_for_action = {'trace': ('urls', 1), 'report': ('traces', 5)}
177 (split_key, slice_size) = split_params_for_action.get(task.Action(),
178 (None, 0))
179 if not split_key:
180 clovis_logger.error('Cannot split task with action: ' + task.Action())
181 return None
182
183 # Split the task using the split key.
184 clovis_logger.debug('Splitting task by: ' + split_key)
185 action_params = task.ActionParams()
186 values = action_params[split_key]
187 sub_tasks = []
188 for i in range(0, len(values), slice_size):
189 sub_task_params = action_params.copy()
190 sub_task_params[split_key] = [v for v in values[i:i+slice_size]]
191 sub_tasks.append(ClovisTask(task.Action(), sub_task_params,
192 task.BackendParams()))
193 return sub_tasks
194
195
196 def GetTracePaths(bucket):
197 """Returns a list of trace files in a bucket.
198
199 Finds and loads the trace databases, and returns their content as a list of
200 paths.
201
202 This function assumes a specific structure for the files in the bucket. These
203 assumptions must match the behavior of the backend:
204 - The trace databases are located under a 'trace' directory under |bucket|.
205 - The trace databases files are the only objects with the 'trace_database'
206 prefix in their name.
207
208 Returns:
209 list: The list of paths to traces, as strings.
210 """
211 traces = []
212 prefix = os.path.join('/', bucket, 'trace', 'trace_database')
213 file_stats = cloudstorage.listbucket(prefix)
214
215 for file_stat in file_stats:
216 database_file = file_stat.filename
217 clovis_logger.info('Loading trace database: ' + database_file)
218
219 with cloudstorage.open(database_file) as remote_file:
220 json_string = remote_file.read()
221 if not json_string:
222 clovis_logger.warning('Failed to download: ' + database_file)
223 continue
224
225 database = LoadingTraceDatabase.FromJsonString(json_string)
226 if not database:
227 clovis_logger.warning('Failed to parse: ' + database_file)
228 continue
229
230 for path in database.ToJsonDict():
231 traces.append(path)
232
233 return traces
234
235
152 def StartFromJsonString(http_body_str): 236 def StartFromJsonString(http_body_str):
153 """Main function handling a JSON task posted by the user.""" 237 """Main function handling a JSON task posted by the user."""
154 # Set up logging. 238 # Set up logging.
155 memory_logs = MemoryLogs(clovis_logger) 239 memory_logs = MemoryLogs(clovis_logger)
156 memory_logs.Start() 240 memory_logs.Start()
157 241
158 # Load the task from JSON. 242 # Load the task from JSON.
159 task = ClovisTask.FromJsonString(http_body_str) 243 task = ClovisTask.FromJsonString(http_body_str)
160 if not task: 244 if not task:
161 clovis_logger.error('Invalid JSON task.') 245 clovis_logger.error('Invalid JSON task.')
162 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) 246 return Render('Invalid JSON task:\n' + http_body_str, memory_logs)
163 247
164 task_tag = task.BackendParams()['tag'] 248 task_tag = task.BackendParams()['tag']
249 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(),
250 task_tag))
165 251
166 # Create the instance template if required. 252 # Create the instance template if required.
167 if not CreateInstanceTemplate(task): 253 if not CreateInstanceTemplate(task):
168 return Render('Template creation failed.', memory_logs) 254 return Render('Template creation failed.', memory_logs)
169 255
170 # Split the task in smaller tasks. 256 # Build the URL where the result will live.
171 sub_tasks = []
172 task_url = None 257 task_url = None
173 if task.Action() == 'trace': 258 if task.Action() == 'trace':
174 bucket = task.BackendParams().get('storage_bucket') 259 bucket = task.BackendParams().get('storage_bucket')
175 if bucket: 260 if bucket:
176 task_url = 'https://console.cloud.google.com/storage/' + bucket 261 task_url = 'https://console.cloud.google.com/storage/' + bucket
177 sub_tasks = SplitTraceTask(task) 262 elif task.Action() == 'report':
263 # This must match the table path defined in ReportTaskHandler.
264 table_id = ''.join(c for c in task_tag if c.isalnum() or c == '_')
265 table_name = 'clovis_dataset.report_' + table_id
266 task_url = 'https://bigquery.cloud.google.com/table/%s:%s' %(project_name,
267 table_name)
178 else: 268 else:
179 error_string = 'Unsupported action: %s.' % task.Action() 269 error_string = 'Unsupported action: %s.' % task.Action()
180 clovis_logger.error(error_string) 270 clovis_logger.error(error_string)
181 return Render(error_string, memory_logs) 271 return Render(error_string, memory_logs)
272 clovis_logger.info('Task result URL: ' + task_url)
273
274 # Split the task in smaller tasks.
275 sub_tasks = SplitClovisTask(task)
276 if not sub_tasks:
277 return Render('Task split failed.', memory_logs)
182 278
183 if not EnqueueTasks(sub_tasks, task_tag): 279 if not EnqueueTasks(sub_tasks, task_tag):
184 return Render('Task creation failed.', memory_logs) 280 return Render('Task creation failed.', memory_logs)
185 281
186 # Start the instances if required. 282 # Start the instances if required.
187 if not CreateInstances(task): 283 if not CreateInstances(task):
188 return Render('Instance creation failed.', memory_logs) 284 return Render('Instance creation failed.', memory_logs)
189 285
190 # Start polling the progress. 286 # Start polling the progress.
191 clovis_logger.info('Creating worker polling task.') 287 clovis_logger.info('Creating worker polling task.')
192 first_poll_delay_minutes = 10 288 first_poll_delay_minutes = 10
193 timeout_hours = task.BackendParams().get('timeout_hours', 5) 289 timeout_hours = task.BackendParams().get('timeout_hours', 5)
194 user_email = email_helper.GetUserEmail() 290 user_email = email_helper.GetUserEmail()
195 deferred.defer(PollWorkers, task_tag, time.time(), timeout_hours, user_email, 291 deferred.defer(PollWorkers, task_tag, time.time(), timeout_hours, user_email,
196 task_url, _countdown=(60 * first_poll_delay_minutes)) 292 task_url, _countdown=(60 * first_poll_delay_minutes))
197 293
198 return Render(flask.Markup( 294 return Render(flask.Markup(
199 'Success!<br>Your task %s has started.<br>' 295 'Success!<br>Your task %s has started.<br>'
200 'You will be notified at %s when completed.') % (task_tag, user_email), 296 'You will be notified at %s when completed.') % (task_tag, user_email),
201 memory_logs) 297 memory_logs)
202 298
203 299
204 def SplitTraceTask(task):
205 """Splits a tracing task with potentially many URLs into several tracing tasks
206 with few URLs.
207 """
208 clovis_logger.debug('Splitting trace task.')
209 action_params = task.ActionParams()
210 urls = action_params['urls']
211
212 # Split the task in smaller tasks with fewer URLs each.
213 urls_per_task = 1
214 sub_tasks = []
215 for i in range(0, len(urls), urls_per_task):
216 sub_task_params = action_params.copy()
217 sub_task_params['urls'] = [url for url in urls[i:i+urls_per_task]]
218 sub_tasks.append(ClovisTask(task.Action(), sub_task_params,
219 task.BackendParams()))
220 return sub_tasks
221
222
223 def EnqueueTasks(tasks, task_tag): 300 def EnqueueTasks(tasks, task_tag):
224 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by 301 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by
225 Google Compute Engine. 302 Google Compute Engine.
226 """ 303 """
227 q = taskqueue.Queue('clovis-queue') 304 q = taskqueue.Queue('clovis-queue')
228 # Add tasks to the queue by groups. 305 # Add tasks to the queue by groups.
229 # TODO(droger): This supports thousands of tasks, but maybe not millions. 306 # TODO(droger): This supports thousands of tasks, but maybe not millions.
230 # Defer the enqueuing if it times out. 307 # Defer the enqueuing if it times out.
231 group_size = 100 308 group_size = 100
232 callbacks = [] 309 callbacks = []
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
268 @app.errorhandler(404) 345 @app.errorhandler(404)
269 def PageNotFound(e): # pylint: disable=unused-argument 346 def PageNotFound(e): # pylint: disable=unused-argument
270 """Return a custom 404 error.""" 347 """Return a custom 404 error."""
271 return 'Sorry, Nothing at this URL.', 404 348 return 'Sorry, Nothing at this URL.', 404
272 349
273 350
274 @app.errorhandler(500) 351 @app.errorhandler(500)
275 def ApplicationError(e): 352 def ApplicationError(e):
276 """Return a custom 500 error.""" 353 """Return a custom 500 error."""
277 return 'Sorry, unexpected error: {}'.format(e), 499 354 return 'Sorry, unexpected error: {}'.format(e), 499
OLDNEW
« no previous file with comments | « tools/android/loading/cloud/frontend/README.md ('k') | tools/android/loading/cloud/frontend/requirements.txt » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698