OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 """ | |
5 Tests for L{twisted.python.zipstream} | |
6 """ | |
7 import sys | |
8 import random | |
9 import md5 | |
10 import zipfile | |
11 | |
12 from twisted.python import zipstream, filepath | |
13 from twisted.trial import unittest | |
14 | |
15 class FileEntryMixin: | |
16 """ | |
17 File entry classes should behave as file-like objects | |
18 """ | |
19 def getFileEntry(self, contents): | |
20 """ | |
21 Return an appropriate zip file entry | |
22 """ | |
23 filename = self.mktemp() | |
24 z = zipfile.ZipFile(filename, 'w', self.compression) | |
25 z.writestr('content', contents) | |
26 z.close() | |
27 z = zipstream.ChunkingZipFile(filename, 'r') | |
28 return z.readfile('content') | |
29 | |
30 | |
31 def test_isatty(self): | |
32 """ | |
33 zip files should not be ttys, so isatty() should be false | |
34 """ | |
35 self.assertEquals(self.getFileEntry('').isatty(), False) | |
36 | |
37 | |
38 def test_closed(self): | |
39 """ | |
40 The C{closed} attribute should reflect whether C{close()} has been | |
41 called. | |
42 """ | |
43 fileEntry = self.getFileEntry('') | |
44 self.assertEquals(fileEntry.closed, False) | |
45 fileEntry.close() | |
46 self.assertEquals(fileEntry.closed, True) | |
47 | |
48 | |
49 def test_readline(self): | |
50 """ | |
51 C{readline()} should mirror L{file.readline} and return up to a single | |
52 deliminter. | |
53 """ | |
54 fileEntry = self.getFileEntry('hoho\nho') | |
55 self.assertEquals(fileEntry.readline(), 'hoho\n') | |
56 self.assertEquals(fileEntry.readline(), 'ho') | |
57 self.assertEquals(fileEntry.readline(), '') | |
58 | |
59 | |
60 def test_next(self): | |
61 """ | |
62 Zip file entries should implement the iterator protocol as files do. | |
63 """ | |
64 fileEntry = self.getFileEntry('ho\nhoho') | |
65 self.assertEquals(fileEntry.next(), 'ho\n') | |
66 self.assertEquals(fileEntry.next(), 'hoho') | |
67 self.assertRaises(StopIteration, fileEntry.next) | |
68 | |
69 | |
70 def test_readlines(self): | |
71 """ | |
72 C{readlines()} should return a list of all the lines. | |
73 """ | |
74 fileEntry = self.getFileEntry('ho\nho\nho') | |
75 self.assertEquals(fileEntry.readlines(), ['ho\n', 'ho\n', 'ho']) | |
76 | |
77 | |
78 def test_iteration(self): | |
79 """ | |
80 C{__iter__()} and C{xreadlines()} should return C{self}. | |
81 """ | |
82 fileEntry = self.getFileEntry('') | |
83 self.assertIdentical(iter(fileEntry), fileEntry) | |
84 self.assertIdentical(fileEntry.xreadlines(), fileEntry) | |
85 | |
86 | |
87 def test_readWhole(self): | |
88 """ | |
89 C{.read()} should read the entire file. | |
90 """ | |
91 contents = "Hello, world!" | |
92 entry = self.getFileEntry(contents) | |
93 self.assertEquals(entry.read(), contents) | |
94 | |
95 | |
96 def test_readPartial(self): | |
97 """ | |
98 C{.read(num)} should read num bytes from the file. | |
99 """ | |
100 contents = "0123456789" | |
101 entry = self.getFileEntry(contents) | |
102 one = entry.read(4) | |
103 two = entry.read(200) | |
104 self.assertEquals(one, "0123") | |
105 self.assertEquals(two, "456789") | |
106 | |
107 | |
108 def test_tell(self): | |
109 """ | |
110 C{.tell()} should return the number of bytes that have been read so | |
111 far. | |
112 """ | |
113 contents = "x" * 100 | |
114 entry = self.getFileEntry(contents) | |
115 entry.read(2) | |
116 self.assertEquals(entry.tell(), 2) | |
117 entry.read(4) | |
118 self.assertEquals(entry.tell(), 6) | |
119 | |
120 | |
121 | |
122 class DeflatedZipFileEntryTest(FileEntryMixin, unittest.TestCase): | |
123 """ | |
124 DeflatedZipFileEntry should be file-like | |
125 """ | |
126 compression = zipfile.ZIP_DEFLATED | |
127 | |
128 | |
129 | |
130 class ZipFileEntryTest(FileEntryMixin, unittest.TestCase): | |
131 """ | |
132 ZipFileEntry should be file-like | |
133 """ | |
134 compression = zipfile.ZIP_STORED | |
135 | |
136 | |
137 | |
138 class ZipstreamTest(unittest.TestCase): | |
139 """ | |
140 Tests for twisted.python.zipstream | |
141 """ | |
142 def setUp(self): | |
143 """ | |
144 Creates junk data that can be compressed and a test directory for any | |
145 files that will be created | |
146 """ | |
147 self.testdir = filepath.FilePath(self.mktemp()) | |
148 self.testdir.makedirs() | |
149 self.unzipdir = self.testdir.child('unzipped') | |
150 self.unzipdir.makedirs() | |
151 | |
152 | |
153 def makeZipFile(self, contents, directory=''): | |
154 """ | |
155 Makes a zip file archive containing len(contents) files. Contents | |
156 should be a list of strings, each string being the content of one file. | |
157 """ | |
158 zpfilename = self.testdir.child('zipfile.zip').path | |
159 zpfile = zipfile.ZipFile(zpfilename, 'w') | |
160 for i, content in enumerate(contents): | |
161 filename = str(i) | |
162 if directory: | |
163 filename = directory + "/" + filename | |
164 zpfile.writestr(filename, content) | |
165 zpfile.close() | |
166 return zpfilename | |
167 | |
168 | |
169 def test_countEntries(self): | |
170 """ | |
171 Make sure the deprecated L{countZipFileEntries} returns the correct | |
172 number of entries for a zip file. | |
173 """ | |
174 name = self.makeZipFile(["one", "two", "three", "four", "five"]) | |
175 result = self.assertWarns(DeprecationWarning, | |
176 "countZipFileEntries is deprecated.", | |
177 __file__, lambda : | |
178 zipstream.countZipFileEntries(name)) | |
179 self.assertEquals(result, 5) | |
180 | |
181 | |
182 def test_invalidMode(self): | |
183 """ | |
184 A ChunkingZipFile opened in write-mode should not allow .readfile(), | |
185 and raise a RuntimeError instead. | |
186 """ | |
187 czf = zipstream.ChunkingZipFile(self.mktemp(), "w") | |
188 self.assertRaises(RuntimeError, czf.readfile, "something") | |
189 | |
190 | |
191 def test_closedArchive(self): | |
192 """ | |
193 A closed ChunkingZipFile should raise a L{RuntimeError} when | |
194 .readfile() is invoked. | |
195 """ | |
196 czf = zipstream.ChunkingZipFile(self.makeZipFile(["something"]), "r") | |
197 czf.close() | |
198 self.assertRaises(RuntimeError, czf.readfile, "something") | |
199 | |
200 | |
201 def test_invalidHeader(self): | |
202 """ | |
203 A zipfile entry with the wrong magic number should raise BadZipfile for | |
204 readfile(), but that should not affect other files in the archive. | |
205 """ | |
206 fn = self.makeZipFile(["test contents", | |
207 "more contents"]) | |
208 zf = zipfile.ZipFile(fn, "r") | |
209 zeroOffset = zf.getinfo("0").header_offset | |
210 zf.close() | |
211 # Zero out just the one header. | |
212 scribble = file(fn, "r+b") | |
213 scribble.seek(zeroOffset, 0) | |
214 scribble.write(chr(0) * 4) | |
215 scribble.close() | |
216 czf = zipstream.ChunkingZipFile(fn) | |
217 self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") | |
218 self.assertEquals(czf.readfile("1").read(), "more contents") | |
219 | |
220 | |
221 def test_filenameMismatch(self): | |
222 """ | |
223 A zipfile entry with a different filename than is found in the central | |
224 directory should raise BadZipfile. | |
225 """ | |
226 fn = self.makeZipFile(["test contents", | |
227 "more contents"]) | |
228 zf = zipfile.ZipFile(fn, "r") | |
229 info = zf.getinfo("0") | |
230 info.filename = "not zero" | |
231 zf.close() | |
232 scribble = file(fn, "r+b") | |
233 scribble.seek(info.header_offset, 0) | |
234 scribble.write(info.FileHeader()) | |
235 scribble.close() | |
236 | |
237 czf = zipstream.ChunkingZipFile(fn) | |
238 self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") | |
239 self.assertEquals(czf.readfile("1").read(), "more contents") | |
240 | |
241 | |
242 if sys.version_info < (2, 5): | |
243 # In python 2.4 and earlier, consistency between the directory and the | |
244 # file header are verified at archive-opening time. In python 2.5 | |
245 # (and, presumably, later) it is readzipfile's responsibility. | |
246 message = "Consistency-checking only necessary in 2.5." | |
247 test_invalidHeader.skip = message | |
248 test_filenameMismatch.skip = message | |
249 | |
250 | |
251 | |
252 def test_unsupportedCompression(self): | |
253 """ | |
254 A zipfile which describes an unsupported compression mechanism should | |
255 raise BadZipfile. | |
256 """ | |
257 fn = self.mktemp() | |
258 zf = zipfile.ZipFile(fn, "w") | |
259 zi = zipfile.ZipInfo("0") | |
260 zf.writestr(zi, "some data") | |
261 # Mangle its compression type in the central directory; can't do this | |
262 # before the writestr call or zipfile will (correctly) tell us not to | |
263 # pass bad compression types :) | |
264 zi.compress_type = 1234 | |
265 zf.close() | |
266 | |
267 czf = zipstream.ChunkingZipFile(fn) | |
268 self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") | |
269 | |
270 | |
271 def test_extraData(self): | |
272 """ | |
273 readfile() should skip over 'extra' data present in the zip metadata. | |
274 """ | |
275 fn = self.mktemp() | |
276 zf = zipfile.ZipFile(fn, 'w') | |
277 zi = zipfile.ZipInfo("0") | |
278 zi.extra = "hello, extra" | |
279 zf.writestr(zi, "the real data") | |
280 zf.close() | |
281 czf = zipstream.ChunkingZipFile(fn) | |
282 self.assertEquals(czf.readfile("0").read(), "the real data") | |
283 | |
284 | |
285 def test_unzipIter(self): | |
286 """ | |
287 L{twisted.python.zipstream.unzipIter} should unzip a file for each | |
288 iteration and yield the number of files left to unzip after that | |
289 iteration | |
290 """ | |
291 numfiles = 10 | |
292 contents = ['This is test file %d!' % i for i in range(numfiles)] | |
293 zpfilename = self.makeZipFile(contents) | |
294 uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path) | |
295 for i in range(numfiles): | |
296 self.assertEquals(len(list(self.unzipdir.children())), i) | |
297 self.assertEquals(uziter.next(), numfiles - i - 1) | |
298 self.assertEquals(len(list(self.unzipdir.children())), numfiles) | |
299 | |
300 for child in self.unzipdir.children(): | |
301 num = int(child.basename()) | |
302 self.assertEquals(child.open().read(), contents[num]) | |
303 | |
304 | |
305 def test_unzip(self): | |
306 """ | |
307 L{twisted.python.zipstream.unzip} should extract all files from a zip | |
308 archive | |
309 """ | |
310 numfiles = 3 | |
311 zpfilename = self.makeZipFile([''] * numfiles) | |
312 zipstream.unzip(zpfilename, self.unzipdir.path) | |
313 self.assertEquals(len(list(self.unzipdir.children())), numfiles) | |
314 | |
315 | |
316 def test_overwrite(self): | |
317 """ | |
318 L{twisted.python.zipstream.unzip} and | |
319 L{twisted.python.zipstream.unzipIter} shouldn't overwrite files unless | |
320 the 'overwrite' flag is passed | |
321 """ | |
322 testfile = self.unzipdir.child('0') | |
323 zpfilename = self.makeZipFile(['OVERWRITTEN']) | |
324 | |
325 testfile.setContent('NOT OVERWRITTEN') | |
326 zipstream.unzip(zpfilename, self.unzipdir.path) | |
327 self.assertEquals(testfile.open().read(), 'NOT OVERWRITTEN') | |
328 zipstream.unzip(zpfilename, self.unzipdir.path, overwrite=True) | |
329 self.assertEquals(testfile.open().read(), 'OVERWRITTEN') | |
330 | |
331 testfile.setContent('NOT OVERWRITTEN') | |
332 uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path) | |
333 uziter.next() | |
334 self.assertEquals(testfile.open().read(), 'NOT OVERWRITTEN') | |
335 uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path, | |
336 overwrite=True) | |
337 uziter.next() | |
338 self.assertEquals(testfile.open().read(), 'OVERWRITTEN') | |
339 | |
340 | |
341 # XXX these tests are kind of gross and old, but I think unzipIterChunky is | |
342 # kind of a gross function anyway. We should really write an abstract | |
343 # copyTo/moveTo that operates on FilePath and make sure ZipPath can support | |
344 # it, then just deprecate / remove this stuff. | |
345 def _unzipIterChunkyTest(self, compression, chunksize, lower, upper): | |
346 """ | |
347 unzipIterChunky should unzip the given number of bytes per iteration. | |
348 """ | |
349 junk = ' '.join([str(random.random()) for n in xrange(1000)]) | |
350 junkmd5 = md5.new(junk).hexdigest() | |
351 | |
352 tempdir = filepath.FilePath(self.mktemp()) | |
353 tempdir.makedirs() | |
354 zfpath = tempdir.child('bigfile.zip').path | |
355 self._makebigfile(zfpath, compression, junk) | |
356 uziter = zipstream.unzipIterChunky(zfpath, tempdir.path, | |
357 chunksize=chunksize) | |
358 r = uziter.next() | |
359 # test that the number of chunks is in the right ballpark; | |
360 # this could theoretically be any number but statistically it | |
361 # should always be in this range | |
362 approx = lower < r < upper | |
363 self.failUnless(approx) | |
364 for r in uziter: | |
365 pass | |
366 self.assertEqual(r, 0) | |
367 newmd5 = md5.new( | |
368 tempdir.child("zipstreamjunk").open().read()).hexdigest() | |
369 self.assertEqual(newmd5, junkmd5) | |
370 | |
371 def test_unzipIterChunkyStored(self): | |
372 """ | |
373 unzipIterChunky should unzip the given number of bytes per iteration on | |
374 a stored archive. | |
375 """ | |
376 self._unzipIterChunkyTest(zipfile.ZIP_STORED, 500, 35, 45) | |
377 | |
378 | |
379 def test_chunkyDeflated(self): | |
380 """ | |
381 unzipIterChunky should unzip the given number of bytes per iteration on | |
382 a deflated archive. | |
383 """ | |
384 self._unzipIterChunkyTest(zipfile.ZIP_DEFLATED, 972, 23, 27) | |
385 | |
386 | |
387 def _makebigfile(self, filename, compression, junk): | |
388 """ | |
389 Create a zip file with the given file name and compression scheme. | |
390 """ | |
391 zf = zipfile.ZipFile(filename, 'w', compression) | |
392 for i in range(10): | |
393 fn = 'zipstream%d' % i | |
394 zf.writestr(fn, "") | |
395 zf.writestr('zipstreamjunk', junk) | |
396 zf.close() | |
OLD | NEW |