Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(260)

Side by Side Diff: client/cipd.py

Issue 2037253002: run_isolated.py: install CIPD packages (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-py@master
Patch Set: addressed comments Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2016 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file.
4
5 """Fetches CIPD client and installs packages."""
6
7 __version__ = '0.1'
8
9 import hashlib
10 import logging
11 import optparse
12 import os
13 import platform
14 import sys
15 import tempfile
16 import time
17 import urllib
18
19 from utils import file_path
20 from utils import fs
21 from utils import net
22 from utils import subprocess42
23 from utils import tools
24 import isolated_format
25 import isolateserver
26
27
28 # .exe on Windows.
29 EXECUTABLE_SUFFIX = '.exe' if sys.platform == 'win32' else ''
30
31
32 class Error(Exception):
33 """Raised on CIPD errors."""
34
35
36 def add_cipd_options(parser):
37 group = optparse.OptionGroup(parser, 'CIPD')
38 group.add_option(
39 '--cipd-server',
40 help='URL of the CIPD server. Only relevant with --cipd-package.')
41 group.add_option(
42 '--cipd-client-package',
43 help='Package of CIPD client. See --cipd-package for format. '
44 'Only relevant with --cipd-package. '
45 'Default: "%default"',
46 default='infra/tools/cipd/${platform}:latest')
47 group.add_option(
48 '--cipd-package',
49 help='CIPD package to install. '
50 'Format: "<package name template>:<version>". '
51 'Package name template is a CIPD package name with optional '
52 '${platform} and/or ${os_ver} parameters. '
53 '${platform} will be expanded to "<os>-<architecture>" and '
54 '${os_ver} will be expanded to OS version name. '
55 'This option can be specified more than once.',
56 action='append')
57 group.add_option(
58 '--cipd-cache',
59 help='CIPD cache directory, separate from isolate cache. '
60 'Only relevant with --cipd-package. '
61 'Default: "%default".',
62 default='')
63 parser.add_option_group(group)
64
65
66 def validate_cipd_options(parser, options):
67 """Calls parser.error on first found error among cipd options."""
68 if not options.cipd_package:
69 return
70 for p in options.cipd_package:
71 try:
72 parse_package(p)
73 except ValueError as ex:
74 parser.error('Invalid cipd package %r: %s' % (p, ex))
75
76 if not options.cipd_server:
77 parser.error('--cipd-package requires non-empty --cipd-server')
78
79 if not options.cipd_client_package:
80 parser.error('--cipd-package requires non-empty --cipd-client-package')
81 try:
82 parse_package(options.cipd_client_package)
83 except ValueError as ex:
84 parser.error(
85 'Invalid cipd client package %r: %s' %
86 (options.cipd_client_package, ex))
87
88
89 class CipdClient(object):
90 """Installs packages."""
91
92 def __init__(self, binary_path, service_url=None):
93 """Initializes CipdClient.
94
95 Args:
96 binary_path (str): path to the CIPD client binary.
97 service_url (str): if not None, URL of the CIPD backend that overrides
98 the default one.
99 """
100 self.binary_path = binary_path
101 self.service_url = service_url
102
103 def ensure(
104 self, site_root, packages, cache_dir=None, tmp_dir=None, timeout=None):
105 """Ensures that packages installed in |site_root| equals |packages| set.
106
107 Blocking call.
108
109 Args:
110 site_root (str): where to install packages.
111 packages (str): list of packages to install, parsable by parse_pacakge().
112 cache_dir (str): if set, cache dir for cipd binary own cache.
113 Typically contains packages and tags.
114 tmp_dir (str): if not None, dir for temp files.
115 timeout (int): if not None, timeout in seconds for this function to run.
116
117 Raises:
118 Error if could not install packages or timed out.
119 """
120 timeoutfn = tools.sliding_timeout(timeout)
121 logging.info('Installing packages %r into %s', packages, site_root)
122
123 list_file_handle, list_file_path = tempfile.mkstemp(
124 dir=tmp_dir, prefix=u'cipd-ensure-list-', suffix='.txt')
125 list_file_closed = False
126 try:
127 for p in packages:
128 pkg, version = parse_package(p)
129 pkg = render_package_name_template(pkg)
130 os.write(list_file_handle, '%s %s\n' % (pkg, version))
131 os.close(list_file_handle)
132 list_file_closed = True
133
134 cmd = [
135 self.binary_path, 'ensure',
136 '-root', site_root,
137 '-list', list_file_path,
138 '-verbose', # this is safe because cipd-ensure does not print a lot
139 ]
140 if cache_dir:
141 cmd += ['-cache-dir', cache_dir]
142 if self.service_url:
143 cmd += ['-service-url', self.service_url]
144
145 logging.debug('Running %r', cmd)
146 process = subprocess42.Popen(
147 cmd, stdout=subprocess42.PIPE, stderr=subprocess42.PIPE)
148 output = []
149 for pipe_name, line in process.yield_any_line(timeout=0.1):
150 to = timeoutfn()
151 if to is not None and to <= 0:
152 raise Error(
153 'Could not install packages; took more than %d seconds' % timeout)
154 if not pipe_name:
155 # stdout or stderr was closed, but yield_any_line still may have
156 # something to yield.
157 continue
158 output.append(line)
159 if pipe_name == 'stderr':
160 logging.debug('cipd client: %s', line)
161 else:
162 logging.info('cipd client: %s', line)
163
164 exit_code = process.wait(timeout=timeoutfn())
165 if exit_code != 0:
166 raise Error(
167 'Could not install packages; exit code %d\noutput:%s' % (
168 exit_code, '\n'.join(output)))
169 finally:
170 if not list_file_closed:
171 os.close(list_file_handle)
172 fs.remove(list_file_path)
173
174
175 def get_platform():
176 """Returns ${platform} parameter value.
177
178 Borrowed from
179 https://chromium.googlesource.com/infra/infra/+/aaf9586/build/build.py#204
180 """
181 # linux, mac or windows.
182 platform_variant = {
183 'darwin': 'mac',
184 'linux2': 'linux',
185 'win32': 'windows',
186 }.get(sys.platform)
187 if not platform_variant:
188 raise Error('Unknown OS: %s' % sys.platform)
189
190 # amd64, 386, etc.
191 machine = platform.machine().lower()
192 platform_arch = {
193 'amd64': 'amd64',
194 'i386': '386',
195 'i686': '386',
196 'x86': '386',
197 'x86_64': 'amd64',
198 }.get(machine)
199 if not platform_arch:
200 if machine.startswith('arm'):
201 platform_arch = 'armv6l'
202 else:
203 platform_arch = 'amd64' if sys.maxsize > 2**32 else '386'
204 return '%s-%s' % (platform_variant, platform_arch)
205
206
207 def get_os_ver():
208 """Returns ${os_ver} parameter value.
209
210 Examples: 'ubuntu14_04' or 'mac10_9' or 'win6_1'.
211
212 Borrowed from
213 https://chromium.googlesource.com/infra/infra/+/aaf9586/build/build.py#204
214 """
215 if sys.platform == 'darwin':
216 # platform.mac_ver()[0] is '10.9.5'.
217 dist = platform.mac_ver()[0].split('.')
218 return 'mac%s_%s' % (dist[0], dist[1])
219
220 if sys.platform == 'linux2':
221 # platform.linux_distribution() is ('Ubuntu', '14.04', ...).
222 dist = platform.linux_distribution()
223 return '%s%s' % (dist[0].lower(), dist[1].replace('.', '_'))
224
225 if sys.platform == 'win32':
226 # platform.version() is '6.1.7601'.
227 dist = platform.version().split('.')
228 return 'win%s_%s' % (dist[0], dist[1])
229 raise Error('Unknown OS: %s' % sys.platform)
230
231
232 def render_package_name_template(template):
233 """Expands template variables in a CIPD package name template."""
234 return (template
235 .replace('${platform}', get_platform())
236 .replace('${os_ver}', get_os_ver()))
237
238
239 def parse_package(package):
240 """Parses a package in --cipd-package format.
241
242 Returns:
243 (package_name_template, version) tuple.
244
245 Raises:
246 ValueError if package name or version is not specified.
247 """
248 if not package:
249 raise ValueError('package is not specified')
250 parts = package.split(':', 1)
251 if len(parts) != 2:
252 raise ValueError('version is not specified')
253 return tuple(parts)
254
255
256 def _check_response(res, fmt, *args):
257 """Raises Error if response is bad."""
258 if not res:
259 raise Error('%s: no response' % (fmt % args))
260
261 if res.get('status') != 'SUCCESS':
262 raise Error('%s: %s' % (
263 fmt % args,
264 res.get('error_message') or 'status is %s' % res.get('status')))
265
266
267 def resolve_version(cipd_server, package_name, version, timeout=None):
268 """Resolves a package instance version (e.g. a tag) to an instance id."""
269 url = '%s/_ah/api/repo/v1/instance/resolve?%s' % (
270 cipd_server,
271 urllib.urlencode({
272 'package_name': package_name,
273 'version': version,
274 }))
275 res = net.url_read_json(url, timeout=timeout)
276 _check_response(res, 'Could not resolve version %s:%s', package_name, version)
277 instance_id = res.get('instance_id')
278 if not instance_id:
279 raise Error('Invalid resolveVersion response: no instance id')
280 return instance_id
281
282
283 def get_client_fetch_url(service_url, package_name, instance_id, timeout=None):
284 """Returns a fetch URL of CIPD client binary contents.
285
286 Raises:
287 Error if cannot retrieve fetch URL.
288 """
289 # Fetch the URL of the binary from CIPD backend.
290 package_name = render_package_name_template(package_name)
291 url = '%s/_ah/api/repo/v1/client?%s' % (service_url, urllib.urlencode({
292 'package_name': package_name,
293 'instance_id': instance_id,
294 }))
295 res = net.url_read_json(url, timeout=timeout)
296 _check_response(
297 res, 'Could not fetch CIPD client %s:%s',package_name, instance_id)
298 fetch_url = res.get('client_binary', {}).get('fetch_url')
299 if not fetch_url:
300 raise Error('Invalid fetchClientBinary response: no fetch_url')
301 return fetch_url
302
303
304 def _fetch_cipd_client(disk_cache, instance_id, fetch_url, timeoutfn):
305 """Fetches cipd binary to |disk_cache|.
306
307 Retries requests with exponential back-off.
308
309 Raises:
310 Error if could not fetch content.
311 """
312 sleep_time = 1
313 for attempt in xrange(5):
314 if attempt > 0:
315 if timeoutfn() is not None and timeoutfn() < sleep_time:
316 raise Error('Could not fetch CIPD client: timeout')
317 logging.warning('Will retry to fetch CIPD client in %ds', sleep_time)
318 time.sleep(sleep_time)
319 sleep_time *= 2
320
321 try:
322 res = net.url_open(fetch_url, timeout=timeoutfn())
323 if res:
324 disk_cache.write(instance_id, res.iter_content(64 * 1024))
325 except net.TimeoutError as ex:
326 raise Error('Could not fetch CIPD client: %s', ex)
327 except net.NetError as ex:
328 logging.warning(
329 'Could not fetch CIPD client on attempt #%d: %s', attempt + 1, ex)
330
331 raise Error('Could not fetch CIPD client after 5 retries')
332
333
334 def get_client(
335 service_url, package_name, version, cache_dir, timeout=None):
336 """Creates a CipdClient. A blocking call.
337
338 Args:
339 service_url (str): URL of the CIPD backend.
340 package_name (str): package name template of the CIPD client.
341 version (str): version of CIPD client package.
342 cache_dir: directory to store instance cache, version cache
343 and a hardlink to the client binary.
344 timeout (int): if not None, timeout in seconds for this function.
345
346 Returns:
347 CipdClient.
348
349 Raises:
350 Error if CIPD client version cannot be resolved or client cannot be fetched.
351 """
352 timeoutfn = tools.sliding_timeout(timeout)
353
354 package_name = render_package_name_template(package_name)
355
356 # Resolve version to instance id.
357 # Is it an instance id already? They look like HEX SHA1.
358 if isolated_format.is_valid_hash(version, hashlib.sha1):
359 instance_id = version
360 else:
361 # version_cache is {version_digest -> instance id} mapping.
362 # It does not take a lot of disk space.
363 version_cache = isolateserver.DiskCache(
364 unicode(os.path.join(cache_dir, 'versions')),
365 isolateserver.CachePolicies(0, 0, 300),
366 hashlib.sha1)
367
368 # Convert |version| to a string that may be used as a filename in disk cache
369 # by hashing it.
370 version_digest = hashlib.sha1(version).hexdigest()
371 try:
372 instance_id = version_cache.read(version_digest)
373 except isolateserver.CacheMiss:
374 instance_id = resolve_version(
375 service_url, package_name, version, timeout=timeoutfn())
376 version_cache.write(version_digest, instance_id)
377
378 # instance_cache is {instance_id -> client binary} mapping.
379 # It is bounded by 5 client versions.
380 instance_cache = isolateserver.DiskCache(
381 unicode(os.path.join(cache_dir, 'clients')),
382 isolateserver.CachePolicies(0, 0, 5),
383 hashlib.sha1)
384 if instance_id not in instance_cache:
385 logging.info('Fetching CIPD client %s:%s', package_name, instance_id)
386 fetch_url = get_client_fetch_url(
387 service_url, package_name, instance_id, timeout=timeoutfn())
388 _fetch_cipd_client(instance_cache, instance_id, fetch_url, timeoutfn)
389
390 # A single host can run multiple swarming bots, but ATM they do not share same
391 # root bot directory. Thus, it is safe to use the same name for the binary.
392 binary_path = unicode(os.path.join(cache_dir, 'cipd' + EXECUTABLE_SUFFIX))
393 if fs.isfile(binary_path):
394 fs.unlink(binary_path)
395 instance_cache.hardlink(instance_id, binary_path, 0511) # -r-x--x--x
396
397 return CipdClient(binary_path)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698