OLD | NEW |
---|---|
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import datetime | 5 import datetime |
6 import hashlib | 6 import hashlib |
7 import json | 7 import json |
8 import os | 8 import os |
9 import pytz | 9 import pytz |
10 import time | 10 import time |
(...skipping 215 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
226 """ | 226 """ |
227 Args: | 227 Args: |
228 pending_jobs_url: Rietveld URL string used to retrieve jobs to try. | 228 pending_jobs_url: Rietveld URL string used to retrieve jobs to try. |
229 interval: Interval used to poll Rietveld, in seconds. | 229 interval: Interval used to poll Rietveld, in seconds. |
230 """ | 230 """ |
231 self.pollInterval = interval | 231 self.pollInterval = interval |
232 self._try_job_rietveld = None | 232 self._try_job_rietveld = None |
233 self._pending_jobs_url = pending_jobs_url | 233 self._pending_jobs_url = pending_jobs_url |
234 self._processed_keys = None | 234 self._processed_keys = None |
235 | 235 |
236 def getPage(self, url): # pylint: disable=R0201 | |
237 """Schedules a page at `url` to be downloaded. Returns a deferred.""" | |
238 return client.getPage(url, agent='buildbot', timeout=2*60) | |
pgervais
2014/06/20 15:46:08
While we're at it, define this timeout in an insta
| |
239 | |
236 # base.PollingChangeSource overrides: | 240 # base.PollingChangeSource overrides: |
237 def poll(self): | 241 def poll(self): |
238 """Polls Rietveld for any pending try jobs and submit them. | 242 """Polls Rietveld for any pending try jobs and submit them. |
239 | 243 |
240 Returns: | 244 Returns: |
241 A deferred object to be called once the operation completes. | 245 A deferred object to be called once the operation completes. |
242 """ | 246 """ |
243 | 247 |
244 # Make sure setServiceParent was called before this method is called. | 248 # Make sure setServiceParent was called before this method is called. |
245 assert self._try_job_rietveld | 249 assert self._try_job_rietveld |
246 | 250 |
247 # Check if we have valid user list, otherwise - do not submit any jobs. This | 251 # Check if we have valid user list, otherwise - do not submit any jobs. This |
248 # is different from having no valid users in the list, in which case | 252 # is different from having no valid users in the list, in which case |
249 # SubmitJobs will correctly discard the jobs. | 253 # SubmitJobs will correctly discard the jobs. |
250 if not self._try_job_rietveld.has_valid_user_list(): | 254 if not self._try_job_rietveld.has_valid_user_list(): |
251 log.msg('[RPWC] No valid user list. Ignoring the poll request.') | 255 log.msg('[RPWC] No valid user list. Ignoring the poll request.') |
252 return | 256 return |
253 | 257 |
254 log.msg('[RPWC] Poll started') | 258 log.msg('[RPWC] Poll started') |
255 log.msg('[RPWC] Downloading %s...' % self._pending_jobs_url) | 259 log.msg('[RPWC] Downloading %s...' % self._pending_jobs_url) |
256 pollDeferred = client.getPage(self._pending_jobs_url, agent='buildbot', | 260 pollDeferred = self.getPage(self._pending_jobs_url) |
257 timeout=2*60) | |
258 pollDeferred.addCallback(self._ProcessResults) | 261 pollDeferred.addCallback(self._ProcessResults) |
259 pollDeferred.addErrback(log.err, '[RPWC] error') | 262 pollDeferred.addErrback(log.err, '[RPWC] error') |
260 return pollDeferred | 263 return pollDeferred |
261 | 264 |
262 def setServiceParent(self, parent): | 265 def setServiceParent(self, parent): |
263 base.PollingChangeSource.setServiceParent(self, parent) | 266 base.PollingChangeSource.setServiceParent(self, parent) |
264 self._try_job_rietveld = parent | 267 self._try_job_rietveld = parent |
265 | 268 |
266 @defer.inlineCallbacks | 269 @defer.inlineCallbacks |
267 def _InitProcessedKeysCache(self): | 270 def _InitProcessedKeysCache(self): |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
315 yield self._InitProcessedKeysCache() | 318 yield self._InitProcessedKeysCache() |
316 | 319 |
317 prev_cursor = None | 320 prev_cursor = None |
318 # TODO(sergiyb): Change logic to use 'more' field when it is implemented on | 321 # TODO(sergiyb): Change logic to use 'more' field when it is implemented on |
319 # Rietveld. This field will indicate whether there are more pages. | 322 # Rietveld. This field will indicate whether there are more pages. |
320 while len(results['jobs']) and prev_cursor != results['cursor']: | 323 while len(results['jobs']) and prev_cursor != results['cursor']: |
321 all_jobs.extend(results['jobs']) | 324 all_jobs.extend(results['jobs']) |
322 next_url = self._pending_jobs_url + '&cursor=%s' % str(results['cursor']) | 325 next_url = self._pending_jobs_url + '&cursor=%s' % str(results['cursor']) |
323 prev_cursor = results['cursor'] | 326 prev_cursor = results['cursor'] |
324 log.msg('[RPWC] Downloading %s...' % next_url) | 327 log.msg('[RPWC] Downloading %s...' % next_url) |
325 page_json = yield client.getPage(next_url, agent='buildbot', timeout=2*60) | 328 page_json = yield self.getPage(next_url) |
326 results = json.loads(page_json) | 329 results = json.loads(page_json) |
327 | 330 |
328 log.msg('[RPWC] Retrieved %d jobs' % len(all_jobs)) | 331 log.msg('[RPWC] Retrieved %d jobs' % len(all_jobs)) |
329 | 332 |
330 # Rietveld uses AppEngine NDB API, which serves naive UTC datetimes. | 333 # Rietveld uses AppEngine NDB API, which serves naive UTC datetimes. |
331 cutoff_timestamp = datetime.datetime.utcnow() - datetime.timedelta(hours=6) | 334 cutoff_timestamp = datetime.datetime.utcnow() - datetime.timedelta(hours=6) |
332 | 335 |
333 log.msg('[RPWC] Cache contains %d jobs' % len(self._processed_keys)) | 336 log.msg('[RPWC] Cache contains %d jobs' % len(self._processed_keys)) |
334 | 337 |
335 # Find new jobs and put them into cache. | 338 # Find new jobs and put them into cache. |
(...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
520 else: | 523 else: |
521 self.processed_keys.add(job['key']) | 524 self.processed_keys.add(job['key']) |
522 log.err('Rietveld not updated: no corresponding service found.') | 525 log.err('Rietveld not updated: no corresponding service found.') |
523 | 526 |
524 # TryJobBase overrides: | 527 # TryJobBase overrides: |
525 def setServiceParent(self, parent): | 528 def setServiceParent(self, parent): |
526 TryJobBase.setServiceParent(self, parent) | 529 TryJobBase.setServiceParent(self, parent) |
527 self._poller.setServiceParent(self) | 530 self._poller.setServiceParent(self) |
528 self._poller.master = self.master | 531 self._poller.master = self.master |
529 self._valid_users.setServiceParent(self) | 532 self._valid_users.setServiceParent(self) |
OLD | NEW |