Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import datetime | 5 import datetime |
| 6 import hashlib | 6 import hashlib |
| 7 import json | 7 import json |
| 8 import os | 8 import os |
| 9 import pytz | 9 import pytz |
| 10 import time | 10 import time |
| (...skipping 215 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 226 """ | 226 """ |
| 227 Args: | 227 Args: |
| 228 pending_jobs_url: Rietveld URL string used to retrieve jobs to try. | 228 pending_jobs_url: Rietveld URL string used to retrieve jobs to try. |
| 229 interval: Interval used to poll Rietveld, in seconds. | 229 interval: Interval used to poll Rietveld, in seconds. |
| 230 """ | 230 """ |
| 231 self.pollInterval = interval | 231 self.pollInterval = interval |
| 232 self._try_job_rietveld = None | 232 self._try_job_rietveld = None |
| 233 self._pending_jobs_url = pending_jobs_url | 233 self._pending_jobs_url = pending_jobs_url |
| 234 self._processed_keys = None | 234 self._processed_keys = None |
| 235 | 235 |
| 236 def getPage(self, url): # pylint: disable=R0201 | |
| 237 """Schedules a page at `url` to be downloaded. Returns a deferred.""" | |
| 238 return client.getPage(url, agent='buildbot', timeout=2*60) | |
|
pgervais
2014/06/20 15:46:08
While we're at it, define this timeout in an insta
| |
| 239 | |
| 236 # base.PollingChangeSource overrides: | 240 # base.PollingChangeSource overrides: |
| 237 def poll(self): | 241 def poll(self): |
| 238 """Polls Rietveld for any pending try jobs and submit them. | 242 """Polls Rietveld for any pending try jobs and submit them. |
| 239 | 243 |
| 240 Returns: | 244 Returns: |
| 241 A deferred object to be called once the operation completes. | 245 A deferred object to be called once the operation completes. |
| 242 """ | 246 """ |
| 243 | 247 |
| 244 # Make sure setServiceParent was called before this method is called. | 248 # Make sure setServiceParent was called before this method is called. |
| 245 assert self._try_job_rietveld | 249 assert self._try_job_rietveld |
| 246 | 250 |
| 247 # Check if we have valid user list, otherwise - do not submit any jobs. This | 251 # Check if we have valid user list, otherwise - do not submit any jobs. This |
| 248 # is different from having no valid users in the list, in which case | 252 # is different from having no valid users in the list, in which case |
| 249 # SubmitJobs will correctly discard the jobs. | 253 # SubmitJobs will correctly discard the jobs. |
| 250 if not self._try_job_rietveld.has_valid_user_list(): | 254 if not self._try_job_rietveld.has_valid_user_list(): |
| 251 log.msg('[RPWC] No valid user list. Ignoring the poll request.') | 255 log.msg('[RPWC] No valid user list. Ignoring the poll request.') |
| 252 return | 256 return |
| 253 | 257 |
| 254 log.msg('[RPWC] Poll started') | 258 log.msg('[RPWC] Poll started') |
| 255 log.msg('[RPWC] Downloading %s...' % self._pending_jobs_url) | 259 log.msg('[RPWC] Downloading %s...' % self._pending_jobs_url) |
| 256 pollDeferred = client.getPage(self._pending_jobs_url, agent='buildbot', | 260 pollDeferred = self.getPage(self._pending_jobs_url) |
| 257 timeout=2*60) | |
| 258 pollDeferred.addCallback(self._ProcessResults) | 261 pollDeferred.addCallback(self._ProcessResults) |
| 259 pollDeferred.addErrback(log.err, '[RPWC] error') | 262 pollDeferred.addErrback(log.err, '[RPWC] error') |
| 260 return pollDeferred | 263 return pollDeferred |
| 261 | 264 |
| 262 def setServiceParent(self, parent): | 265 def setServiceParent(self, parent): |
| 263 base.PollingChangeSource.setServiceParent(self, parent) | 266 base.PollingChangeSource.setServiceParent(self, parent) |
| 264 self._try_job_rietveld = parent | 267 self._try_job_rietveld = parent |
| 265 | 268 |
| 266 @defer.inlineCallbacks | 269 @defer.inlineCallbacks |
| 267 def _InitProcessedKeysCache(self): | 270 def _InitProcessedKeysCache(self): |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 315 yield self._InitProcessedKeysCache() | 318 yield self._InitProcessedKeysCache() |
| 316 | 319 |
| 317 prev_cursor = None | 320 prev_cursor = None |
| 318 # TODO(sergiyb): Change logic to use 'more' field when it is implemented on | 321 # TODO(sergiyb): Change logic to use 'more' field when it is implemented on |
| 319 # Rietveld. This field will indicate whether there are more pages. | 322 # Rietveld. This field will indicate whether there are more pages. |
| 320 while len(results['jobs']) and prev_cursor != results['cursor']: | 323 while len(results['jobs']) and prev_cursor != results['cursor']: |
| 321 all_jobs.extend(results['jobs']) | 324 all_jobs.extend(results['jobs']) |
| 322 next_url = self._pending_jobs_url + '&cursor=%s' % str(results['cursor']) | 325 next_url = self._pending_jobs_url + '&cursor=%s' % str(results['cursor']) |
| 323 prev_cursor = results['cursor'] | 326 prev_cursor = results['cursor'] |
| 324 log.msg('[RPWC] Downloading %s...' % next_url) | 327 log.msg('[RPWC] Downloading %s...' % next_url) |
| 325 page_json = yield client.getPage(next_url, agent='buildbot', timeout=2*60) | 328 page_json = yield self.getPage(next_url) |
| 326 results = json.loads(page_json) | 329 results = json.loads(page_json) |
| 327 | 330 |
| 328 log.msg('[RPWC] Retrieved %d jobs' % len(all_jobs)) | 331 log.msg('[RPWC] Retrieved %d jobs' % len(all_jobs)) |
| 329 | 332 |
| 330 # Rietveld uses AppEngine NDB API, which serves naive UTC datetimes. | 333 # Rietveld uses AppEngine NDB API, which serves naive UTC datetimes. |
| 331 cutoff_timestamp = datetime.datetime.utcnow() - datetime.timedelta(hours=6) | 334 cutoff_timestamp = datetime.datetime.utcnow() - datetime.timedelta(hours=6) |
| 332 | 335 |
| 333 log.msg('[RPWC] Cache contains %d jobs' % len(self._processed_keys)) | 336 log.msg('[RPWC] Cache contains %d jobs' % len(self._processed_keys)) |
| 334 | 337 |
| 335 # Find new jobs and put them into cache. | 338 # Find new jobs and put them into cache. |
| (...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 520 else: | 523 else: |
| 521 self.processed_keys.add(job['key']) | 524 self.processed_keys.add(job['key']) |
| 522 log.err('Rietveld not updated: no corresponding service found.') | 525 log.err('Rietveld not updated: no corresponding service found.') |
| 523 | 526 |
| 524 # TryJobBase overrides: | 527 # TryJobBase overrides: |
| 525 def setServiceParent(self, parent): | 528 def setServiceParent(self, parent): |
| 526 TryJobBase.setServiceParent(self, parent) | 529 TryJobBase.setServiceParent(self, parent) |
| 527 self._poller.setServiceParent(self) | 530 self._poller.setServiceParent(self) |
| 528 self._poller.master = self.master | 531 self._poller.master = self.master |
| 529 self._valid_users.setServiceParent(self) | 532 self._valid_users.setServiceParent(self) |
| OLD | NEW |