OLD | NEW |
(Empty) | |
| 1 import os |
| 2 import zipfile |
| 3 import sys |
| 4 import tempfile |
| 5 import unittest |
| 6 import shutil |
| 7 import stat |
| 8 import unicodedata |
| 9 |
| 10 from subprocess import Popen as _Popen, PIPE as _PIPE |
| 11 |
| 12 |
| 13 def _remove_dir(target): |
| 14 |
| 15 #on windows this seems to a problem |
| 16 for dir_path, dirs, files in os.walk(target): |
| 17 os.chmod(dir_path, stat.S_IWRITE) |
| 18 for filename in files: |
| 19 os.chmod(os.path.join(dir_path, filename), stat.S_IWRITE) |
| 20 shutil.rmtree(target) |
| 21 |
| 22 |
| 23 class ZippedEnvironment(unittest.TestCase): |
| 24 |
| 25 datafile = None |
| 26 dataname = None |
| 27 old_cwd = None |
| 28 |
| 29 def setUp(self): |
| 30 if self.datafile is None or self.dataname is None: |
| 31 return |
| 32 |
| 33 if not os.path.isfile(self.datafile): |
| 34 self.old_cwd = None |
| 35 return |
| 36 |
| 37 self.old_cwd = os.getcwd() |
| 38 |
| 39 self.temp_dir = tempfile.mkdtemp() |
| 40 zip_file, source, target = [None, None, None] |
| 41 try: |
| 42 zip_file = zipfile.ZipFile(self.datafile) |
| 43 for files in zip_file.namelist(): |
| 44 zip_file.extract(files, self.temp_dir) |
| 45 finally: |
| 46 if zip_file: |
| 47 zip_file.close() |
| 48 del zip_file |
| 49 |
| 50 os.chdir(os.path.join(self.temp_dir, self.dataname)) |
| 51 |
| 52 def tearDown(self): |
| 53 #Assume setUp was never completed |
| 54 if self.dataname is None or self.datafile is None: |
| 55 return |
| 56 |
| 57 try: |
| 58 if self.old_cwd: |
| 59 os.chdir(self.old_cwd) |
| 60 _remove_dir(self.temp_dir) |
| 61 except OSError: |
| 62 #sigh? |
| 63 pass |
| 64 |
| 65 |
| 66 def _which_dirs(cmd): |
| 67 result = set() |
| 68 for path in os.environ.get('PATH', '').split(os.pathsep): |
| 69 filename = os.path.join(path, cmd) |
| 70 if os.access(filename, os.X_OK): |
| 71 result.add(path) |
| 72 return result |
| 73 |
| 74 |
| 75 def run_setup_py(cmd, pypath=None, path=None, |
| 76 data_stream=0, env=None): |
| 77 """ |
| 78 Execution command for tests, separate from those used by the |
| 79 code directly to prevent accidental behavior issues |
| 80 """ |
| 81 if env is None: |
| 82 env = dict() |
| 83 for envname in os.environ: |
| 84 env[envname] = os.environ[envname] |
| 85 |
| 86 #override the python path if needed |
| 87 if pypath is not None: |
| 88 env["PYTHONPATH"] = pypath |
| 89 |
| 90 #overide the execution path if needed |
| 91 if path is not None: |
| 92 env["PATH"] = path |
| 93 if not env.get("PATH", ""): |
| 94 env["PATH"] = _which_dirs("tar").union(_which_dirs("gzip")) |
| 95 env["PATH"] = os.pathsep.join(env["PATH"]) |
| 96 |
| 97 cmd = [sys.executable, "setup.py"] + list(cmd) |
| 98 |
| 99 # http://bugs.python.org/issue8557 |
| 100 shell = sys.platform == 'win32' |
| 101 |
| 102 try: |
| 103 proc = _Popen( |
| 104 cmd, stdout=_PIPE, stderr=_PIPE, shell=shell, env=env, |
| 105 ) |
| 106 |
| 107 data = proc.communicate()[data_stream] |
| 108 except OSError: |
| 109 return 1, '' |
| 110 |
| 111 #decode the console string if needed |
| 112 if hasattr(data, "decode"): |
| 113 # use the default encoding |
| 114 data = data.decode() |
| 115 data = unicodedata.normalize('NFC', data) |
| 116 |
| 117 #communciate calls wait() |
| 118 return proc.returncode, data |
OLD | NEW |