OLD | NEW |
---|---|
1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import logging | 5 import logging |
6 import os | 6 import os |
7 import sys | 7 import sys |
8 import time | 8 import time |
9 | 9 |
10 import cloudstorage | |
10 import flask | 11 import flask |
11 from google.appengine.api import (app_identity, taskqueue) | 12 from google.appengine.api import (app_identity, taskqueue) |
12 from google.appengine.ext import deferred | 13 from google.appengine.ext import deferred |
13 from oauth2client.client import GoogleCredentials | 14 from oauth2client.client import GoogleCredentials |
14 | 15 |
15 from common.clovis_task import ClovisTask | 16 from common.clovis_task import ClovisTask |
16 import common.google_instance_helper | 17 import common.google_instance_helper |
18 from common.loading_trace_database import LoadingTraceDatabase | |
17 import email_helper | 19 import email_helper |
18 from memory_logs import MemoryLogs | 20 from memory_logs import MemoryLogs |
19 | 21 |
20 | 22 |
21 # Global variables. | 23 # Global variables. |
22 clovis_logger = logging.getLogger('clovis_frontend') | 24 clovis_logger = logging.getLogger('clovis_frontend') |
23 clovis_logger.setLevel(logging.DEBUG) | 25 clovis_logger.setLevel(logging.DEBUG) |
24 project_name = app_identity.get_application_id() | 26 project_name = app_identity.get_application_id() |
25 instance_helper = common.google_instance_helper.GoogleInstanceHelper( | 27 instance_helper = common.google_instance_helper.GoogleInstanceHelper( |
26 credentials=GoogleCredentials.get_application_default(), | 28 credentials=GoogleCredentials.get_application_default(), |
(...skipping 115 matching lines...) | |
142 clovis_logger.info('Instance template destruction for tag: ' + tag) | 144 clovis_logger.info('Instance template destruction for tag: ' + tag) |
143 if not instance_helper.DeleteTemplate(tag): | 145 if not instance_helper.DeleteTemplate(tag): |
144 clovis_logger.info('Instance template destruction failed for: ' + tag) | 146 clovis_logger.info('Instance template destruction failed for: ' + tag) |
145 if try_count <= 5: | 147 if try_count <= 5: |
146 deferred.defer(DeleteInstanceTemplate, tag, try_count + 1, _countdown=60) | 148 deferred.defer(DeleteInstanceTemplate, tag, try_count + 1, _countdown=60) |
147 return | 149 return |
148 clovis_logger.error('Giving up template destruction for: ' + tag) | 150 clovis_logger.error('Giving up template destruction for: ' + tag) |
149 clovis_logger.info('Cleanup complete for tag: ' + tag) | 151 clovis_logger.info('Cleanup complete for tag: ' + tag) |
150 | 152 |
151 | 153 |
154 def SplitClovisTask(task): | |
155 """Splits a ClovisTask in smaller ClovisTasks. | |
156 | |
157 Args: | |
158 task (ClovisTask): The task to split. | |
159 | |
160 Returns: | |
161 list: The list of ClovisTasks. | |
162 """ | |
163 if task.Action() == 'trace': | |
164 return SplitTask(task, 'urls') | |
blundell
2016/05/13 11:19:55
I would just fold SplitTask() into this function:
| |
165 elif task.Action() == 'report': | |
166 bucket = task.BackendParams().get('storage_bucket') | |
167 if not bucket: | |
168 clovis_logger.error('Missing storage bucket fot report task.') | |
blundell
2016/05/13 11:19:55
nit: for
| |
169 return None | |
170 traces = GetTracePaths(bucket) | |
171 if not traces: | |
blundell
2016/05/13 11:19:55
In general, your functions could use some blank lines [...]
| |
172 clovis_logger.error('No traces found in bucket: ' + bucket) | |
173 return None | |
174 task.ActionParams()['traces'] = traces | |
175 return SplitTask(task, 'traces') | |
176 clovis_logger.error('Cannot split task with action: ' + task.Action()) | |
177 return None | |
178 | |
179 | |
180 def GetTracePaths(bucket): | |
181 """Returns a list of trace files in a bucket. | |
182 | |
183 Finds and loads the trace databases, and returns their content as a list of | |
184 paths. | |
185 | |
186 This function assumes a specific structure for the files in the bucket. These | |
187 assumptions must match the behavior of the backend: | |
188 - The trace databases are located under a 'trace' directory under |bucket|. | |
189 - The trace databases files are the only objects with the 'trace_database' | |
190 prefix in their name. | |
191 | |
192 Returns: | |
193 list: The list of paths to traces, as strings. | |
194 """ | |
195 traces = [] | |
196 prefix = os.path.join('/', bucket, 'trace', 'trace_database') | |
197 file_stats = cloudstorage.listbucket(prefix) | |
198 for file_stat in file_stats: | |
199 database_file = file_stat.filename | |
200 clovis_logger.info('Loading trace database: ' + database_file) | |
201 with cloudstorage.open(database_file) as remote_file: | |
202 json_string = remote_file.read() | |
203 if not json_string: | |
204 clovis_logger.warning('Failed to download: ' + database_file) | |
205 continue | |
206 database = LoadingTraceDatabase.FromJsonString(json_string) | |
207 if not database: | |
208 clovis_logger.warning('Failed to parse: ' + database_file) | |
209 continue | |
210 for path in database.ToJsonDict(): | |
211 traces.append(path) | |
212 return traces | |
213 | |
214 | |
152 def StartFromJsonString(http_body_str): | 215 def StartFromJsonString(http_body_str): |
153 """Main function handling a JSON task posted by the user.""" | 216 """Main function handling a JSON task posted by the user.""" |
154 # Set up logging. | 217 # Set up logging. |
155 memory_logs = MemoryLogs(clovis_logger) | 218 memory_logs = MemoryLogs(clovis_logger) |
156 memory_logs.Start() | 219 memory_logs.Start() |
157 | 220 |
158 # Load the task from JSON. | 221 # Load the task from JSON. |
159 task = ClovisTask.FromJsonString(http_body_str) | 222 task = ClovisTask.FromJsonString(http_body_str) |
160 if not task: | 223 if not task: |
161 clovis_logger.error('Invalid JSON task.') | 224 clovis_logger.error('Invalid JSON task.') |
162 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) | 225 return Render('Invalid JSON task:\n' + http_body_str, memory_logs) |
163 | 226 |
164 task_tag = task.BackendParams()['tag'] | 227 task_tag = task.BackendParams()['tag'] |
228 clovis_logger.info('Start processing %s task with tag %s.' % (task.Action(), | |
229 task_tag)) | |
165 | 230 |
166 # Create the instance template if required. | 231 # Create the instance template if required. |
167 if not CreateInstanceTemplate(task): | 232 if not CreateInstanceTemplate(task): |
168 return Render('Template creation failed.', memory_logs) | 233 return Render('Template creation failed.', memory_logs) |
169 | 234 |
170 # Split the task in smaller tasks. | 235 # Build the URL where the result will live. |
171 sub_tasks = [] | |
172 task_url = None | 236 task_url = None |
173 if task.Action() == 'trace': | 237 if task.Action() == 'trace': |
174 bucket = task.BackendParams().get('storage_bucket') | 238 bucket = task.BackendParams().get('storage_bucket') |
175 if bucket: | 239 if bucket: |
176 task_url = 'https://console.cloud.google.com/storage/' + bucket | 240 task_url = 'https://console.cloud.google.com/storage/' + bucket |
177 sub_tasks = SplitTraceTask(task) | 241 elif task.Action() == 'report': |
242 # This must match the table path defined in ReportTaskHandler. | |
243 table_id = ''.join(c for c in task_tag if c.isalnum() or c == '_') | |
blundell
2016/05/13 11:19:55
Maybe this should be defined in a function in common [...]
droger
2016/05/13 15:44:17
Good point.
I'm doing a separate CL for this though [...]
| |
244 table_name = 'clovis_dataset.report_' + table_id | |
245 task_url = 'https://bigquery.cloud.google.com/table/%s:%s' %(project_name, | |
246 table_name) | |
178 else: | 247 else: |
179 error_string = 'Unsupported action: %s.' % task.Action() | 248 error_string = 'Unsupported action: %s.' % task.Action() |
180 clovis_logger.error(error_string) | 249 clovis_logger.error(error_string) |
181 return Render(error_string, memory_logs) | 250 return Render(error_string, memory_logs) |
251 clovis_logger.info('Task result URL: ' + task_url) | |
252 | |
253 # Split the task in smaller tasks. | |
254 sub_tasks = SplitClovisTask(task) | |
255 if not sub_tasks: | |
256 return Render('Task split failed.', memory_logs) | |
182 | 257 |
183 if not EnqueueTasks(sub_tasks, task_tag): | 258 if not EnqueueTasks(sub_tasks, task_tag): |
184 return Render('Task creation failed.', memory_logs) | 259 return Render('Task creation failed.', memory_logs) |
185 | 260 |
186 # Start the instances if required. | 261 # Start the instances if required. |
187 if not CreateInstances(task): | 262 if not CreateInstances(task): |
188 return Render('Instance creation failed.', memory_logs) | 263 return Render('Instance creation failed.', memory_logs) |
189 | 264 |
190 # Start polling the progress. | 265 # Start polling the progress. |
191 clovis_logger.info('Creating worker polling task.') | 266 clovis_logger.info('Creating worker polling task.') |
192 first_poll_delay_minutes = 10 | 267 first_poll_delay_minutes = 10 |
193 timeout_hours = task.BackendParams().get('timeout_hours', 5) | 268 timeout_hours = task.BackendParams().get('timeout_hours', 5) |
194 user_email = email_helper.GetUserEmail() | 269 user_email = email_helper.GetUserEmail() |
195 deferred.defer(PollWorkers, task_tag, time.time(), timeout_hours, user_email, | 270 deferred.defer(PollWorkers, task_tag, time.time(), timeout_hours, user_email, |
196 task_url, _countdown=(60 * first_poll_delay_minutes)) | 271 task_url, _countdown=(60 * first_poll_delay_minutes)) |
197 | 272 |
198 return Render(flask.Markup( | 273 return Render(flask.Markup( |
199 'Success!<br>Your task %s has started.<br>' | 274 'Success!<br>Your task %s has started.<br>' |
200 'You will be notified at %s when completed.') % (task_tag, user_email), | 275 'You will be notified at %s when completed.') % (task_tag, user_email), |
201 memory_logs) | 276 memory_logs) |
202 | 277 |
203 | 278 |
204 def SplitTraceTask(task): | 279 def SplitTask(task, key, values_per_task=1): |
205 """Splits a tracing task with potentially many URLs into several tracing tasks | 280 """Splits a task with potentially many values into several tasks with few |
206 with few URLs. | 281 values. |
282 | |
283 Params: | |
284 key(str): The key in the action parameters that is used as the dimension of | |
blundell
2016/05/13 11:19:55
same comment as the other one re: type information
| |
285 the split. | |
286 The value associated to this key must be a list. | |
287 The resulting tasks are identical to the input task, except that | |
288 they only have a slice of this list. | |
289 values_per_task(int): The size of the slice of the list identified by |key|. | |
290 | |
291 Returns: | |
292 list: The list of smaller tasks. | |
207 """ | 293 """ |
208 clovis_logger.debug('Splitting trace task.') | 294 clovis_logger.debug('Splitting task.') |
209 action_params = task.ActionParams() | 295 action_params = task.ActionParams() |
210 urls = action_params['urls'] | 296 values = action_params[key] |
211 | 297 |
212 # Split the task in smaller tasks with fewer URLs each. | 298 # Split the task in smaller tasks with fewer values each. |
213 urls_per_task = 1 | |
214 sub_tasks = [] | 299 sub_tasks = [] |
215 for i in range(0, len(urls), urls_per_task): | 300 for i in range(0, len(values), values_per_task): |
216 sub_task_params = action_params.copy() | 301 sub_task_params = action_params.copy() |
217 sub_task_params['urls'] = [url for url in urls[i:i+urls_per_task]] | 302 sub_task_params[key] = [v for v in values[i:i+values_per_task]] |
218 sub_tasks.append(ClovisTask(task.Action(), sub_task_params, | 303 sub_tasks.append(ClovisTask(task.Action(), sub_task_params, |
219 task.BackendParams())) | 304 task.BackendParams())) |
220 return sub_tasks | 305 return sub_tasks |
221 | 306 |
222 | 307 |
223 def EnqueueTasks(tasks, task_tag): | 308 def EnqueueTasks(tasks, task_tag): |
224 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by | 309 """Enqueues a list of tasks in the Google Cloud task queue, for consumption by |
225 Google Compute Engine. | 310 Google Compute Engine. |
226 """ | 311 """ |
227 q = taskqueue.Queue('clovis-queue') | 312 q = taskqueue.Queue('clovis-queue') |
(...skipping 40 matching lines...) | |
268 @app.errorhandler(404) | 353 @app.errorhandler(404) |
269 def PageNotFound(e): # pylint: disable=unused-argument | 354 def PageNotFound(e): # pylint: disable=unused-argument |
270 """Return a custom 404 error.""" | 355 """Return a custom 404 error.""" |
271 return 'Sorry, Nothing at this URL.', 404 | 356 return 'Sorry, Nothing at this URL.', 404 |
272 | 357 |
273 | 358 |
274 @app.errorhandler(500) | 359 @app.errorhandler(500) |
275 def ApplicationError(e): | 360 def ApplicationError(e): |
276 """Return a custom 500 error.""" | 361 """Return a custom 500 error.""" |
277 return 'Sorry, unexpected error: {}'.format(e), 499 | 362 return 'Sorry, unexpected error: {}'.format(e), 499 |
OLD | NEW |