Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import logging | 5 import logging |
| 6 import os | 6 import os |
| 7 import sys | 7 import sys |
| 8 import time | 8 import time |
| 9 | 9 |
| 10 import cloudstorage | |
| 10 import flask | 11 import flask |
| 11 from google.appengine.api import (app_identity, taskqueue) | 12 from google.appengine.api import (app_identity, taskqueue) |
| 12 from google.appengine.ext import deferred | 13 from google.appengine.ext import deferred |
| 13 from oauth2client.client import GoogleCredentials | 14 from oauth2client.client import GoogleCredentials |
| 14 | 15 |
| 15 from common.clovis_task import ClovisTask | 16 from common.clovis_task import ClovisTask |
| 16 import common.google_instance_helper | 17 import common.google_instance_helper |
| 18 from common.loading_trace_database import LoadingTraceDatabase | |
| 17 import email_helper | 19 import email_helper |
| 18 from memory_logs import MemoryLogs | 20 from memory_logs import MemoryLogs |
| 19 | 21 |
| 20 | 22 |
| 21 # Global variables. | 23 # Global variables. |
| 22 clovis_logger = logging.getLogger('clovis_frontend') | 24 clovis_logger = logging.getLogger('clovis_frontend') |
| 23 clovis_logger.setLevel(logging.DEBUG) | 25 clovis_logger.setLevel(logging.DEBUG) |
| 24 project_name = app_identity.get_application_id() | 26 project_name = app_identity.get_application_id() |
| 25 instance_helper = common.google_instance_helper.GoogleInstanceHelper( | 27 instance_helper = common.google_instance_helper.GoogleInstanceHelper( |
| 26 credentials=GoogleCredentials.get_application_default(), | 28 credentials=GoogleCredentials.get_application_default(), |
| (...skipping 115 matching lines...) | |
| 142 clovis_logger.info('Instance template destruction for tag: ' + tag) | 144 clovis_logger.info('Instance template destruction for tag: ' + tag) |
| 143 if not instance_helper.DeleteTemplate(tag): | 145 if not instance_helper.DeleteTemplate(tag): |
| 144 clovis_logger.info('Instance template destruction failed for: ' + tag) | 146 clovis_logger.info('Instance template destruction failed for: ' + tag) |
| 145 if try_count <= 5: | 147 if try_count <= 5: |
| 146 deferred.defer(DeleteInstanceTemplate, tag, try_count + 1, _countdown=60) | 148 deferred.defer(DeleteInstanceTemplate, tag, try_count + 1, _countdown=60) |
| 147 return | 149 return |
| 148 clovis_logger.error('Giving up template destruction for: ' + tag) | 150 clovis_logger.error('Giving up template destruction for: ' + tag) |
| 149 clovis_logger.info('Cleanup complete for tag: ' + tag) | 151 clovis_logger.info('Cleanup complete for tag: ' + tag) |
| 150 | 152 |
| 151 | 153 |
| 154 def SplitClovisTask(task): | |
| 155 """Splits a ClovisTask in smaller ClovisTasks. | |
| 156 | |
| 157 Args: | |
| 158 task (ClovisTask): The task to split. | |
| 159 | |
| 160 Returns: | |
| 161 list: The list of ClovisTasks. | |
| 162 """ | |
| 163 if task.Action() == 'trace': | |
| 164 return SplitTask(task, 'urls') | |
|
blundell
2016/05/13 11:19:55
I would just fold SplitTask() into this function:
| |
| 165 elif task.Action() == 'report': | |
| 166 bucket = task.BackendParams().get('storage_bucket') | |
| 167 if not bucket: | |
| 168 clovis_logger.error('Missing storage bucket fot report task.') | |
|
blundell
2016/05/13 11:19:55
nit: for
| |
| 169 return None | |
| 170 traces = GetTracePaths(bucket) | |
| 171 if not traces: | |
|
blundell
2016/05/13 11:19:55
In general, your functions could use some blank lines […]
| |
| 172 clovis_logger.error('No traces found in bucket: ' + bucket) | |
| 173 return None | |
| 174 task.ActionParams()['traces'] = traces | |
| 175 return SplitTask(task, 'traces') | |
| 176 clovis_logger.error('Cannot split task with action: ' + task.Action()) | |
| 177 return None | |
| 178 | |
| 179 | |
| 180 def GetTracePaths(bucket): | |
| 181 """Returns a list of trace files in a bucket. | |
| 182 | |
| 183 Finds and loads the trace databases, and returns their content as a list of | |
| 184 paths. | |
| 185 | |
| 186 This function assumes a specific structure for the files in the bucket. These | |
| 187 assumptions must match the behavior of the backend: | |
| 188 - The trace databases are located under a 'trace' directory under |bucket|. | |
| 189 - The trace databases files are the only objects with the 'trace_database' | |
| 190 prefix in their name. | |
| 191 | |
| 192 Returns: | |
| 193 list: The list of paths to traces, as strings. | |
| 194 """ | |
| 195 traces = [] | |
| 196 prefix = os.path.join('/', bucket, 'trace', 'trace_database') | |
| 197 file_stats = cloudstorage.listbucket(prefix) | |
| 198 for file_stat in file_stats: | |
| 199 database_file = file_stat.filename | |
| 200 clovis_logger.info('Loading trace database: ' + database_file) | |
| 201 with cloudstorage.open(database_file) as remote_file: | |
| 202 json_string = remote_file.read() | |
| 203 if not json_string: | |
| 204 clovis_logger.warning('Failed to download: ' + database_file) | |
| 205 continue | |
| 206 database = LoadingTraceDatabase.FromJsonString(json_string) | |
| 207 if not database: | |
| 208 clovis_logger.warning('Failed to parse: ' + database_file) | |
| 209 continue | |
| 210 for path in database.ToJsonDict(): | |
| 211 traces.append(path) | |
| 212 return traces | |
| 213 | |
| 214 | |
| 152 def StartFromJsonString(http_body_str): | 215 def StartFromJsonString(http_body_str): |
| 153 """Main function handling a JSON task posted by the user.""" | 216 """Main function handling a JSON task posted by the user.""" |
| 154 # Set up logging. | 217 # Set up logging. |
| 155 memory_logs = MemoryLogs(clovis_logger) | 218 memory_logs = MemoryLogs(clovis_logger) |
| 156 memory_logs.Start() | 219 memory_logs.Start() |
| 157 | 220 |
| 158 # Load the task from JSON. | 221 # Load the task from JSON. |
| 159 task = ClovisTask.FromJsonString(http_body_str) | 222 task = ClovisTask.FromJsonString(http_body_str) |
| 160 if not task: | 223 if not task: |
| 161 clovis_logger.error('Invalid JSON task.') | 224 clovis_logger.error('Invalid JSON task.') |
| 162 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) | 225 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
| 163 | 226 |
| 164 task_tag = task.BackendParams()['tag'] | 227 task_tag = task.BackendParams()['tag'] |
| 228 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), | |
| 229 task_tag)) | |
| 165 | 230 |
| 166 # Create the instance template if required. | 231 # Create the instance template if required. |
| 167 if not CreateInstanceTemplate(task): | 232 if not CreateInstanceTemplate(task): |
| 168 return Render('Template creation failed.', memory_logs) | 233 return Render('Template creation failed.', memory_logs) |
| 169 | 234 |
| 170 # Split the task in smaller tasks. | 235 # Build the URL where the result will live. |
| 171 sub_tasks = [] | |
| 172 task_url = None | 236 task_url = None |
| 173 if task.Action() == 'trace': | 237 if task.Action() == 'trace': |
| 174 bucket = task.BackendParams().get('storage_bucket') | 238 bucket = task.BackendParams().get('storage_bucket') |
| 175 if bucket: | 239 if bucket: |
| 176 task_url = 'https://console.cloud.google.com/storage/' + bucket | 240 task_url = 'https://console.cloud.google.com/storage/' + bucket |
| 177 sub_tasks = SplitTraceTask(task) | 241 elif task.Action() == 'report': |
| 242 # This must match the table path defined in ReportTaskHandler. | |
| 243 table_id = ''.join(c for c in task_tag if c.isalnum() or c == '_') | |
|
blundell
2016/05/13 11:19:55
Maybe this should be defined in a function in common […]
droger
2016/05/13 15:44:17
Good point.
I'm doing a separate CL for this though […]
| |
| 244 table_name = 'clovis_dataset.report_' + table_id | |
| 245 task_url = 'https://bigquery.cloud.google.com/table/%s:%s' %(project_name, | |
| 246 table_name) | |
| 178 else: | 247 else: |
| 179 error_string = 'Unsupported action: %s.' % task.Action() | 248 error_string = 'Unsupported action: %s.' % task.Action() |
| 180 clovis_logger.error(error_string) | 249 clovis_logger.error(error_string) |
| 181 return Render(error_string, memory_logs) | 250 return Render(error_string, memory_logs) |
| 251 clovis_logger.info('Task result URL: ' + task_url) | |
| 252 | |
| 253 # Split the task in smaller tasks. | |
| 254 sub_tasks = SplitClovisTask(task) | |
| 255 if not sub_tasks: | |
| 256 return Render('Task split failed.', memory_logs) | |
| 182 | 257 |
| 183 if not EnqueueTasks(sub_tasks, task_tag): | 258 if not EnqueueTasks(sub_tasks, task_tag): |
| 184 return Render('Task creation failed.', memory_logs) | 259 return Render('Task creation failed.', memory_logs) |
| 185 | 260 |
| 186 # Start the instances if required. | 261 # Start the instances if required. |
| 187 if not CreateInstances(task): | 262 if not CreateInstances(task): |
| 188 return Render('Instance creation failed.', memory_logs) | 263 return Render('Instance creation failed.', memory_logs) |
| 189 | 264 |
| 190 # Start polling the progress. | 265 # Start polling the progress. |
| 191 clovis_logger.info('Creating worker polling task.') | 266 clovis_logger.info('Creating worker polling task.') |
| 192 first_poll_delay_minutes = 10 | 267 first_poll_delay_minutes = 10 |
| 193 timeout_hours = task.BackendParams().get('timeout_hours', 5) | 268 timeout_hours = task.BackendParams().get('timeout_hours', 5) |
| 194 user_email = email_helper.GetUserEmail() | 269 user_email = email_helper.GetUserEmail() |
| 195 deferred.defer(PollWorkers, task_tag, time.time(), timeout_hours, user_email, | 270 deferred.defer(PollWorkers, task_tag, time.time(), timeout_hours, user_email, |
| 196 task_url, _countdown=(60 * first_poll_delay_minutes)) | 271 task_url, _countdown=(60 * first_poll_delay_minutes)) |
| 197 | 272 |
| 198 return Render(flask.Markup( | 273 return Render(flask.Markup( |
| 199 'Success!<br>Your task %s has started.<br>' | 274 'Success!<br>Your task %s has started.<br>' |
| 200 'You will be notified at %s when completed.') % (task_tag, user_email), | 275 'You will be notified at %s when completed.') % (task_tag, user_email), |
| 201 memory_logs) | 276 memory_logs) |
| 202 | 277 |
| 203 | 278 |
| 204 def SplitTraceTask(task): | 279 def SplitTask(task, key, values_per_task=1): |
| 205 """Splits a tracing task with potentially many URLs into several tracing tasks | 280 """Splits a task with potentially many values into several tasks with few |
| 206 with few URLs. | 281 values. |
| 282 | |
| 283 Params: | |
| 284 key(str): The key in the action parameters that is used as the dimension of | |
|
blundell
2016/05/13 11:19:55
same comment as the other one re: type information
| |
| 285 the split. | |
| 286 The value associated to this key must be a list. | |
| 287 The resulting tasks are identical to the input task, except that | |
| 288 they only have a slice of this list. | |
| 289 values_per_task(int): The size of the slice of the list identified by |key|. | |
| 290 | |
| 291 Returns: | |
| 292 list: The list of smaller tasks. | |
| 207 """ | 293 """ |
| 208 clovis_logger.debug('Splitting trace task.') | 294 clovis_logger.debug('Splitting task.') |
| 209 action_params = task.ActionParams() | 295 action_params = task.ActionParams() |
| 210 urls = action_params['urls'] | 296 values = action_params[key] |
| 211 | 297 |
| 212 # Split the task in smaller tasks with fewer URLs each. | 298 # Split the task in smaller tasks with fewer values each. |
| 213 urls_per_task = 1 | |
| 214 sub_tasks = [] | 299 sub_tasks = [] |
| 215 for i in range(0, len(urls), urls_per_task): | 300 for i in range(0, len(values), values_per_task): |
| 216 sub_task_params = action_params.copy() | 301 sub_task_params = action_params.copy() |
| 217 sub_task_params['urls'] = [url for url in urls[i:i+urls_per_task]] | 302 sub_task_params[key] = [v for v in values[i:i+values_per_task]] |
| 218 sub_tasks.append(ClovisTask(task.Action(), sub_task_params, | 303 sub_tasks.append(ClovisTask(task.Action(), sub_task_params, |
| 219 task.BackendParams())) | 304 task.BackendParams())) |
| 220 return sub_tasks | 305 return sub_tasks |
| 221 | 306 |
| 222 | 307 |
| 223 def EnqueueTasks(tasks, task_tag): | 308 def EnqueueTasks(tasks, task_tag): |
| 224 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by | 309 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
| 225 Google Compute Engine. | 310 Google Compute Engine. |
| 226 """ | 311 """ |
| 227 q = taskqueue.Queue('clovis-queue') | 312 q = taskqueue.Queue('clovis-queue') |
| (...skipping 40 matching lines...) | |
| 268 @app.errorhandler(404) | 353 @app.errorhandler(404) |
| 269 def PageNotFound(e): # pylint: disable=unused-argument | 354 def PageNotFound(e): # pylint: disable=unused-argument |
| 270 """Return a custom 404 error.""" | 355 """Return a custom 404 error.""" |
| 271 return 'Sorry, Nothing at this URL.', 404 | 356 return 'Sorry, Nothing at this URL.', 404 |
| 272 | 357 |
| 273 | 358 |
| 274 @app.errorhandler(500) | 359 @app.errorhandler(500) |
| 275 def ApplicationError(e): | 360 def ApplicationError(e): |
| 276 """Return a custom 500 error.""" | 361 """Return a custom 500 error.""" |
| 277 return 'Sorry, unexpected error: {}'.format(e), 499 | 362 return 'Sorry, unexpected error: {}'.format(e), 499 |
| OLD | NEW |