| OLD | NEW |
| 1 #!/usr/bin/env python | 1 #!/usr/bin/env python |
| 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | 2 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 3 # Use of this source code is governed by a BSD-style license that can be | 3 # Use of this source code is governed by a BSD-style license that can be |
| 4 # found in the LICENSE file. | 4 # found in the LICENSE file. |
| 5 | 5 |
| 6 import os | 6 import os |
| 7 import shutil | 7 import shutil |
| 8 import subprocess | 8 import subprocess |
| 9 import sys | 9 import sys |
| 10 import tempfile | 10 import tempfile |
| 11 import unittest | 11 import unittest |
| 12 import zipfile | 12 import zipfile |
| 13 | 13 |
| 14 SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) | 14 SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) |
| 15 PARENT_DIR = os.path.dirname(SCRIPT_DIR) | 15 PARENT_DIR = os.path.dirname(SCRIPT_DIR) |
| 16 | 16 |
| 17 sys.path.append(PARENT_DIR) | 17 sys.path.append(PARENT_DIR) |
| 18 | 18 |
| 19 import oshelpers | 19 import oshelpers |
| 20 | 20 |
| 21 class RunZipError(subprocess.CalledProcessError): | 21 class RunError(subprocess.CalledProcessError): |
| 22 def __init__(self, retcode, command, output, error_output): | 22 def __init__(self, retcode, command, output, error_output): |
| 23 subprocess.CalledProcessError.__init__(self, retcode, command) | 23 subprocess.CalledProcessError.__init__(self, retcode, command) |
| 24 self.output = output | 24 self.output = output |
| 25 self.error_output = error_output | 25 self.error_output = error_output |
| 26 | 26 |
| 27 def __str__(self): | 27 def __str__(self): |
| 28 msg = subprocess.CalledProcessError.__str__(self) | 28 msg = subprocess.CalledProcessError.__str__(self) |
| 29 msg += '.\nstdout: """%s"""' % (self.output,) | 29 msg += '.\nstdout: """%s"""' % (self.output,) |
| 30 msg += '.\nstderr: """%s"""' % (self.error_output,) | 30 msg += '.\nstderr: """%s"""' % (self.error_output,) |
| 31 return msg | 31 return msg |
| 32 | 32 |
| 33 | 33 |
| 34 def RunZip(args, cwd): | 34 def RunCmd(cmd, args, cwd, env=None): |
| 35 command = [sys.executable, 'oshelpers.py', 'zip'] + args | 35 env = env or os.environ |
| 36 command = [sys.executable, 'oshelpers.py', cmd] + args |
| 36 process = subprocess.Popen(stdout=subprocess.PIPE, | 37 process = subprocess.Popen(stdout=subprocess.PIPE, |
| 37 stderr=subprocess.PIPE, | 38 stderr=subprocess.PIPE, |
| 38 args=command, | 39 args=command, |
| 39 cwd=cwd) | 40 cwd=cwd, |
| 41 env=env) |
| 40 output, error_output = process.communicate() | 42 output, error_output = process.communicate() |
| 41 retcode = process.returncode | 43 retcode = process.returncode |
| 42 | 44 |
| 43 if retcode: | 45 if retcode: |
| 44 raise RunZipError(retcode, command, output, error_output) | 46 raise RunError(retcode, command, output, error_output) |
| 45 return output, error_output | 47 return output, error_output |
| 46 | 48 |
| 47 | 49 |
| 48 class TestZip(unittest.TestCase): | 50 class TestZip(unittest.TestCase): |
| 49 def setUp(self): | 51 def setUp(self): |
| 50 # make zipname -> "testFooBar.zip" | 52 # make zipname -> "testFooBar.zip" |
| 51 self.zipname = self.id().split('.')[-1] + '.zip' | 53 self.zipname = self.id().split('.')[-1] + '.zip' |
| 52 self.zipfile = None | 54 self.zipfile = None |
| 53 self.tempdir = tempfile.mkdtemp() | 55 self.tempdir = tempfile.mkdtemp() |
| 54 shutil.copy(os.path.join(PARENT_DIR, 'oshelpers.py'), | 56 shutil.copy(os.path.join(PARENT_DIR, 'oshelpers.py'), |
| 55 self.tempdir) | 57 self.tempdir) |
| 56 | 58 |
| 57 def tearDown(self): | 59 def tearDown(self): |
| 58 if self.zipfile: | 60 if self.zipfile: |
| 59 self.zipfile.close() | 61 self.zipfile.close() |
| 60 shutil.rmtree(self.tempdir) | 62 shutil.rmtree(self.tempdir) |
| 61 | 63 |
| 62 def GetTempPath(self, basename): | 64 def GetTempPath(self, basename): |
| 63 return os.path.join(self.tempdir, basename) | 65 return os.path.join(self.tempdir, basename) |
| 64 | 66 |
| 65 def MakeFile(self, rel_path, size): | 67 def MakeFile(self, rel_path, size): |
| 66 with open(os.path.join(self.tempdir, rel_path), 'wb') as f: | 68 with open(os.path.join(self.tempdir, rel_path), 'wb') as f: |
| 67 f.write('0' * size) | 69 f.write('0' * size) |
| 68 return rel_path | 70 return rel_path |
| 69 | 71 |
| 70 def RunZip(self, *args): | 72 def RunZip(self, *args): |
| 71 return RunZip(*args, cwd=self.tempdir) | 73 return RunCmd('zip', list(args), cwd=self.tempdir) |
| 72 | 74 |
| 73 def OpenZipFile(self): | 75 def OpenZipFile(self): |
| 74 self.zipfile = zipfile.ZipFile(self.GetTempPath(self.zipname), 'r') | 76 self.zipfile = zipfile.ZipFile(self.GetTempPath(self.zipname), 'r') |
| 75 | 77 |
| 76 def CloseZipFile(self): | 78 def CloseZipFile(self): |
| 77 self.zipfile.close() | 79 self.zipfile.close() |
| 78 self.zipfile = None | 80 self.zipfile = None |
| 79 | 81 |
| 80 def GetZipInfo(self, path): | 82 def GetZipInfo(self, path): |
| 81 return self.zipfile.getinfo(oshelpers.OSMakeZipPath(path)) | 83 return self.zipfile.getinfo(oshelpers.OSMakeZipPath(path)) |
| 82 | 84 |
| 83 | 85 |
| 84 def testNothingToDo(self): | 86 def testNothingToDo(self): |
| 85 self.assertRaises(subprocess.CalledProcessError, self.RunZip, | 87 self.assertRaises(subprocess.CalledProcessError, self.RunZip, |
| 86 [self.zipname, 'nonexistent_file']) | 88 self.zipname, 'nonexistent_file') |
| 87 self.assertFalse(os.path.exists(self.zipname)) | 89 self.assertFalse(os.path.exists(self.zipname)) |
| 88 | 90 |
| 89 def testAddSomeFiles(self): | 91 def testAddSomeFiles(self): |
| 90 file1 = self.MakeFile('file1', 1024) | 92 file1 = self.MakeFile('file1', 1024) |
| 91 file2 = self.MakeFile('file2', 3354) | 93 file2 = self.MakeFile('file2', 3354) |
| 92 self.RunZip([self.zipname, file1, file2]) | 94 self.RunZip(self.zipname, file1, file2) |
| 93 self.OpenZipFile() | 95 self.OpenZipFile() |
| 94 self.assertEqual(len(self.zipfile.namelist()), 2) | 96 self.assertEqual(len(self.zipfile.namelist()), 2) |
| 95 self.assertEqual(self.GetZipInfo(file1).file_size, 1024) | 97 self.assertEqual(self.GetZipInfo(file1).file_size, 1024) |
| 96 self.assertEqual(self.GetZipInfo(file2).file_size, 3354) | 98 self.assertEqual(self.GetZipInfo(file2).file_size, 3354) |
| 97 # make sure files are added in order | 99 # make sure files are added in order |
| 98 self.assertEqual(self.zipfile.namelist()[0], file1) | 100 self.assertEqual(self.zipfile.namelist()[0], file1) |
| 99 | 101 |
| 100 def testAddFilesWithGlob(self): | 102 def testAddFilesWithGlob(self): |
| 101 self.MakeFile('file1', 1024) | 103 self.MakeFile('file1', 1024) |
| 102 self.MakeFile('file2', 3354) | 104 self.MakeFile('file2', 3354) |
| 103 self.RunZip([self.zipname, 'file*']) | 105 self.RunZip(self.zipname, 'file*') |
| 104 self.OpenZipFile() | 106 self.OpenZipFile() |
| 105 self.assertEqual(len(self.zipfile.namelist()), 2) | 107 self.assertEqual(len(self.zipfile.namelist()), 2) |
| 106 | 108 |
| 107 def testAddDir(self): | 109 def testAddDir(self): |
| 108 os.mkdir(self.GetTempPath('dir1')) | 110 os.mkdir(self.GetTempPath('dir1')) |
| 109 self.RunZip([self.zipname, 'dir1']) | 111 self.RunZip(self.zipname, 'dir1') |
| 110 self.OpenZipFile() | 112 self.OpenZipFile() |
| 111 self.assertEqual(len(self.zipfile.namelist()), 1) | 113 self.assertEqual(len(self.zipfile.namelist()), 1) |
| 112 self.assertRaises(KeyError, self.zipfile.getinfo, 'dir1') | 114 self.assertRaises(KeyError, self.zipfile.getinfo, 'dir1') |
| 113 self.zipfile.getinfo('dir1/') | 115 self.zipfile.getinfo('dir1/') |
| 114 | 116 |
| 115 def testAddRecursive(self): | 117 def testAddRecursive(self): |
| 116 os.mkdir(self.GetTempPath('dir1')) | 118 os.mkdir(self.GetTempPath('dir1')) |
| 117 self.MakeFile(os.path.join('dir1', 'file1'), 256) | 119 self.MakeFile(os.path.join('dir1', 'file1'), 256) |
| 118 os.mkdir(self.GetTempPath(os.path.join('dir1', 'dir2'))) | 120 os.mkdir(self.GetTempPath(os.path.join('dir1', 'dir2'))) |
| 119 self.MakeFile(os.path.join('dir1', 'dir2', 'file2'), 1234) | 121 self.MakeFile(os.path.join('dir1', 'dir2', 'file2'), 1234) |
| 120 self.RunZip([self.zipname, '-r', 'dir1']) | 122 self.RunZip(self.zipname, '-r', 'dir1') |
| 121 self.OpenZipFile() | 123 self.OpenZipFile() |
| 122 self.assertEqual(len(self.zipfile.namelist()), 4) | 124 self.assertEqual(len(self.zipfile.namelist()), 4) |
| 123 | 125 |
| 124 def testUpdate(self): | 126 def testUpdate(self): |
| 125 file1 = self.MakeFile('file1', 1223) | 127 file1 = self.MakeFile('file1', 1223) |
| 126 self.RunZip([self.zipname, file1]) | 128 self.RunZip(self.zipname, file1) |
| 127 self.OpenZipFile() | 129 self.OpenZipFile() |
| 128 self.assertEqual(self.GetZipInfo(file1).file_size, 1223) | 130 self.assertEqual(self.GetZipInfo(file1).file_size, 1223) |
| 129 | 131 |
| 130 file1 = self.MakeFile('file1', 2334) | 132 file1 = self.MakeFile('file1', 2334) |
| 131 self.RunZip([self.zipname, file1]) | 133 self.RunZip(self.zipname, file1) |
| 132 self.OpenZipFile() | 134 self.OpenZipFile() |
| 133 self.assertEqual(len(self.zipfile.namelist()), 1) | 135 self.assertEqual(len(self.zipfile.namelist()), 1) |
| 134 self.assertEqual(self.GetZipInfo(file1).file_size, 2334) | 136 self.assertEqual(self.GetZipInfo(file1).file_size, 2334) |
| 135 | 137 |
| 136 def testUpdateOneFileOutOfMany(self): | 138 def testUpdateOneFileOutOfMany(self): |
| 137 file1 = self.MakeFile('file1', 128) | 139 file1 = self.MakeFile('file1', 128) |
| 138 file2 = self.MakeFile('file2', 256) | 140 file2 = self.MakeFile('file2', 256) |
| 139 file3 = self.MakeFile('file3', 512) | 141 file3 = self.MakeFile('file3', 512) |
| 140 file4 = self.MakeFile('file4', 1024) | 142 file4 = self.MakeFile('file4', 1024) |
| 141 self.RunZip([self.zipname, file1, file2, file3, file4]) | 143 self.RunZip(self.zipname, file1, file2, file3, file4) |
| 142 self.OpenZipFile() | 144 self.OpenZipFile() |
| 143 self.assertEqual(len(self.zipfile.namelist()), 4) | 145 self.assertEqual(len(self.zipfile.namelist()), 4) |
| 144 self.CloseZipFile() | 146 self.CloseZipFile() |
| 145 | 147 |
| 146 file3 = self.MakeFile('file3', 768) | 148 file3 = self.MakeFile('file3', 768) |
| 147 self.RunZip([self.zipname, file3]) | 149 self.RunZip(self.zipname, file3) |
| 148 self.OpenZipFile() | 150 self.OpenZipFile() |
| 149 self.assertEqual(len(self.zipfile.namelist()), 4) | 151 self.assertEqual(len(self.zipfile.namelist()), 4) |
| 150 self.assertEqual(self.zipfile.namelist()[0], file1) | 152 self.assertEqual(self.zipfile.namelist()[0], file1) |
| 151 self.assertEqual(self.GetZipInfo(file1).file_size, 128) | 153 self.assertEqual(self.GetZipInfo(file1).file_size, 128) |
| 152 self.assertEqual(self.zipfile.namelist()[1], file2) | 154 self.assertEqual(self.zipfile.namelist()[1], file2) |
| 153 self.assertEqual(self.GetZipInfo(file2).file_size, 256) | 155 self.assertEqual(self.GetZipInfo(file2).file_size, 256) |
| 154 self.assertEqual(self.zipfile.namelist()[2], file3) | 156 self.assertEqual(self.zipfile.namelist()[2], file3) |
| 155 self.assertEqual(self.GetZipInfo(file3).file_size, 768) | 157 self.assertEqual(self.GetZipInfo(file3).file_size, 768) |
| 156 self.assertEqual(self.zipfile.namelist()[3], file4) | 158 self.assertEqual(self.zipfile.namelist()[3], file4) |
| 157 self.assertEqual(self.GetZipInfo(file4).file_size, 1024) | 159 self.assertEqual(self.GetZipInfo(file4).file_size, 1024) |
| 158 | 160 |
| 159 def testUpdateSubdirectory(self): | 161 def testUpdateSubdirectory(self): |
| 160 os.mkdir(self.GetTempPath('dir1')) | 162 os.mkdir(self.GetTempPath('dir1')) |
| 161 file1 = self.MakeFile(os.path.join('dir1', 'file1'), 256) | 163 file1 = self.MakeFile(os.path.join('dir1', 'file1'), 256) |
| 162 os.mkdir(self.GetTempPath(os.path.join('dir1', 'dir2'))) | 164 os.mkdir(self.GetTempPath(os.path.join('dir1', 'dir2'))) |
| 163 self.MakeFile(os.path.join('dir1', 'dir2', 'file2'), 1234) | 165 self.MakeFile(os.path.join('dir1', 'dir2', 'file2'), 1234) |
| 164 self.RunZip([self.zipname, '-r', 'dir1']) | 166 self.RunZip(self.zipname, '-r', 'dir1') |
| 165 self.OpenZipFile() | 167 self.OpenZipFile() |
| 166 self.assertEqual(len(self.zipfile.namelist()), 4) | 168 self.assertEqual(len(self.zipfile.namelist()), 4) |
| 167 self.assertEqual(self.GetZipInfo(file1).file_size, 256) | 169 self.assertEqual(self.GetZipInfo(file1).file_size, 256) |
| 168 self.CloseZipFile() | 170 self.CloseZipFile() |
| 169 | 171 |
| 170 self.MakeFile(file1, 2560) | 172 self.MakeFile(file1, 2560) |
| 171 self.RunZip([self.zipname, file1]) | 173 self.RunZip(self.zipname, file1) |
| 172 self.OpenZipFile() | 174 self.OpenZipFile() |
| 173 self.assertEqual(len(self.zipfile.namelist()), 4) | 175 self.assertEqual(len(self.zipfile.namelist()), 4) |
| 174 self.assertEqual(self.GetZipInfo(file1).file_size, 2560) | 176 self.assertEqual(self.GetZipInfo(file1).file_size, 2560) |
| 175 | 177 |
| 176 def testAppend(self): | 178 def testAppend(self): |
| 177 file1 = self.MakeFile('file1', 128) | 179 file1 = self.MakeFile('file1', 128) |
| 178 file2 = self.MakeFile('file2', 256) | 180 file2 = self.MakeFile('file2', 256) |
| 179 self.RunZip([self.zipname, file1, file2]) | 181 self.RunZip(self.zipname, file1, file2) |
| 180 self.OpenZipFile() | 182 self.OpenZipFile() |
| 181 self.assertEqual(len(self.zipfile.namelist()), 2) | 183 self.assertEqual(len(self.zipfile.namelist()), 2) |
| 182 self.CloseZipFile() | 184 self.CloseZipFile() |
| 183 | 185 |
| 184 file3 = self.MakeFile('file3', 768) | 186 file3 = self.MakeFile('file3', 768) |
| 185 self.RunZip([self.zipname, file3]) | 187 self.RunZip(self.zipname, file3) |
| 186 self.OpenZipFile() | 188 self.OpenZipFile() |
| 187 self.assertEqual(len(self.zipfile.namelist()), 3) | 189 self.assertEqual(len(self.zipfile.namelist()), 3) |
| 188 | 190 |
| 189 | 191 |
| 192 class TestWhich(unittest.TestCase): |
| 193 def setUp(self): |
| 194 self.path_list = [] |
| 195 self.tempdir = tempfile.mkdtemp() |
| 196 shutil.copy(os.path.join(PARENT_DIR, 'oshelpers.py'), |
| 197 self.tempdir) |
| 198 |
| 199 def tearDown(self): |
| 200 shutil.rmtree(self.tempdir) |
| 201 |
| 202 def Mkdir(self, path): |
| 203 os.mkdir(os.path.join(self.tempdir, path)) |
| 204 |
| 205 def MakeExecutableFile(self, *path_components): |
| 206 path = os.path.join(self.tempdir, *path_components) |
| 207 if sys.platform == 'win32': |
| 208 path += '.exe' |
| 209 |
| 210 with open(path, 'w') as f: |
| 211 f.write('') |
| 212 os.chmod(path, 0755) |
| 213 |
| 214 return path |
| 215 |
| 216 def RunWhich(self, *args): |
| 217 paths = os.pathsep.join(os.path.join(self.tempdir, p) |
| 218 for p in self.path_list) |
| 219 env = {'PATH': paths} |
| 220 return RunCmd('which', list(args), cwd=self.tempdir, env=env) |
| 221 |
| 222 def testNothing(self): |
| 223 self.assertRaises(RunError, self.RunWhich, 'foo') |
| 224 |
| 225 def testBasic(self): |
| 226 self.Mkdir('bin') |
| 227 bin_cp = self.MakeExecutableFile('bin', 'cp') |
| 228 cp = os.path.basename(bin_cp) |
| 229 |
| 230 self.path_list.append('bin') |
| 231 output, _ = self.RunWhich(cp) |
| 232 self.assertTrue(os.path.join(self.tempdir, 'bin', cp) in output) |
| 233 |
| 234 def testMulti(self): |
| 235 self.Mkdir('bin') |
| 236 bin_cp = self.MakeExecutableFile('bin', 'cp') |
| 237 bin_ls = self.MakeExecutableFile('bin', 'ls') |
| 238 cp = os.path.basename(bin_cp) |
| 239 ls = os.path.basename(bin_ls) |
| 240 |
| 241 self.path_list.append('bin') |
| 242 output, _ = self.RunWhich(cp, ls) |
| 243 self.assertTrue(os.path.join(self.tempdir, 'bin', cp) in output) |
| 244 self.assertTrue(os.path.join(self.tempdir, 'bin', ls) in output) |
| 245 |
| 246 def testNonPath(self): |
| 247 self.Mkdir('bin') |
| 248 bin_cp = self.MakeExecutableFile('bin', 'cp') |
| 249 cp = os.path.basename(bin_cp) |
| 250 |
| 251 # Note, "bin" not added to PATH. |
| 252 output, _ = self.RunWhich(bin_cp) |
| 253 self.assertTrue(os.path.join('bin', cp) in output) |
| 254 |
| 255 |
| 190 if __name__ == '__main__': | 256 if __name__ == '__main__': |
| 191 unittest.main() | 257 unittest.main() |
| OLD | NEW |