OLD | NEW |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import datetime | 5 import datetime |
6 import hashlib | 6 import hashlib |
7 import json | 7 import json |
8 import os | 8 import os |
9 import pytz | 9 import pytz |
10 import time | 10 import time |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
97 postdata=urllib.urlencode(params)) | 97 postdata=urllib.urlencode(params)) |
98 | 98 |
99 def _MakeSet(self, data): | 99 def _MakeSet(self, data): |
100 """Converts the input data string into a set of email addresses. | 100 """Converts the input data string into a set of email addresses. |
101 """ | 101 """ |
102 emails = (email.strip() for email in data.splitlines()) | 102 emails = (email.strip() for email in data.splitlines()) |
103 self._users = frozenset(email for email in emails if email) | 103 self._users = frozenset(email for email in emails if email) |
104 log.msg('Found %d users' % len(self._users)) | 104 log.msg('Found %d users' % len(self._users)) |
105 | 105 |
106 | 106 |
107 class _RietveldPoller(base.PollingChangeSource): | |
108 """Polls Rietveld for any pending patch sets to build. | |
109 | |
110 Periodically polls Rietveld to see if any patch sets have been marked by | |
111 users to be tried. If so, send them to the trybots. | |
112 """ | |
113 | |
114 def __init__(self, get_pending_endpoint, interval, cachepath=None): | |
115 """ | |
116 Args: | |
117 get_pending_endpoint: Rietveld URL string used to retrieve jobs to try. | |
118 interval: Interval used to poll Rietveld, in seconds. | |
119 cachepath: Path to file where state to persist between master restarts | |
120 will be stored. | |
121 """ | |
122 # Set interval time in base class. | |
123 self.pollInterval = interval | |
124 | |
125 # A string URL for the Rietveld endpoint to query for pending try jobs. | |
126 self._get_pending_endpoint = get_pending_endpoint | |
127 | |
128 # Cursor used to keep track of next patchset(s) to try. If the cursor | |
129 # is None, then try from the beginning. | |
130 self._cursor = None | |
131 | |
132 # Try job parent of this poller. | |
133 self._try_job_rietveld = None | |
134 | |
135 self._cachepath = cachepath | |
136 | |
137 if self._cachepath: | |
138 if os.path.exists(self._cachepath): | |
139 with open(self._cachepath) as f: | |
140 # Using JSON allows us to be flexible and extend the format | |
141 # in a compatible way. | |
142 data = json.load(f) | |
143 self._cursor = data.get('cursor').encode('utf-8') | |
144 | |
145 # base.PollingChangeSource overrides: | |
146 def poll(self): | |
147 """Polls Rietveld for any pending try jobs and submit them. | |
148 | |
149 Returns: | |
150 A deferred objects to be called once the operation completes. | |
151 """ | |
152 log.msg('RietveldPoller.poll') | |
153 d = defer.succeed(None) | |
154 d.addCallback(self._OpenUrl) | |
155 d.addCallback(self._ParseJson) | |
156 d.addErrback(log.err, 'error in RietveldPoller') # eat errors | |
157 return d | |
158 | |
159 def setServiceParent(self, parent): | |
160 base.PollingChangeSource.setServiceParent(self, parent) | |
161 self._try_job_rietveld = parent | |
162 | |
163 def _OpenUrl(self, _): | |
164 """Downloads pending patch sets from Rietveld. | |
165 | |
166 Returns: A string containing the pending patchsets from Rietveld | |
167 encoded as JSON. | |
168 """ | |
169 endpoint = self._get_pending_endpoint | |
170 if self._cursor: | |
171 sep = '&' if '?' in endpoint else '?' | |
172 endpoint = endpoint + '%scursor=%s' % (sep, self._cursor) | |
173 | |
174 log.msg('RietveldPoller._OpenUrl: %s' % endpoint) | |
175 return client.getPage(endpoint, agent='buildbot', timeout=2*60) | |
176 | |
177 def _ParseJson(self, json_string): | |
178 """Parses the JSON pending patch set information. | |
179 | |
180 Args: | |
181 json_string: A string containing the serialized JSON jobs. | |
182 | |
183 Returns: A list of pending try jobs. This is the list of all jobs returned | |
184 by Rietveld, not simply the ones we tried this time. | |
185 """ | |
186 data = json.loads(json_string) | |
187 d = self._try_job_rietveld.SubmitJobs(data['jobs']) | |
188 def success_callback(value): | |
189 self._cursor = str(data['cursor']) | |
190 self._try_job_rietveld.processed_keys.clear() | |
191 | |
192 if self._cachepath: | |
193 with open(self._cachepath, 'w') as f: | |
194 json.dump({'cursor': self._cursor}, f) | |
195 | |
196 return value | |
197 d.addCallback(success_callback) | |
198 return d | |
199 | |
200 | |
201 class _RietveldPollerWithCache(base.PollingChangeSource): | 107 class _RietveldPollerWithCache(base.PollingChangeSource): |
202 """Polls Rietveld for any pending patch sets to build. | 108 """Polls Rietveld for any pending patch sets to build. |
203 | 109 |
204 Periodically polls Rietveld to see if any patch sets have been marked by | 110 Periodically polls Rietveld to see if any patch sets have been marked by |
205 users to be tried. If so, send them to the trybots. Uses cursor to download | 111 users to be tried. If so, send them to the trybots. Uses cursor to download |
206 all pages within a single poll. To avoid sending same jobs, keeps a cache of | 112 all pages within a single poll. To avoid sending same jobs, keeps a cache of |
207 the jobs that were already processed thus reducing chances of a duplicate job | 113 the jobs that were already processed thus reducing chances of a duplicate job |
208 submission and increasing robustness to bugs in Rietveld. On the first poll | 114 submission and increasing robustness to bugs in Rietveld. On the first poll |
209 this cache is initialized with jobs currently pending on the Buildbot. | 115 this cache is initialized with jobs currently pending on the Buildbot. |
210 """ | 116 """ |
211 | 117 |
212 def __init__(self, pending_jobs_url, interval, cachepath=None): | 118 def __init__(self, pending_jobs_url, interval): |
213 """ | 119 """ |
214 Args: | 120 Args: |
215 pending_jobs_url: Rietveld URL string used to retrieve jobs to try. | 121 pending_jobs_url: Rietveld URL string used to retrieve jobs to try. |
216 interval: Interval used to poll Rietveld, in seconds. | 122 interval: Interval used to poll Rietveld, in seconds. |
217 """ | 123 """ |
218 self.pollInterval = interval | 124 self.pollInterval = interval |
219 self._try_job_rietveld = None | 125 self._try_job_rietveld = None |
220 self._pending_jobs_url = pending_jobs_url | 126 self._pending_jobs_url = pending_jobs_url |
221 self._cachepath = cachepath | |
222 self._processed_keys = None | 127 self._processed_keys = None |
223 | 128 |
224 # base.PollingChangeSource overrides: | 129 # base.PollingChangeSource overrides: |
225 def poll(self): | 130 def poll(self): |
226 """Polls Rietveld for any pending try jobs and submit them. | 131 """Polls Rietveld for any pending try jobs and submit them. |
227 | 132 |
228 Returns: | 133 Returns: |
229 A deferred object to be called once the operation completes. | 134 A deferred object to be called once the operation completes. |
230 """ | 135 """ |
231 | 136 |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
343 else: | 248 else: |
344 num_removed += 1 | 249 num_removed += 1 |
345 log.msg('[RPWC] Removed %d old jobs from the cache.' % num_removed) | 250 log.msg('[RPWC] Removed %d old jobs from the cache.' % num_removed) |
346 self._processed_keys = new_processed_keys | 251 self._processed_keys = new_processed_keys |
347 | 252 |
348 | 253 |
349 class TryJobRietveld(TryJobBase): | 254 class TryJobRietveld(TryJobBase): |
350 """A try job source that gets jobs from pending Rietveld patch sets.""" | 255 """A try job source that gets jobs from pending Rietveld patch sets.""" |
351 | 256 |
352 def __init__(self, name, pools, properties=None, last_good_urls=None, | 257 def __init__(self, name, pools, properties=None, last_good_urls=None, |
353 code_review_sites=None, project=None, filter_master=False, | 258 code_review_sites=None, project=None, filter_master=False): |
354 cachepath=None, cache_processed_jobs=False): | |
355 """Creates a try job source for Rietveld patch sets. | 259 """Creates a try job source for Rietveld patch sets. |
356 | 260 |
357 Args: | 261 Args: |
358 name: Name of this scheduler. | 262 name: Name of this scheduler. |
359 pools: No idea. | 263 pools: No idea. |
360 properties: Extra build properties specific to this scheduler. | 264 properties: Extra build properties specific to this scheduler. |
361 last_good_urls: Dictionary of project to last known good build URL. | 265 last_good_urls: Dictionary of project to last known good build URL. |
362 code_review_sites: Dictionary of project to code review site. This | 266 code_review_sites: Dictionary of project to code review site. This |
363 class care only about the 'chrome' project. | 267 class care only about the 'chrome' project. |
364 project: The name of the project whose review site URL to extract. | 268 project: The name of the project whose review site URL to extract. |
365 If the project is not found in the dictionary, an exception is | 269 If the project is not found in the dictionary, an exception is |
366 raised. | 270 raised. |
367 filter_master: Filter try jobs by master name. Necessary if several try | 271 filter_master: Filter try jobs by master name. Necessary if several try |
368 masters share the same rietveld instance. | 272 masters share the same rietveld instance. |
369 """ | 273 """ |
370 TryJobBase.__init__(self, name, pools, properties, | 274 TryJobBase.__init__(self, name, pools, properties, |
371 last_good_urls, code_review_sites) | 275 last_good_urls, code_review_sites) |
372 endpoint = self._GetRietveldEndPointForProject( | 276 endpoint = self._GetRietveldEndPointForProject( |
373 code_review_sites, project, filter_master) | 277 code_review_sites, project, filter_master) |
374 | 278 |
375 if cache_processed_jobs: | 279 self._poller = _RietveldPollerWithCache(endpoint, interval=10) |
376 self._poller = _RietveldPollerWithCache(endpoint, interval=10) | |
377 else: | |
378 self._poller = _RietveldPoller(endpoint, interval=10, cachepath=cachepath) | |
379 self._valid_users = _ValidUserPoller(interval=12 * 60 * 60) | 280 self._valid_users = _ValidUserPoller(interval=12 * 60 * 60) |
380 self._project = project | 281 self._project = project |
381 | 282 |
382 # Cleared by _RietveldPoller._ParseJson's success callback. | 283 # Cleared by _RietveldPoller._ParseJson's success callback. |
383 self.processed_keys = set() | 284 self.processed_keys = set() |
384 | 285 |
385 log.msg('TryJobRietveld created, get_pending_endpoint=%s ' | 286 log.msg('TryJobRietveld created, get_pending_endpoint=%s ' |
386 'project=%s' % (endpoint, project)) | 287 'project=%s' % (endpoint, project)) |
387 | 288 |
388 @staticmethod | 289 @staticmethod |
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
499 else: | 400 else: |
500 self.processed_keys.add(job['key']) | 401 self.processed_keys.add(job['key']) |
501 log.err('Rietveld not updated: no corresponding service found.') | 402 log.err('Rietveld not updated: no corresponding service found.') |
502 | 403 |
503 # TryJobBase overrides: | 404 # TryJobBase overrides: |
504 def setServiceParent(self, parent): | 405 def setServiceParent(self, parent): |
505 TryJobBase.setServiceParent(self, parent) | 406 TryJobBase.setServiceParent(self, parent) |
506 self._poller.setServiceParent(self) | 407 self._poller.setServiceParent(self) |
507 self._poller.master = self.master | 408 self._poller.master = self.master |
508 self._valid_users.setServiceParent(self) | 409 self._valid_users.setServiceParent(self) |
OLD | NEW |