Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(242)

Side by Side Diff: third_party/logilab/logilab/common/shellutils.py

Issue 1920403002: [content/test/gpu] Run pylint check of gpu tests in unittest instead of PRESUBMIT (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Update path to LICENSE.txt of logilab/README.chromium Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
3 #
4 # This file is part of logilab-common.
5 #
6 # logilab-common is free software: you can redistribute it and/or modify it unde r
7 # the terms of the GNU Lesser General Public License as published by the Free
8 # Software Foundation, either version 2.1 of the License, or (at your option) an y
9 # later version.
10 #
11 # logilab-common is distributed in the hope that it will be useful, but WITHOUT
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 # details.
15 #
16 # You should have received a copy of the GNU Lesser General Public License along
17 # with logilab-common. If not, see <http://www.gnu.org/licenses/>.
18 """shell/term utilities, useful to write some python scripts instead of shell
19 scripts.
20 """
21
22 from __future__ import print_function
23
24 __docformat__ = "restructuredtext en"
25
26 import os
27 import glob
28 import shutil
29 import stat
30 import sys
31 import tempfile
32 import time
33 import fnmatch
34 import errno
35 import string
36 import random
37 import subprocess
38 from os.path import exists, isdir, islink, basename, join
39
40 from six import string_types
41 from six.moves import range, input as raw_input
42
43 from logilab.common import STD_BLACKLIST, _handle_blacklist
44 from logilab.common.compat import str_to_bytes
45 from logilab.common.deprecation import deprecated
46
47 try:
48 from logilab.common.proc import ProcInfo, NoSuchProcess
49 except ImportError:
50 # windows platform
51 class NoSuchProcess(Exception): pass
52
53 def ProcInfo(pid):
54 raise NoSuchProcess()
55
56
57 class tempdir(object):
58
59 def __enter__(self):
60 self.path = tempfile.mkdtemp()
61 return self.path
62
63 def __exit__(self, exctype, value, traceback):
64 # rmtree in all cases
65 shutil.rmtree(self.path)
66 return traceback is None
67
68
69 class pushd(object):
70 def __init__(self, directory):
71 self.directory = directory
72
73 def __enter__(self):
74 self.cwd = os.getcwd()
75 os.chdir(self.directory)
76 return self.directory
77
78 def __exit__(self, exctype, value, traceback):
79 os.chdir(self.cwd)
80
81
82 def chown(path, login=None, group=None):
83 """Same as `os.chown` function but accepting user login or group name as
84 argument. If login or group is omitted, it's left unchanged.
85
86 Note: you must own the file to chown it (or be root). Otherwise OSError is r aised.
87 """
88 if login is None:
89 uid = -1
90 else:
91 try:
92 uid = int(login)
93 except ValueError:
94 import pwd # Platforms: Unix
95 uid = pwd.getpwnam(login).pw_uid
96 if group is None:
97 gid = -1
98 else:
99 try:
100 gid = int(group)
101 except ValueError:
102 import grp
103 gid = grp.getgrnam(group).gr_gid
104 os.chown(path, uid, gid)
105
106 def mv(source, destination, _action=shutil.move):
107 """A shell-like mv, supporting wildcards.
108 """
109 sources = glob.glob(source)
110 if len(sources) > 1:
111 assert isdir(destination)
112 for filename in sources:
113 _action(filename, join(destination, basename(filename)))
114 else:
115 try:
116 source = sources[0]
117 except IndexError:
118 raise OSError('No file matching %s' % source)
119 if isdir(destination) and exists(destination):
120 destination = join(destination, basename(source))
121 try:
122 _action(source, destination)
123 except OSError as ex:
124 raise OSError('Unable to move %r to %r (%s)' % (
125 source, destination, ex))
126
127 def rm(*files):
128 """A shell-like rm, supporting wildcards.
129 """
130 for wfile in files:
131 for filename in glob.glob(wfile):
132 if islink(filename):
133 os.remove(filename)
134 elif isdir(filename):
135 shutil.rmtree(filename)
136 else:
137 os.remove(filename)
138
139 def cp(source, destination):
140 """A shell-like cp, supporting wildcards.
141 """
142 mv(source, destination, _action=shutil.copy)
143
144 def find(directory, exts, exclude=False, blacklist=STD_BLACKLIST):
145 """Recursively find files ending with the given extensions from the director y.
146
147 :type directory: str
148 :param directory:
149 directory where the search should start
150
151 :type exts: basestring or list or tuple
152 :param exts:
153 extensions or lists or extensions to search
154
155 :type exclude: boolean
156 :param exts:
157 if this argument is True, returning files NOT ending with the given
158 extensions
159
160 :type blacklist: list or tuple
161 :param blacklist:
162 optional list of files or directory to ignore, default to the value of
163 `logilab.common.STD_BLACKLIST`
164
165 :rtype: list
166 :return:
167 the list of all matching files
168 """
169 if isinstance(exts, string_types):
170 exts = (exts,)
171 if exclude:
172 def match(filename, exts):
173 for ext in exts:
174 if filename.endswith(ext):
175 return False
176 return True
177 else:
178 def match(filename, exts):
179 for ext in exts:
180 if filename.endswith(ext):
181 return True
182 return False
183 files = []
184 for dirpath, dirnames, filenames in os.walk(directory):
185 _handle_blacklist(blacklist, dirnames, filenames)
186 # don't append files if the directory is blacklisted
187 dirname = basename(dirpath)
188 if dirname in blacklist:
189 continue
190 files.extend([join(dirpath, f) for f in filenames if match(f, exts)])
191 return files
192
193
194 def globfind(directory, pattern, blacklist=STD_BLACKLIST):
195 """Recursively finds files matching glob `pattern` under `directory`.
196
197 This is an alternative to `logilab.common.shellutils.find`.
198
199 :type directory: str
200 :param directory:
201 directory where the search should start
202
203 :type pattern: basestring
204 :param pattern:
205 the glob pattern (e.g *.py, foo*.py, etc.)
206
207 :type blacklist: list or tuple
208 :param blacklist:
209 optional list of files or directory to ignore, default to the value of
210 `logilab.common.STD_BLACKLIST`
211
212 :rtype: iterator
213 :return:
214 iterator over the list of all matching files
215 """
216 for curdir, dirnames, filenames in os.walk(directory):
217 _handle_blacklist(blacklist, dirnames, filenames)
218 for fname in fnmatch.filter(filenames, pattern):
219 yield join(curdir, fname)
220
221 def unzip(archive, destdir):
222 import zipfile
223 if not exists(destdir):
224 os.mkdir(destdir)
225 zfobj = zipfile.ZipFile(archive)
226 for name in zfobj.namelist():
227 if name.endswith('/'):
228 os.mkdir(join(destdir, name))
229 else:
230 outfile = open(join(destdir, name), 'wb')
231 outfile.write(zfobj.read(name))
232 outfile.close()
233
234
235 class Execute:
236 """This is a deadlock safe version of popen2 (no stdin), that returns
237 an object with errorlevel, out and err.
238 """
239
240 def __init__(self, command):
241 cmd = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stde rr=subprocess.PIPE)
242 self.out, self.err = cmd.communicate()
243 self.status = os.WEXITSTATUS(cmd.returncode)
244
245 Execute = deprecated('Use subprocess.Popen instead')(Execute)
246
247
248 def acquire_lock(lock_file, max_try=10, delay=10, max_delay=3600):
249 """Acquire a lock represented by a file on the file system
250
251 If the process written in lock file doesn't exist anymore, we remove the
252 lock file immediately
253 If age of the lock_file is greater than max_delay, then we raise a UserWarni ng
254 """
255 count = abs(max_try)
256 while count:
257 try:
258 fd = os.open(lock_file, os.O_EXCL | os.O_RDWR | os.O_CREAT)
259 os.write(fd, str_to_bytes(str(os.getpid())) )
260 os.close(fd)
261 return True
262 except OSError as e:
263 if e.errno == errno.EEXIST:
264 try:
265 fd = open(lock_file, "r")
266 pid = int(fd.readline())
267 pi = ProcInfo(pid)
268 age = (time.time() - os.stat(lock_file)[stat.ST_MTIME])
269 if age / max_delay > 1 :
270 raise UserWarning("Command '%s' (pid %s) has locked the "
271 "file '%s' for %s minutes"
272 % (pi.name(), pid, lock_file, age/60))
273 except UserWarning:
274 raise
275 except NoSuchProcess:
276 os.remove(lock_file)
277 except Exception:
278 # The try block is not essential. can be skipped.
279 # Note: ProcInfo object is only available for linux
280 # process information are not accessible...
281 # or lock_file is no more present...
282 pass
283 else:
284 raise
285 count -= 1
286 time.sleep(delay)
287 else:
288 raise Exception('Unable to acquire %s' % lock_file)
289
290 def release_lock(lock_file):
291 """Release a lock represented by a file on the file system."""
292 os.remove(lock_file)
293
294
295 class ProgressBar(object):
296 """A simple text progression bar."""
297
298 def __init__(self, nbops, size=20, stream=sys.stdout, title=''):
299 if title:
300 self._fstr = '\r%s [%%-%ss]' % (title, int(size))
301 else:
302 self._fstr = '\r[%%-%ss]' % int(size)
303 self._stream = stream
304 self._total = nbops
305 self._size = size
306 self._current = 0
307 self._progress = 0
308 self._current_text = None
309 self._last_text_write_size = 0
310
311 def _get_text(self):
312 return self._current_text
313
314 def _set_text(self, text=None):
315 if text != self._current_text:
316 self._current_text = text
317 self.refresh()
318
319 def _del_text(self):
320 self.text = None
321
322 text = property(_get_text, _set_text, _del_text)
323
324 def update(self, offset=1, exact=False):
325 """Move FORWARD to new cursor position (cursor will never go backward).
326
327 :offset: fraction of ``size``
328
329 :exact:
330
331 - False: offset relative to current cursor position if True
332 - True: offset as an asbsolute position
333
334 """
335 if exact:
336 self._current = offset
337 else:
338 self._current += offset
339
340 progress = int((float(self._current)/float(self._total))*self._size)
341 if progress > self._progress:
342 self._progress = progress
343 self.refresh()
344
345 def refresh(self):
346 """Refresh the progression bar display."""
347 self._stream.write(self._fstr % ('=' * min(self._progress, self._size)) )
348 if self._last_text_write_size or self._current_text:
349 template = ' %%-%is' % (self._last_text_write_size)
350 text = self._current_text
351 if text is None:
352 text = ''
353 self._stream.write(template % text)
354 self._last_text_write_size = len(text.rstrip())
355 self._stream.flush()
356
357 def finish(self):
358 self._stream.write('\n')
359 self._stream.flush()
360
361
362 class DummyProgressBar(object):
363 __slot__ = ('text',)
364
365 def refresh(self):
366 pass
367 def update(self):
368 pass
369 def finish(self):
370 pass
371
372
373 _MARKER = object()
374 class progress(object):
375
376 def __init__(self, nbops=_MARKER, size=_MARKER, stream=_MARKER, title=_MARKE R, enabled=True):
377 self.nbops = nbops
378 self.size = size
379 self.stream = stream
380 self.title = title
381 self.enabled = enabled
382
383 def __enter__(self):
384 if self.enabled:
385 kwargs = {}
386 for attr in ('nbops', 'size', 'stream', 'title'):
387 value = getattr(self, attr)
388 if value is not _MARKER:
389 kwargs[attr] = value
390 self.pb = ProgressBar(**kwargs)
391 else:
392 self.pb = DummyProgressBar()
393 return self.pb
394
395 def __exit__(self, exc_type, exc_val, exc_tb):
396 self.pb.finish()
397
398 class RawInput(object):
399
400 def __init__(self, input=None, printer=None):
401 self._input = input or raw_input
402 self._print = printer
403
404 def ask(self, question, options, default):
405 assert default in options
406 choices = []
407 for option in options:
408 if option == default:
409 label = option[0].upper()
410 else:
411 label = option[0].lower()
412 if len(option) > 1:
413 label += '(%s)' % option[1:].lower()
414 choices.append((option, label))
415 prompt = "%s [%s]: " % (question,
416 '/'.join([opt[1] for opt in choices]))
417 tries = 3
418 while tries > 0:
419 answer = self._input(prompt).strip().lower()
420 if not answer:
421 return default
422 possible = [option for option, label in choices
423 if option.lower().startswith(answer)]
424 if len(possible) == 1:
425 return possible[0]
426 elif len(possible) == 0:
427 msg = '%s is not an option.' % answer
428 else:
429 msg = ('%s is an ambiguous answer, do you mean %s ?' % (
430 answer, ' or '.join(possible)))
431 if self._print:
432 self._print(msg)
433 else:
434 print(msg)
435 tries -= 1
436 raise Exception('unable to get a sensible answer')
437
438 def confirm(self, question, default_is_yes=True):
439 default = default_is_yes and 'y' or 'n'
440 answer = self.ask(question, ('y', 'n'), default)
441 return answer == 'y'
442
443 ASK = RawInput()
444
445
446 def getlogin():
447 """avoid using os.getlogin() because of strange tty / stdin problems
448 (man 3 getlogin)
449 Another solution would be to use $LOGNAME, $USER or $USERNAME
450 """
451 if sys.platform != 'win32':
452 import pwd # Platforms: Unix
453 return pwd.getpwuid(os.getuid())[0]
454 else:
455 return os.environ['USERNAME']
456
457 def generate_password(length=8, vocab=string.ascii_letters + string.digits):
458 """dumb password generation function"""
459 pwd = ''
460 for i in range(length):
461 pwd += random.choice(vocab)
462 return pwd
OLDNEW
« no previous file with comments | « third_party/logilab/logilab/common/registry.py ('k') | third_party/logilab/logilab/common/sphinx_ext.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698