OLD | NEW |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import datetime | 5 import datetime |
6 import hashlib | 6 import hashlib |
7 import json | 7 import json |
8 import os | 8 import os |
9 import pytz | 9 import pytz |
10 import time | 10 import time |
(...skipping 204 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
215 """Polls Rietveld for any pending patch sets to build. | 215 """Polls Rietveld for any pending patch sets to build. |
216 | 216 |
217 Periodically polls Rietveld to see if any patch sets have been marked by | 217 Periodically polls Rietveld to see if any patch sets have been marked by |
218 users to be tried. If so, send them to the trybots. Uses cursor to download | 218 users to be tried. If so, send them to the trybots. Uses cursor to download |
219 all pages within a single poll. To avoid sending same jobs, keeps a cache of | 219 all pages within a single poll. To avoid sending same jobs, keeps a cache of |
220 the jobs that were already processed thus reducing chances of a duplicate job | 220 the jobs that were already processed thus reducing chances of a duplicate job |
221 submission and increasing robustness to bugs in Rietveld. On the first poll | 221 submission and increasing robustness to bugs in Rietveld. On the first poll |
222 this cache is initialized with jobs currently pending on the Buildbot. | 222 this cache is initialized with jobs currently pending on the Buildbot. |
223 """ | 223 """ |
224 | 224 |
225 def __init__(self, pending_jobs_url, interval): | 225 def __init__(self, pending_jobs_url, interval, timeout=2*60): |
226 """ | 226 """ |
227 Args: | 227 Args: |
228 pending_jobs_url: Rietveld URL string used to retrieve jobs to try. | 228 pending_jobs_url: Rietveld URL string used to retrieve jobs to try. |
229 interval: Interval used to poll Rietveld, in seconds. | 229 interval: Interval used to poll Rietveld, in seconds. |
230 """ | 230 """ |
231 self.pollInterval = interval | 231 self.pollInterval = interval |
232 self._try_job_rietveld = None | 232 self._try_job_rietveld = None |
233 self._pending_jobs_url = pending_jobs_url | 233 self._pending_jobs_url = pending_jobs_url |
234 self._processed_keys = None | 234 self._processed_keys = None |
| 235 self.timeout = timeout |
| 236 |
| 237 def getPage(self, url): # pylint: disable=R0201 |
| 238 """Schedules a page at `url` to be downloaded. Returns a deferred.""" |
| 239 return client.getPage(url, agent='buildbot', timeout=self.timeout) |
235 | 240 |
236 # base.PollingChangeSource overrides: | 241 # base.PollingChangeSource overrides: |
237 def poll(self): | 242 def poll(self): |
238 """Polls Rietveld for any pending try jobs and submit them. | 243 """Polls Rietveld for any pending try jobs and submit them. |
239 | 244 |
240 Returns: | 245 Returns: |
241 A deferred object to be called once the operation completes. | 246 A deferred object to be called once the operation completes. |
242 """ | 247 """ |
243 | 248 |
244 # Make sure setServiceParent was called before this method is called. | 249 # Make sure setServiceParent was called before this method is called. |
245 assert self._try_job_rietveld | 250 assert self._try_job_rietveld |
246 | 251 |
247 # Check if we have valid user list, otherwise - do not submit any jobs. This | 252 # Check if we have valid user list, otherwise - do not submit any jobs. This |
248 # is different from having no valid users in the list, in which case | 253 # is different from having no valid users in the list, in which case |
249 # SubmitJobs will correctly discard the jobs. | 254 # SubmitJobs will correctly discard the jobs. |
250 if not self._try_job_rietveld.has_valid_user_list(): | 255 if not self._try_job_rietveld.has_valid_user_list(): |
251 log.msg('[RPWC] No valid user list. Ignoring the poll request.') | 256 log.msg('[RPWC] No valid user list. Ignoring the poll request.') |
252 return | 257 return |
253 | 258 |
254 log.msg('[RPWC] Poll started') | 259 log.msg('[RPWC] Poll started') |
255 log.msg('[RPWC] Downloading %s...' % self._pending_jobs_url) | 260 log.msg('[RPWC] Downloading %s...' % self._pending_jobs_url) |
256 pollDeferred = client.getPage(self._pending_jobs_url, agent='buildbot', | 261 pollDeferred = self.getPage(self._pending_jobs_url) |
257 timeout=2*60) | |
258 pollDeferred.addCallback(self._ProcessResults) | 262 pollDeferred.addCallback(self._ProcessResults) |
259 pollDeferred.addErrback(log.err, '[RPWC] error') | 263 pollDeferred.addErrback(log.err, '[RPWC] error') |
260 return pollDeferred | 264 return pollDeferred |
261 | 265 |
262 def setServiceParent(self, parent): | 266 def setServiceParent(self, parent): |
263 base.PollingChangeSource.setServiceParent(self, parent) | 267 base.PollingChangeSource.setServiceParent(self, parent) |
264 self._try_job_rietveld = parent | 268 self._try_job_rietveld = parent |
265 | 269 |
266 @defer.inlineCallbacks | 270 @defer.inlineCallbacks |
267 def _InitProcessedKeysCache(self): | 271 def _InitProcessedKeysCache(self): |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
315 yield self._InitProcessedKeysCache() | 319 yield self._InitProcessedKeysCache() |
316 | 320 |
317 prev_cursor = None | 321 prev_cursor = None |
318 # TODO(sergiyb): Change logic to use 'more' field when it is implemented on | 322 # TODO(sergiyb): Change logic to use 'more' field when it is implemented on |
319 # Rietveld. This field will indicate whether there are more pages. | 323 # Rietveld. This field will indicate whether there are more pages. |
320 while len(results['jobs']) and prev_cursor != results['cursor']: | 324 while len(results['jobs']) and prev_cursor != results['cursor']: |
321 all_jobs.extend(results['jobs']) | 325 all_jobs.extend(results['jobs']) |
322 next_url = self._pending_jobs_url + '&cursor=%s' % str(results['cursor']) | 326 next_url = self._pending_jobs_url + '&cursor=%s' % str(results['cursor']) |
323 prev_cursor = results['cursor'] | 327 prev_cursor = results['cursor'] |
324 log.msg('[RPWC] Downloading %s...' % next_url) | 328 log.msg('[RPWC] Downloading %s...' % next_url) |
325 page_json = yield client.getPage(next_url, agent='buildbot', timeout=2*60) | 329 page_json = yield self.getPage(next_url) |
326 results = json.loads(page_json) | 330 results = json.loads(page_json) |
327 | 331 |
328 log.msg('[RPWC] Retrieved %d jobs' % len(all_jobs)) | 332 log.msg('[RPWC] Retrieved %d jobs' % len(all_jobs)) |
329 | 333 |
330 # Rietveld uses AppEngine NDB API, which serves naive UTC datetimes. | 334 # Rietveld uses AppEngine NDB API, which serves naive UTC datetimes. |
331 cutoff_timestamp = datetime.datetime.utcnow() - datetime.timedelta(hours=6) | 335 cutoff_timestamp = datetime.datetime.utcnow() - datetime.timedelta(hours=6) |
332 | 336 |
333 log.msg('[RPWC] Cache contains %d jobs' % len(self._processed_keys)) | 337 log.msg('[RPWC] Cache contains %d jobs' % len(self._processed_keys)) |
334 | 338 |
335 # Find new jobs and put them into cache. | 339 # Find new jobs and put them into cache. |
(...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
520 else: | 524 else: |
521 self.processed_keys.add(job['key']) | 525 self.processed_keys.add(job['key']) |
522 log.err('Rietveld not updated: no corresponding service found.') | 526 log.err('Rietveld not updated: no corresponding service found.') |
523 | 527 |
524 # TryJobBase overrides: | 528 # TryJobBase overrides: |
525 def setServiceParent(self, parent): | 529 def setServiceParent(self, parent): |
526 TryJobBase.setServiceParent(self, parent) | 530 TryJobBase.setServiceParent(self, parent) |
527 self._poller.setServiceParent(self) | 531 self._poller.setServiceParent(self) |
528 self._poller.master = self.master | 532 self._poller.master = self.master |
529 self._valid_users.setServiceParent(self) | 533 self._valid_users.setServiceParent(self) |
OLD | NEW |