OLD | NEW |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import datetime | 5 import datetime |
6 import hashlib | 6 import hashlib |
7 import json | 7 import json |
8 import os | 8 import os |
9 import pytz | 9 import pytz |
10 import time | 10 import time |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
110 | 110 |
111 def _MakeSet(self, data): | 111 def _MakeSet(self, data): |
112 """Converts the input data string into a set of email addresses. | 112 """Converts the input data string into a set of email addresses. |
113 """ | 113 """ |
114 emails = (email.strip() for email in data.splitlines()) | 114 emails = (email.strip() for email in data.splitlines()) |
115 self._users = frozenset(email for email in emails if email) | 115 self._users = frozenset(email for email in emails if email) |
116 log.msg('Found %d users' % len(self._users)) | 116 log.msg('Found %d users' % len(self._users)) |
117 self._valid = True | 117 self._valid = True |
118 | 118 |
119 | 119 |
| 120 class _RietveldPoller(base.PollingChangeSource): |
| 121 """Polls Rietveld for any pending patch sets to build. |
| 122 |
| 123 Periodically polls Rietveld to see if any patch sets have been marked by |
| 124 users to be tried. If so, send them to the trybots. |
| 125 """ |
| 126 |
| 127 def __init__(self, get_pending_endpoint, interval, cachepath=None): |
| 128 """ |
| 129 Args: |
| 130 get_pending_endpoint: Rietveld URL string used to retrieve jobs to try. |
| 131 interval: Interval used to poll Rietveld, in seconds. |
| 132 cachepath: Path to file where state to persist between master restarts |
| 133 will be stored. |
| 134 """ |
| 135 # Set interval time in base class. |
| 136 self.pollInterval = interval |
| 137 |
| 138 # A string URL for the Rietveld endpoint to query for pending try jobs. |
| 139 self._get_pending_endpoint = get_pending_endpoint |
| 140 |
| 141 # Cursor used to keep track of next patchset(s) to try. If the cursor |
| 142 # is None, then try from the beginning. |
| 143 self._cursor = None |
| 144 |
| 145 # Try job parent of this poller. |
| 146 self._try_job_rietveld = None |
| 147 |
| 148 self._cachepath = cachepath |
| 149 |
| 150 if self._cachepath: |
| 151 if os.path.exists(self._cachepath): |
| 152 with open(self._cachepath) as f: |
| 153 # Using JSON allows us to be flexible and extend the format |
| 154 # in a compatible way. |
| 155 data = json.load(f) |
| 156 self._cursor = data.get('cursor').encode('utf-8') |
| 157 |
| 158 # base.PollingChangeSource overrides: |
| 159 def poll(self): |
| 160 """Polls Rietveld for any pending try jobs and submit them. |
| 161 |
| 162 Returns: |
| 163 A deferred objects to be called once the operation completes. |
| 164 """ |
| 165 log.msg('RietveldPoller.poll') |
| 166 d = defer.succeed(None) |
| 167 d.addCallback(self._OpenUrl) |
| 168 d.addCallback(self._ParseJson) |
| 169 d.addErrback(log.err, 'error in RietveldPoller') # eat errors |
| 170 return d |
| 171 |
| 172 def setServiceParent(self, parent): |
| 173 base.PollingChangeSource.setServiceParent(self, parent) |
| 174 self._try_job_rietveld = parent |
| 175 |
| 176 def _OpenUrl(self, _): |
| 177 """Downloads pending patch sets from Rietveld. |
| 178 |
| 179 Returns: A string containing the pending patchsets from Rietveld |
| 180 encoded as JSON. |
| 181 """ |
| 182 endpoint = self._get_pending_endpoint |
| 183 if self._cursor: |
| 184 sep = '&' if '?' in endpoint else '?' |
| 185 endpoint = endpoint + '%scursor=%s' % (sep, self._cursor) |
| 186 |
| 187 log.msg('RietveldPoller._OpenUrl: %s' % endpoint) |
| 188 return client.getPage(endpoint, agent='buildbot', timeout=2*60) |
| 189 |
| 190 def _ParseJson(self, json_string): |
| 191 """Parses the JSON pending patch set information. |
| 192 |
| 193 Args: |
| 194 json_string: A string containing the serialized JSON jobs. |
| 195 |
| 196 Returns: A list of pending try jobs. This is the list of all jobs returned |
| 197 by Rietveld, not simply the ones we tried this time. |
| 198 """ |
| 199 data = json.loads(json_string) |
| 200 d = self._try_job_rietveld.SubmitJobs(data['jobs']) |
| 201 def success_callback(value): |
| 202 self._cursor = str(data['cursor']) |
| 203 self._try_job_rietveld.processed_keys.clear() |
| 204 |
| 205 if self._cachepath: |
| 206 with open(self._cachepath, 'w') as f: |
| 207 json.dump({'cursor': self._cursor}, f) |
| 208 |
| 209 return value |
| 210 d.addCallback(success_callback) |
| 211 return d |
| 212 |
| 213 |
120 class _RietveldPollerWithCache(base.PollingChangeSource): | 214 class _RietveldPollerWithCache(base.PollingChangeSource): |
121 """Polls Rietveld for any pending patch sets to build. | 215 """Polls Rietveld for any pending patch sets to build. |
122 | 216 |
123 Periodically polls Rietveld to see if any patch sets have been marked by | 217 Periodically polls Rietveld to see if any patch sets have been marked by |
124 users to be tried. If so, send them to the trybots. Uses cursor to download | 218 users to be tried. If so, send them to the trybots. Uses cursor to download |
125 all pages within a single poll. To avoid sending same jobs, keeps a cache of | 219 all pages within a single poll. To avoid sending same jobs, keeps a cache of |
126 the jobs that were already processed thus reducing chances of a duplicate job | 220 the jobs that were already processed thus reducing chances of a duplicate job |
127 submission and increasing robustness to bugs in Rietveld. On the first poll | 221 submission and increasing robustness to bugs in Rietveld. On the first poll |
128 this cache is initialized with jobs currently pending on the Buildbot. | 222 this cache is initialized with jobs currently pending on the Buildbot. |
129 """ | 223 """ |
(...skipping 296 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
426 else: | 520 else: |
427 self.processed_keys.add(job['key']) | 521 self.processed_keys.add(job['key']) |
428 log.err('Rietveld not updated: no corresponding service found.') | 522 log.err('Rietveld not updated: no corresponding service found.') |
429 | 523 |
430 # TryJobBase overrides: | 524 # TryJobBase overrides: |
431 def setServiceParent(self, parent): | 525 def setServiceParent(self, parent): |
432 TryJobBase.setServiceParent(self, parent) | 526 TryJobBase.setServiceParent(self, parent) |
433 self._poller.setServiceParent(self) | 527 self._poller.setServiceParent(self) |
434 self._poller.master = self.master | 528 self._poller.master = self.master |
435 self._valid_users.setServiceParent(self) | 529 self._valid_users.setServiceParent(self) |
OLD | NEW |