Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Side by Side Diff: tools/android/loading/cloud/backend/worker.py

Issue 1895033002: tools/android/loading Switch the GCE worker to pull queues (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@appengine
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2016 The Chromium Authors. All rights reserved. 1 # Copyright 2016 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 import argparse
5 import json 6 import json
7 import logging
6 import os 8 import os
7 import re 9 import re
8 import threading 10 import sys
9 import time 11 import time
10 import subprocess 12
11 import sys 13 from googleapiclient import discovery
14 from oauth2client.client import GoogleCredentials
12 15
13 # NOTE: The parent directory needs to be first in sys.path to avoid conflicts 16 # NOTE: The parent directory needs to be first in sys.path to avoid conflicts
14 # with catapult modules that have colliding names, as catapult inserts itself 17 # with catapult modules that have colliding names, as catapult inserts itself
15 # into the path as the second element. This is an ugly and fragile hack. 18 # into the path as the second element. This is an ugly and fragile hack.
16 sys.path.insert(0, 19 sys.path.insert(0,
17 os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)) 20 os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir,
21 os.pardir))
18 import controller 22 import controller
23 from cloud.common.clovis_task import ClovisTask
19 from google_storage_accessor import GoogleStorageAccessor 24 from google_storage_accessor import GoogleStorageAccessor
20 import loading_trace 25 import loading_trace
21 from loading_trace_database import LoadingTraceDatabase 26 from loading_trace_database import LoadingTraceDatabase
22 import options 27 import options
23 28
24 29
class Worker(object):
  """GCE worker that pulls Clovis trace-generation tasks from a pull queue,
  generates loading traces locally, and uploads them to Google Cloud Storage.
  """

  def __init__(self, config, logger):
    """Initializes the worker from a parsed configuration.

    Args:
      config: (dict) Worker configuration; see README.md for the format.
      logger: (logging.Logger) Logger used for all worker output.
    """
    self._project_name = config['project_name']
    self._taskqueue_tag = config['taskqueue_tag']
    self._credentials = GoogleCredentials.get_application_default()
    self._logger = logger

    # Split the cloud storage path into the bucket name and the base path
    # under the bucket, normalizing the base path to end with '/'.
    path_parts = config['cloud_storage_path'].split('/')
    self._bucket_name = path_parts[0]
    self._base_path_in_bucket = ''
    if len(path_parts) > 1:
      self._base_path_in_bucket = '/'.join(path_parts[1:])
      if not self._base_path_in_bucket.endswith('/'):
        self._base_path_in_bucket += '/'

    # TODO: improve the trace database to support concurrent access.
    self._traces_dir = self._base_path_in_bucket + 'traces/'
    self._trace_database = LoadingTraceDatabase({})

    self._src_path = config['src_path']
    self._google_storage_accessor = GoogleStorageAccessor(
        credentials=self._credentials, project_name=self._project_name,
        bucket_name=self._bucket_name)

    # Initialize the global options that will be used during trace generation.
    options.OPTIONS.ParseArgs([])
    options.OPTIONS.local_binary = config['chrome_path']
