Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(161)

Side by Side Diff: swarm_client/tests/zip_package_test.py

Issue 69143004: Delete swarm_client. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/
Patch Set: Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright 2013 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 import cStringIO as StringIO
7 import logging
8 import os
9 import shutil
10 import subprocess
11 import sys
12 import tempfile
13 import unittest
14 import zipfile
15
16 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
17 sys.path.insert(0, ROOT_DIR)
18
19 from utils import zip_package
20
21
22 class ZipPackageTest(unittest.TestCase):
23 def setUp(self):
24 super(ZipPackageTest, self).setUp()
25 self.temp_dir = tempfile.mkdtemp(prefix='zip_package_test')
26
27 def tearDown(self):
28 shutil.rmtree(self.temp_dir)
29 super(ZipPackageTest, self).tearDown()
30
31 def stage_files(self, files):
32 """Populates temp directory with given files specified as a list or dict."""
33 if not isinstance(files, dict):
34 files = dict((p, '') for p in files)
35 for path, content in files.iteritems():
36 abs_path = os.path.join(self.temp_dir, path.replace('/', os.sep))
37 dir_path = os.path.dirname(abs_path)
38 if not os.path.exists(dir_path):
39 os.makedirs(dir_path)
40 with open(abs_path, 'wb') as f:
41 f.write(content)
42
43 @staticmethod
44 def read_zip(stream):
45 """Given some stream with zip data, reads and decompresses it into dict."""
46 zip_file = zipfile.ZipFile(stream, 'r')
47 try:
48 return dict((i.filename, zip_file.read(i)) for i in zip_file.infolist())
49 finally:
50 zip_file.close()
51
52 def test_require_absolute_root(self):
53 # Absolute path is ok.
54 zip_package.ZipPackage(self.temp_dir)
55 # Relative path is not ok.
56 with self.assertRaises(AssertionError):
57 zip_package.ZipPackage('.')
58
59 def test_require_absolute_file_paths(self):
60 # Add some files to temp_dir.
61 self.stage_files([
62 'a.txt',
63 'b.py',
64 'c/c.txt',
65 ])
66
67 # Item to add -> method used to add it.
68 cases = [
69 ('a.txt', zip_package.ZipPackage.add_file),
70 ('b.py', zip_package.ZipPackage.add_python_file),
71 ('c', zip_package.ZipPackage.add_directory),
72 ]
73
74 for path, method in cases:
75 pkg = zip_package.ZipPackage(self.temp_dir)
76 # Absolute path is ok.
77 method(pkg, os.path.join(self.temp_dir, path))
78 # Relative path is not ok.
79 with self.assertRaises(AssertionError):
80 method(pkg, path)
81
82 def test_added_files_are_under_root(self):
83 # Add some files to temp_dir.
84 self.stage_files([
85 'a.txt',
86 'p.py',
87 'pkg/1.txt',
88 'some_dir/2.txt',
89 ])
90
91 # Adding using |archive_path| should work.
92 pkg = zip_package.ZipPackage(os.path.join(self.temp_dir, 'pkg'))
93 pkg.add_file(os.path.join(self.temp_dir, 'a.txt'), '_a.txt')
94 pkg.add_python_file(os.path.join(self.temp_dir, 'p.py'), '_p.py')
95 pkg.add_directory(os.path.join(self.temp_dir, 'pkg'), '_pkg')
96
97 # Adding without |archive_path| should fail.
98 with self.assertRaises(zip_package.ZipPackageError):
99 pkg.add_file(os.path.join(self.temp_dir, 'a.txt'))
100 with self.assertRaises(zip_package.ZipPackageError):
101 pkg.add_python_file(os.path.join(self.temp_dir, 'p.py'))
102 with self.assertRaises(zip_package.ZipPackageError):
103 pkg.add_directory(os.path.join(self.temp_dir, 'a.txt'))
104
105 def test_adding_missing_files(self):
106 pkg = zip_package.ZipPackage(self.temp_dir)
107 with self.assertRaises(zip_package.ZipPackageError):
108 pkg.add_file(os.path.join(self.temp_dir, 'im_not_here'))
109 with self.assertRaises(zip_package.ZipPackageError):
110 pkg.add_python_file(os.path.join(self.temp_dir, 'im_not_here.py'))
111 with self.assertRaises(zip_package.ZipPackageError):
112 pkg.add_directory(os.path.join(self.temp_dir, 'im_not_here_dir'))
113
114 def test_adding_dir_as_file(self):
115 # Create 'dir'.
116 self.stage_files(['dir/keep'])
117 # Try to add it as file, not a directory.
118 pkg = zip_package.ZipPackage(self.temp_dir)
119 with self.assertRaises(zip_package.ZipPackageError):
120 pkg.add_file(os.path.join(self.temp_dir, 'dir'))
121 # Adding as directory works.
122 pkg.add_directory(os.path.join(self.temp_dir, 'dir'))
123
124 def test_adding_non_python_as_python(self):
125 self.stage_files(['file.sh'])
126 pkg = zip_package.ZipPackage(self.temp_dir)
127 with self.assertRaises(zip_package.ZipPackageError):
128 pkg.add_python_file(os.path.join(self.temp_dir, 'file.sh'))
129
130 def test_adding_py_instead_of_pyc(self):
131 self.stage_files([
132 'file.py',
133 'file.pyo',
134 'file.pyc',
135 ])
136 for alternative in ('file.pyc', 'file.pyo'):
137 pkg = zip_package.ZipPackage(self.temp_dir)
138 pkg.add_python_file(os.path.join(self.temp_dir, alternative))
139 self.assertIn('file.py', pkg.files)
140
141 def test_adding_same_file_twice(self):
142 self.stage_files(['file'])
143 pkg = zip_package.ZipPackage(self.temp_dir)
144 pkg.add_file(os.path.join(self.temp_dir, 'file'))
145 with self.assertRaises(zip_package.ZipPackageError):
146 pkg.add_file(os.path.join(self.temp_dir, 'file'))
147
148 def test_add_directory(self):
149 should_add = [
150 'script.py',
151 'a/1.txt',
152 'a/2.txt',
153 'a/b/3.txt',
154 'a/script.py',
155 ]
156 should_ignore = [
157 'script.pyc',
158 'a/script.pyo',
159 '.git/stuff',
160 '.svn/stuff',
161 'a/.svn/stuff',
162 'a/b/.svn/stuff',
163 ]
164 # Add a whole set and verify only files from |should_add| were added.
165 self.stage_files(should_add + should_ignore)
166 pkg = zip_package.ZipPackage(self.temp_dir)
167 pkg.add_directory(self.temp_dir)
168 self.assertEqual(set(pkg.files), set(should_add))
169
170 def test_archive_path_is_respected(self):
171 self.stage_files(['a', 'b.py', 'dir/c'])
172 pkg = zip_package.ZipPackage(self.temp_dir)
173 pkg.add_file(os.path.join(self.temp_dir, 'a'), 'd1/a')
174 pkg.add_python_file(os.path.join(self.temp_dir, 'b.py'), 'd2/b.py')
175 pkg.add_directory(os.path.join(self.temp_dir, 'dir'), 'd3')
176 self.assertEqual(set(pkg.files), set(['d1/a', 'd2/b.py', 'd3/c']))
177
178 def test_add_buffer(self):
179 pkg = zip_package.ZipPackage(self.temp_dir)
180 pkg.add_buffer('buf1', '')
181 self.assertEqual(pkg.files, ['buf1'])
182 # No unicode.
183 with self.assertRaises(AssertionError):
184 pkg.add_buffer('buf2', u'unicode')
185
186 def test_zipping(self):
187 data = {'a': '123', 'b/c': '456'}
188 self.stage_files(data)
189 pkg = zip_package.ZipPackage(self.temp_dir)
190 pkg.add_directory(self.temp_dir)
191 # Test zip_into_buffer produces readable zip with same content.
192 for compress in (True, False):
193 buf = pkg.zip_into_buffer(compress=compress)
194 self.assertEqual(data, self.read_zip(StringIO.StringIO(buf)))
195 # Test zip_into_file produces readable zip with same content.
196 for compress in (True, False):
197 path = os.path.join(self.temp_dir, 'pkg.zip')
198 pkg.zip_into_file(path, compress=compress)
199 with open(path, 'rb') as f:
200 self.assertEqual(data, self.read_zip(f))
201
202 def test_repeatable_content(self):
203 content = []
204 for _ in range(2):
205 # Build temp dir content from scratch.
206 assert not os.listdir(self.temp_dir)
207 self.stage_files({'a': '123', 'b': '456', 'c': '789'})
208 # Zip it.
209 pkg = zip_package.ZipPackage(self.temp_dir)
210 pkg.add_directory(self.temp_dir)
211 content.append(pkg.zip_into_buffer())
212 # Clear everything.
213 for name in os.listdir(self.temp_dir):
214 os.remove(os.path.join(self.temp_dir, name))
215 # Contents of both runs should match exactly.
216 self.assertEqual(content[0], content[1])
217
218 def test_running_from_zip(self):
219 # Test assumes that it runs from a normal checkout, not a zip.
220 self.assertFalse(zip_package.is_zipped_module(sys.modules[__name__]))
221 self.assertIsNone(zip_package.get_module_zip_archive(sys.modules[__name__]))
222 self.assertTrue(os.path.abspath(
223 zip_package.get_main_script_path()).startswith(ROOT_DIR))
224
225 # Build executable zip that calls same functions.
226 pkg = zip_package.ZipPackage(self.temp_dir)
227 pkg.add_directory(os.path.join(ROOT_DIR, 'utils'), 'utils')
228 pkg.add_buffer('__main__.py', '\n'.join([
229 'import sys',
230 '',
231 'from utils import zip_package',
232 '',
233 'print zip_package.is_zipped_module(sys.modules[__name__])',
234 'print zip_package.get_module_zip_archive(sys.modules[__name__])',
235 'print zip_package.get_main_script_path()',
236 ]))
237 zip_file = os.path.join(self.temp_dir, 'out.zip')
238 pkg.zip_into_file(zip_file)
239
240 # Run the zip, validate results.
241 proc = subprocess.Popen(
242 [sys.executable, zip_file],
243 stdout=subprocess.PIPE,
244 stderr=subprocess.PIPE)
245 out, err = proc.communicate()
246
247 actual = out.strip().splitlines()
248 expected = ['True', zip_file, zip_file,]
249 self.assertEqual(err, '')
250 self.assertEqual(actual, expected)
251
252
253 if __name__ == '__main__':
254 VERBOSE = '-v' in sys.argv
255 logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR)
256 unittest.main()
OLDNEW
« no previous file with comments | « swarm_client/tests/url_open_timeout_test.py ('k') | swarm_client/third_party/chromium/README.swarming » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698