62 def _IsProcessingTasks(self): 61 def Start(self):
63 """Returns True if the application is currently processing tasks.""" 62 """Main worker loop.
64 return self._thread is not None and self._thread.is_alive() 63
64 Repeatedly pulls tasks from the task queue and processes them. Returns when
65 the queue is empty.
66 """
67 task_api = discovery.build('taskqueue', 'v1beta2',
68 credentials=self._credentials)
69 queue_name = 'clovis-queue'
70 # Workaround for
71 # https://code.google.com/p/googleappengine/issues/detail?id=10199
72 project = 's~' + self._project_name
73
74 while True:
75 self._logger.debug('Fetching new task.')
76 (clovis_task, task_id) = self._FetchClovisTask(project, task_api,
77 queue_name)
78 if not clovis_task:
79 if self._trace_database.ToJsonDict():
80 self._logger.info('No remaining tasks in the queue.')
81 break
82 else:
83 delay_seconds = 60
84 self._logger.info(
85 'Nothing in the queue, retrying in %i seconds.' % delay_seconds)
86 time.sleep(delay_seconds)
87 continue
88
89 self._logger.info('Processing task %s' % task_id)
90 self._ProcessClovisTask(clovis_task)
91 self._logger.debug('Deleting task %s' % task_id)
92 task_api.tasks().delete(project=project, taskqueue=queue_name,
93 task=task_id).execute()
94 self._logger.info('Finished task %s' % task_id)
95 self._Finalize()
96
97 def _FetchClovisTask(self, project_name, task_api, queue_name):
98 """Fetches a ClovisTask from the task queue.
99
100 Params:
101 project_name(str): The name of the Google Cloud project.
102 task_api: The TaskQueue service.
103 queue_name(str): The name of the task queue.
104
105 Returns:
106 (ClovisTask, str): The fetched ClovisTask and its task ID, or (None, None)
107 if no tasks are found.
108 """
109 response = task_api.tasks().lease(
110 project=project_name, taskqueue=queue_name, numTasks=1, leaseSecs=180,
111 groupByTag=True, tag=self._taskqueue_tag).execute()
112 if (not response.get('items')) or (len(response['items']) < 1):
113 return (None, None)
114
115 google_task = response['items'][0]
116 task_id = google_task['id']
117 clovis_task = ClovisTask.FromBase64(google_task['payloadBase64'])
118 return (clovis_task, task_id)
119
120 def _Finalize(self):
121 """Called before exiting."""
122 self._logger.info('Uploading trace database')
123 self._google_storage_accessor.UploadString(
124 json.dumps(self._trace_database.ToJsonDict(), indent=2),
125 self._traces_dir + 'trace_database.json')
126 # TODO(droger): Implement automatic instance destruction.
127 self._logger.info('Done')
128
65 129
66 def _GenerateTrace(self, url, emulate_device, emulate_network, filename, 130 def _GenerateTrace(self, url, emulate_device, emulate_network, filename,
67 log_filename): 131 log_filename):
68 """ Generates a trace on _thread. 132 """ Generates a trace.
69 133
70 Args: 134 Args:
71 url: URL as a string. 135 url: URL as a string.
72 emulate_device: Name of the device to emulate. Empty for no emulation. 136 emulate_device: Name of the device to emulate. Empty for no emulation.
73 emulate_network: Type of network emulation. Empty for no emulation. 137 emulate_network: Type of network emulation. Empty for no emulation.
74 filename: Name of the file where the trace is saved. 138 filename: Name of the file where the trace is saved.
75 log_filename: Name of the file where standard output and errors are logged 139 log_filename: Name of the file where standard output and errors are
140 logged.
76 141
77 Returns: 142 Returns:
78 A dictionary of metadata about the trace, including a 'succeeded' field 143 A dictionary of metadata about the trace, including a 'succeeded' field
79 indicating whether the trace was successfully generated. 144 indicating whether the trace was successfully generated.
80 """ 145 """
81 try: 146 try:
82 os.remove(filename) # Remove any existing trace for this URL. 147 os.remove(filename) # Remove any existing trace for this URL.
83 except OSError: 148 except OSError:
84 pass # Nothing to remove. 149 pass # Nothing to remove.
85 150
(...skipping 30 matching lines...) Expand all
116 181
117 if trace: 182 if trace:
118 with open(filename, 'w') as f: 183 with open(filename, 'w') as f:
119 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2) 184 json.dump(trace.ToJsonDict(), f, sort_keys=True, indent=2)
120 185
121 sys.stdout = old_stdout 186 sys.stdout = old_stdout
122 sys.stderr = old_stderr 187 sys.stderr = old_stderr
123 188
124 return trace_metadata 189 return trace_metadata
125 190
126 def _GetCurrentTaskCount(self): 191 def _ProcessClovisTask(self, clovis_task):
127 """Returns the number of remaining tasks. Thread safe.""" 192 """Processes one clovis_task."""
128 self._tasks_lock.acquire() 193 if clovis_task.Action() != 'trace':
129 task_count = len(self._tasks) 194 self._logger.error('Unsupported task action: %s' % clovis_task.Action())
130 self._tasks_lock.release() 195 return
131 return task_count
132 196
133 def _ProcessTasks(self, tasks, repeat_count, emulate_device, emulate_network): 197 # Extract the task parameters.
134 """Iterates over _task, generating a trace for each of them. Uploads the 198 params = clovis_task.Params()
135 resulting traces to Google Cloud Storage. Runs on _thread. 199 urls = params['urls']
200 repeat_count = params.get('repeat_count', 1)
201 emulate_device = params.get('emulate_device')
202 emulate_network = params.get('emulate_network')
136 203
137 Args:
138 tasks: The list of URLs to process.
139 repeat_count: The number of traces generated for each URL.
140 emulate_device: Name of the device to emulate. Empty for no emulation.
141 emulate_network: Type of network emulation. Empty for no emulation.
142 """
143 # The main thread might be reading the task lists, take the lock to modify.
144 self._tasks_lock.acquire()
145 self._tasks = tasks
146 self._failed_tasks = []
147 self._tasks_lock.release()
148 failures_dir = self._base_path_in_bucket + 'failures/' 204 failures_dir = self._base_path_in_bucket + 'failures/'
149 traces_dir = self._base_path_in_bucket + 'traces/'
150
151 trace_database = LoadingTraceDatabase({})
152
153 # TODO(blundell): Fix this up. 205 # TODO(blundell): Fix this up.
154 logs_dir = self._base_path_in_bucket + 'analyze_logs/' 206 logs_dir = self._base_path_in_bucket + 'analyze_logs/'
155 log_filename = 'analyze.log' 207 log_filename = 'analyze.log'
156 # Avoid special characters in storage object names 208 # Avoid special characters in storage object names
157 pattern = re.compile(r"[#\?\[\]\*/]") 209 pattern = re.compile(r"[#\?\[\]\*/]")
158 while len(self._tasks) > 0: 210
159 url = self._tasks[-1] 211 while len(urls) > 0:
212 url = urls.pop()
160 local_filename = pattern.sub('_', url) 213 local_filename = pattern.sub('_', url)
161 for repeat in range(repeat_count): 214 for repeat in range(repeat_count):
162 print 'Generating trace for URL: %s' % url 215 self._logger.debug('Generating trace for URL: %s' % url)
163 remote_filename = local_filename + '/' + str(repeat) 216 remote_filename = local_filename + '/' + str(repeat)
164 trace_metadata = self._GenerateTrace( 217 trace_metadata = self._GenerateTrace(
165 url, emulate_device, emulate_network, local_filename, log_filename) 218 url, emulate_device, emulate_network, local_filename, log_filename)
166 if trace_metadata['succeeded']: 219 if trace_metadata['succeeded']:
167 print 'Uploading: %s' % remote_filename 220 self._logger.debug('Uploading: %s' % remote_filename)
168 remote_trace_location = traces_dir + remote_filename 221 remote_trace_location = self._traces_dir + remote_filename
169 self._google_storage_accessor.UploadFile(local_filename, 222 self._google_storage_accessor.UploadFile(local_filename,
170 remote_trace_location) 223 remote_trace_location)
171 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' + 224 full_cloud_storage_path = ('gs://' + self._bucket_name + '/' +
172 remote_trace_location) 225 remote_trace_location)
173 trace_database.AddTrace(full_cloud_storage_path, trace_metadata) 226 self._trace_database.AddTrace(full_cloud_storage_path, trace_metadata)
174 else: 227 else:
175 print 'Trace generation failed for URL: %s' % url 228 self._logger.warning('Trace generation failed for URL: %s' % url)
176 self._tasks_lock.acquire() 229 # TODO: upload the failure
177 self._failed_tasks.append({ "url": url, "repeat": repeat})
178 self._tasks_lock.release()
179 if os.path.isfile(local_filename): 230 if os.path.isfile(local_filename):
180 self._google_storage_accessor.UploadFile(local_filename, 231 self._google_storage_accessor.UploadFile(local_filename,
181 failures_dir + remote_filename) 232 failures_dir + remote_filename)
182 print 'Uploading log' 233 self._logger.debug('Uploading log')
183 self._google_storage_accessor.UploadFile(log_filename, 234 self._google_storage_accessor.UploadFile(log_filename,
184 logs_dir + remote_filename) 235 logs_dir + remote_filename)
185 # Pop once task is finished, for accurate status tracking.
186 self._tasks_lock.acquire()
187 url = self._tasks.pop()
188 self._tasks_lock.release()
189
190 self._google_storage_accessor.UploadString(
191 json.dumps(trace_database.ToJsonDict(), indent=2),
192 traces_dir + 'trace_database.json')
193
194 if len(self._failed_tasks) > 0:
195 print 'Uploading failing URLs'
196 self._google_storage_accessor.UploadString(
197 json.dumps(self._failed_tasks, indent=2),
198 failures_dir + 'failures.json')
199
200 def _SetTaskList(self, http_body):
201 """Sets the list of tasks and starts processing them
202
203 Args:
204 http_body: JSON dictionary. See README.md for a description of the format.
205
206 Returns:
207 A string to be sent back to the client, describing the success status of
208 the request.
209 """
210 if self._IsProcessingTasks():
211 return 'Error: Already running\n'
212
213 load_parameters = json.loads(http_body)
214 try:
215 tasks = load_parameters['urls']
216 except KeyError:
217 return 'Error: invalid urls\n'
218 # Optional parameters.
219 try:
220 repeat_count = int(load_parameters.get('repeat_count', '1'))
221 except ValueError:
222 return 'Error: invalid repeat_count\n'
223 emulate_device = load_parameters.get('emulate_device', '')
224 emulate_network = load_parameters.get('emulate_network', '')
225
226 if len(tasks) == 0:
227 return 'Error: Empty task list\n'
228 else:
229 self._initial_task_count = len(tasks)
230 self._start_time = time.time()
231 self._thread = threading.Thread(
232 target = self._ProcessTasks,
233 args = (tasks, repeat_count, emulate_device, emulate_network))
234 self._thread.start()
235 return 'Starting generation of %s tasks\n' % str(self._initial_task_count)
236
237 def __call__(self, environ, start_response):
238 path = environ['PATH_INFO']
239
240 if path == '/set_tasks':
241 # Get the tasks from the HTTP body.
242 try:
243 body_size = int(environ.get('CONTENT_LENGTH', 0))
244 except (ValueError):
245 body_size = 0
246 body = environ['wsgi.input'].read(body_size)
247 data = self._SetTaskList(body)
248 elif path == '/test':
249 data = 'hello\n'
250 elif path == '/status':
251 if not self._IsProcessingTasks():
252 data = 'Idle\n'
253 else:
254 task_count = self._GetCurrentTaskCount()
255 if task_count == 0:
256 data = '%s tasks complete. Finalizing.\n' % self._initial_task_count
257 else:
258 data = 'Remaining tasks: %s / %s\n' % (
259 task_count, self._initial_task_count)
260 elapsed = time.time() - self._start_time
261 data += 'Elapsed time: %s seconds\n' % str(elapsed)
262 self._tasks_lock.acquire()
263 failed_tasks = self._failed_tasks
264 self._tasks_lock.release()
265 data += '%s failed tasks:\n' % len(failed_tasks)
266 data += json.dumps(failed_tasks, indent=2)
267 else:
268 start_response('404 NOT FOUND', [('Content-Length', '0')])
269 return iter([''])
270
271 response_headers = [
272 ('Content-type','text/plain'),
273 ('Content-Length', str(len(data)))
274 ]
275 start_response('200 OK', response_headers)
276 return iter([data])
277 236
278 237
if __name__ == '__main__':
  # Command-line entry point: parse the configuration path, set up logging,
  # then run the worker until the task queue is drained.
  parser = argparse.ArgumentParser(
      description='ComputeEngine Worker for Clovis')
  parser.add_argument('--config', required=True,
                      help='Path to the configuration file.')
  args = parser.parse_args()

  # Keep third-party libraries quiet (warnings and above), but let the
  # worker's own logger emit INFO and above.
  logging.basicConfig(level=logging.WARNING)
  worker_logger = logging.getLogger('worker')
  worker_logger.setLevel(logging.INFO)

  worker_logger.info('Reading configuration')
  with open(args.config) as config_file:
    worker = Worker(json.load(config_file), worker_logger)
  worker.Start()
OLDNEW
« no previous file with comments | « tools/android/loading/cloud/backend/startup-script.sh ('k') | tools/android/loading/cloud/frontend/lib/common » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698